PostgreSQL Source Code git master
Loading...
Searching...
No Matches
archive_waldump.c File Reference
#include "postgres_fe.h"
#include <unistd.h>
#include "access/xlog_internal.h"
#include "common/file_perm.h"
#include "common/hashfn.h"
#include "common/logging.h"
#include "fe_utils/simple_list.h"
#include "pg_waldump.h"
#include "lib/simplehash.h"
Include dependency graph for archive_waldump.c:

Go to the source code of this file.

Data Structures

struct  ArchivedWALFile
 
struct  astreamer_waldump
 

Macros

#define READ_CHUNK_SIZE   (128 * 1024)
 
#define READ_ANY_WAL(privateInfo)   ((privateInfo)->start_segno == 0)
 
#define SH_PREFIX   ArchivedWAL
 
#define SH_ELEMENT_TYPE   ArchivedWALFile
 
#define SH_KEY_TYPE   const char *
 
#define SH_KEY   fname
 
#define SH_HASH_KEY(tb, key)   hash_string_pointer(key)
 
#define SH_EQUAL(tb, a, b)   (strcmp(a, b) == 0)
 
#define SH_SCOPE   static inline
 
#define SH_RAW_ALLOCATOR   pg_malloc0
 
#define SH_DECLARE
 
#define SH_DEFINE
 

Typedefs

typedef struct ArchivedWALFile ArchivedWALFile
 
typedef struct astreamer_waldump astreamer_waldump
 

Functions

static uint32 hash_string_pointer (const char *s)
 
static ArchivedWALFile * get_archive_wal_entry (const char *fname, XLogDumpPrivate *privateInfo)
 
static int read_archive_file (XLogDumpPrivate *privateInfo, Size count)
 
static void setup_tmpwal_dir (const char *waldir)
 
static void cleanup_tmpwal_dir_atexit (void)
 
static FILE * prepare_tmp_write (const char *fname, XLogDumpPrivate *privateInfo)
 
static void perform_tmp_write (const char *fname, StringInfo buf, FILE *file)
 
static astreamer * astreamer_waldump_new (XLogDumpPrivate *privateInfo)
 
static void astreamer_waldump_content (astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
 
static void astreamer_waldump_finalize (astreamer *streamer)
 
static void astreamer_waldump_free (astreamer *streamer)
 
static bool member_is_wal_file (astreamer_waldump *mystreamer, astreamer_member *member, char **fname)
 
void init_archive_reader (XLogDumpPrivate *privateInfo, pg_compress_algorithm compression)
 
void free_archive_reader (XLogDumpPrivate *privateInfo)
 
int read_archive_wal_page (XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, Size count, char *readBuff)
 
void free_archive_wal_entry (const char *fname, XLogDumpPrivate *privateInfo)
 

Variables

char * TmpWalSegDir = NULL
 
static const astreamer_ops astreamer_waldump_ops
 

Macro Definition Documentation

◆ READ_ANY_WAL

#define READ_ANY_WAL (   privateInfo)    ((privateInfo)->start_segno == 0)

Definition at line 38 of file archive_waldump.c.

◆ READ_CHUNK_SIZE

#define READ_CHUNK_SIZE   (128 * 1024)

Definition at line 29 of file archive_waldump.c.

◆ SH_DECLARE

#define SH_DECLARE

Definition at line 80 of file archive_waldump.c.

◆ SH_DEFINE

#define SH_DEFINE

Definition at line 81 of file archive_waldump.c.

◆ SH_ELEMENT_TYPE

#define SH_ELEMENT_TYPE   ArchivedWALFile

Definition at line 73 of file archive_waldump.c.

◆ SH_EQUAL

#define SH_EQUAL (   tb,
  a,
  b 
)    (strcmp(a, b) == 0)

Definition at line 77 of file archive_waldump.c.

◆ SH_HASH_KEY

#define SH_HASH_KEY (   tb,
  key 
)    hash_string_pointer(key)

Definition at line 76 of file archive_waldump.c.

◆ SH_KEY

#define SH_KEY   fname

Definition at line 75 of file archive_waldump.c.

◆ SH_KEY_TYPE

#define SH_KEY_TYPE   const char *

Definition at line 74 of file archive_waldump.c.

◆ SH_PREFIX

#define SH_PREFIX   ArchivedWAL

Definition at line 72 of file archive_waldump.c.

◆ SH_RAW_ALLOCATOR

#define SH_RAW_ALLOCATOR   pg_malloc0

Definition at line 79 of file archive_waldump.c.

◆ SH_SCOPE

#define SH_SCOPE   static inline

Definition at line 78 of file archive_waldump.c.

Typedef Documentation

◆ ArchivedWALFile

◆ astreamer_waldump

Function Documentation

◆ astreamer_waldump_content()

static void astreamer_waldump_content ( astreamer * streamer,
astreamer_member * member,
const char * data,
int  len,
astreamer_archive_context  context 
)
static

Definition at line 693 of file archive_waldump.c.

696{
697 astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
698 XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
699
700 Assert(context != ASTREAMER_UNKNOWN);
701
702 switch (context)
703 {
704 case ASTREAMER_MEMBER_HEADER:
705 {
706 char *fname = NULL;
707 ArchivedWALFile *entry;
708 bool found;
709
710 pg_log_debug("reading \"%s\"", member->pathname);
711
712 if (!member_is_wal_file(mystreamer, member, &fname))
713 break;
714
715 /*
716 * Skip range filtering during initial startup, before the WAL
717 * segment size and segment number bounds are known.
718 */
719 if (!READ_ANY_WAL(privateInfo))
720 {
721 XLogSegNo segno;
722 TimeLineID timeline;
723
724 /*
725 * Skip the segment if the timeline does not match or if it
726 * falls outside the caller-specified range.
727 */
728 XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
729 if (privateInfo->timeline != timeline ||
730 privateInfo->start_segno > segno ||
731 privateInfo->end_segno < segno)
732 {
733 pfree(fname);
734 break;
735 }
736 }
737
738 entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
739 fname, &found);
740
741 /*
742 * Shouldn't happen, but if it does, simply ignore the
743 * duplicate WAL file.
744 */
745 if (found)
746 {
747 pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
748 member->pathname, privateInfo->archive_name);
749 pfree(fname);
750 break;
751 }
752
753 entry->buf = makeStringInfo();
754 entry->spilled = false;
755 entry->read_len = 0;
756 privateInfo->cur_file = entry;
757 }
758 break;
759
760 case ASTREAMER_MEMBER_CONTENTS:
761 if (privateInfo->cur_file)
762 {
763 appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
764 privateInfo->cur_file->read_len += len;
765 }
766 break;
767
768 case ASTREAMER_MEMBER_TRAILER:
769
770 /*
771 * End of this tar member; mark cur_file NULL so subsequent
772 * content callbacks (if any) know no WAL file is currently
773 * active.
774 */
775 privateInfo->cur_file = NULL;
776 break;
777
778 case ASTREAMER_ARCHIVE_TRAILER:
779 break;
780
781 default:
782 /* Shouldn't happen. */
783 pg_fatal("unexpected state while parsing tar file");
784 }
785}
#define READ_ANY_WAL(privateInfo)
static bool member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member, char **fname)
@ ASTREAMER_MEMBER_HEADER
Definition astreamer.h:65
@ ASTREAMER_MEMBER_CONTENTS
Definition astreamer.h:66
@ ASTREAMER_MEMBER_TRAILER
Definition astreamer.h:67
@ ASTREAMER_ARCHIVE_TRAILER
Definition astreamer.h:68
@ ASTREAMER_UNKNOWN
Definition astreamer.h:64
#define Assert(condition)
Definition c.h:945
#define pg_log_debug(...)
Definition logging.h:133
void pfree(void *pointer)
Definition mcxt.c:1616
#define pg_fatal(...)
const void size_t len
const void * data
#define pg_log_warning(...)
Definition pgfnames.c:24
static int fb(int x)
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
XLogSegNo start_segno
Definition pg_waldump.h:62
struct ArchivedWAL_hash * archive_wal_htab
Definition pg_waldump.h:54
char * archive_name
Definition pg_waldump.h:36
TimeLineID timeline
Definition pg_waldump.h:27
XLogSegNo end_segno
Definition pg_waldump.h:63
struct ArchivedWALFile * cur_file
Definition pg_waldump.h:47
char pathname[MAXPGPATH]
Definition astreamer.h:81
static void XLogFromFileName(const char *fname, TimeLineID *tli, XLogSegNo *logSegNo, int wal_segsz_bytes)
uint32 TimeLineID
Definition xlogdefs.h:63
uint64 XLogSegNo
Definition xlogdefs.h:52

References appendBinaryStringInfo(), XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_wal_htab, Assert, ASTREAMER_ARCHIVE_TRAILER, ASTREAMER_MEMBER_CONTENTS, ASTREAMER_MEMBER_HEADER, ASTREAMER_MEMBER_TRAILER, ASTREAMER_UNKNOWN, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, data, XLogDumpPrivate::end_segno, fb(), len, makeStringInfo(), member_is_wal_file(), astreamer_member::pathname, pfree(), pg_fatal, pg_log_debug, pg_log_warning, READ_ANY_WAL, ArchivedWALFile::read_len, XLogDumpPrivate::segsize, ArchivedWALFile::spilled, XLogDumpPrivate::start_segno, XLogDumpPrivate::timeline, and XLogFromFileName().

◆ astreamer_waldump_finalize()

static void astreamer_waldump_finalize ( astreamer * streamer)
static

Definition at line 792 of file archive_waldump.c.

793{
794 Assert(streamer->bbs_next == NULL);
795}
astreamer * bbs_next
Definition astreamer.h:110

References Assert, astreamer::bbs_next, and fb().

◆ astreamer_waldump_free()

static void astreamer_waldump_free ( astreamer * streamer)
static

Definition at line 801 of file archive_waldump.c.

802{
803 Assert(streamer->bbs_next == NULL);
804 pfree(streamer);
805}

References Assert, astreamer::bbs_next, fb(), and pfree().

◆ astreamer_waldump_new()

static astreamer * astreamer_waldump_new ( XLogDumpPrivate * privateInfo)
static

Definition at line 673 of file archive_waldump.c.

674{
675 astreamer_waldump *streamer;
676
677 streamer = palloc0_object(astreamer_waldump);
678 *((const astreamer_ops **) &streamer->base.bbs_ops) =
679 &astreamer_waldump_ops;
680
681 streamer->privateInfo = privateInfo;
682
683 return &streamer->base;
684}
static const astreamer_ops astreamer_waldump_ops
#define palloc0_object(type)
Definition fe_memutils.h:75
XLogDumpPrivate * privateInfo
const astreamer_ops * bbs_ops
Definition astreamer.h:109

References astreamer_waldump_ops, astreamer_waldump::base, astreamer::bbs_ops, palloc0_object, and astreamer_waldump::privateInfo.

Referenced by init_archive_reader().

◆ cleanup_tmpwal_dir_atexit()

static void cleanup_tmpwal_dir_atexit ( void  )
static

Definition at line 601 of file archive_waldump.c.

602{
604
605 rmtree(TmpWalSegDir, true);
606
608}
char * TmpWalSegDir
bool rmtree(const char *path, bool rmtopdir)
Definition rmtree.c:50

References Assert, fb(), rmtree(), and TmpWalSegDir.

Referenced by prepare_tmp_write().

◆ free_archive_reader()

void free_archive_reader ( XLogDumpPrivate * privateInfo)

Definition at line 234 of file archive_waldump.c.

235{
236 /*
237 * NB: Normally, astreamer_finalize() is called before astreamer_free() to
238 * flush any remaining buffered data or to ensure the end of the tar
239 * archive is reached. However, when decoding WAL, once we hit the end
240 * LSN, any remaining buffered data or unread portion of the archive can
241 * be safely ignored.
242 */
243 astreamer_free(privateInfo->archive_streamer);
244
245 /* Free any remaining hash table entries and their buffers. */
246 if (privateInfo->archive_wal_htab != NULL)
247 {
249 ArchivedWALFile *entry;
250
251 ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
252 while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
253 &iter)) != NULL)
254 {
255 if (entry->buf != NULL)
256 destroyStringInfo(entry->buf);
257 }
259 privateInfo->archive_wal_htab = NULL;
260 }
261
262 /* Free the reusable read buffer. */
263 if (privateInfo->archive_read_buf != NULL)
264 {
265 pg_free(privateInfo->archive_read_buf);
266 privateInfo->archive_read_buf = NULL;
267 }
268
269 /* Close the file. */
270 if (close(privateInfo->archive_fd) != 0)
271 pg_log_error("could not close file \"%s\": %m",
272 privateInfo->archive_name);
273}
static void astreamer_free(astreamer *streamer)
Definition astreamer.h:153
void pg_free(void *ptr)
#define close(a)
Definition win32.h:12
#define pg_log_error(...)
Definition logging.h:106
void destroyStringInfo(StringInfo str)
Definition stringinfo.c:409
astreamer * archive_streamer
Definition pg_waldump.h:39
char * archive_read_buf
Definition pg_waldump.h:40

References XLogDumpPrivate::archive_fd, XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_read_buf, XLogDumpPrivate::archive_streamer, XLogDumpPrivate::archive_wal_htab, astreamer_free(), ArchivedWALFile::buf, close, destroyStringInfo(), fb(), pg_free(), and pg_log_error.

Referenced by main().

◆ free_archive_wal_entry()

void free_archive_wal_entry ( const char * fname,
XLogDumpPrivate * privateInfo 
)

Definition at line 415 of file archive_waldump.c.

416{
417 ArchivedWALFile *entry;
418
419 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
420
421 if (entry == NULL)
422 return;
423
424 /* Destroy the buffer */
425 destroyStringInfo(entry->buf);
426 entry->buf = NULL;
427
428 /* Remove temporary file if any */
429 if (entry->spilled)
430 {
431 char fpath[MAXPGPATH];
432
433 snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
434
435 if (unlink(fpath) == 0)
436 pg_log_debug("removed file \"%s\"", fpath);
437 }
438
439 /* Clear cur_file if it points to the entry being freed */
440 if (privateInfo->cur_file == entry)
441 privateInfo->cur_file = NULL;
442
443 ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
444}
#define MAXPGPATH
#define snprintf
Definition port.h:260

References XLogDumpPrivate::archive_wal_htab, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, destroyStringInfo(), fb(), MAXPGPATH, pg_log_debug, snprintf, ArchivedWALFile::spilled, and TmpWalSegDir.

Referenced by init_archive_reader(), and TarWALDumpReadPage().

◆ get_archive_wal_entry()

static ArchivedWALFile * get_archive_wal_entry ( const char * fname,
XLogDumpPrivate * privateInfo 
)
static

Definition at line 454 of file archive_waldump.c.

455{
456 ArchivedWALFile *entry = NULL;
457 FILE *write_fp = NULL;
458
459 /*
460 * Search the hash table first. If the entry is found, return it.
461 * Otherwise, the requested WAL entry hasn't been read from the archive
462 * yet; invoke the archive streamer to fetch it.
463 */
464 while (1)
465 {
466 /*
467 * Search hash table.
468 *
469 * We perform the search inside the loop because a single iteration of
470 * the archive reader may decompress and extract multiple files into
471 * the hash table. One of these newly added files could be the one we
472 * are seeking.
473 */
474 entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
475
476 if (entry != NULL)
477 return entry;
478
479 /*
480 * Capture the current entry before calling read_archive_file(),
481 * because cur_file may advance to a new segment during streaming. We
482 * hold this reference so we can flush any remaining buffer data and
483 * close the write handle once we detect that cur_file has moved on.
484 */
485 entry = privateInfo->cur_file;
486
487 /*
488 * Fetch more data either when no current file is being tracked or
489 * when its buffer has been fully flushed to the temporary file.
490 */
491 if (entry == NULL || entry->buf->len == 0)
492 {
493 if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
494 break; /* archive file ended */
495 }
496
497 /*
498 * Archive streamer is reading a non-WAL file or an irrelevant WAL
499 * file.
500 */
501 if (entry == NULL)
502 continue;
503
504 /*
505 * The streamer is producing a WAL segment that isn't the one asked
506 * for; it must be arriving out of order. Spill its data to disk so
507 * it can be read back when needed.
508 */
509 Assert(strcmp(fname, entry->fname) != 0);
510
511 /* Create a temporary file if one does not already exist */
512 if (!entry->spilled)
513 {
514 write_fp = prepare_tmp_write(entry->fname, privateInfo);
515 entry->spilled = true;
516 }
517
518 /* Flush data from the buffer to the file */
519 perform_tmp_write(entry->fname, entry->buf, write_fp);
520 resetStringInfo(entry->buf);
521
522 /*
523 * If cur_file changed since we captured entry above, the archive
524 * streamer has finished this segment and moved on. Close its spill
525 * file handle so data is flushed to disk before the next segment
526 * starts writing to a different handle.
527 */
528 if (entry != privateInfo->cur_file && write_fp != NULL)
529 {
531 write_fp = NULL;
532 }
533 }
534
535 /* Requested WAL segment not found */
536 pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
537 fname, privateInfo->archive_name);
538}
static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
#define READ_CHUNK_SIZE
static int read_archive_file(XLogDumpPrivate *privateInfo, Size count)
static FILE * prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
const char * fname

References XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_wal_htab, Assert, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, fb(), ArchivedWALFile::fname, StringInfoData::len, perform_tmp_write(), pg_fatal, prepare_tmp_write(), read_archive_file(), READ_CHUNK_SIZE, resetStringInfo(), and ArchivedWALFile::spilled.

Referenced by read_archive_wal_page().

◆ hash_string_pointer()

static uint32 hash_string_pointer ( const char * s)
static

Definition at line 855 of file archive_waldump.c.

856{
857 unsigned char *ss = (unsigned char *) s;
858
859 return hash_bytes(ss, strlen(s));
860}
uint32 hash_bytes(const unsigned char *k, int keylen)
Definition hashfn.c:146

References fb(), and hash_bytes().

◆ init_archive_reader()

void init_archive_reader ( XLogDumpPrivate * privateInfo,
pg_compress_algorithm  compression 
)

Definition at line 124 of file archive_waldump.c.

126{
127 int fd;
128 astreamer *streamer;
129 ArchivedWALFile *entry = NULL;
131 XLogSegNo segno;
132 TimeLineID timeline;
133
134 /* Open tar archive and store its file descriptor */
135 fd = open_file_in_directory(privateInfo->archive_dir,
136 privateInfo->archive_name);
137
138 if (fd < 0)
139 pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
140
141 privateInfo->archive_fd = fd;
142
143 streamer = astreamer_waldump_new(privateInfo);
144
145 /* We must first parse the tar archive. */
146 streamer = astreamer_tar_parser_new(streamer);
147
148 /* If the archive is compressed, decompress before parsing. */
149 if (compression == PG_COMPRESSION_GZIP)
150 streamer = astreamer_gzip_decompressor_new(streamer);
151 else if (compression == PG_COMPRESSION_LZ4)
152 streamer = astreamer_lz4_decompressor_new(streamer);
153 else if (compression == PG_COMPRESSION_ZSTD)
154 streamer = astreamer_zstd_decompressor_new(streamer);
155
156 privateInfo->archive_streamer = streamer;
157
158 /*
159 * Allocate a buffer for reading the archive file to facilitate content
160 * decoding; read requests must not exceed the allocated buffer size.
161 */
162 privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
163
164#ifdef USE_ASSERT_CHECKING
165 privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
166#endif
167
168 /*
169 * Hash table storing WAL entries read from the archive with an arbitrary
170 * initial size.
171 */
172 privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
173
174 /*
175 * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
176 * the first WAL segment in the archive so we can extract the WAL segment
177 * size from the long page header.
178 */
179 while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
180 {
181 if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
182 pg_fatal("could not find WAL in archive \"%s\"",
183 privateInfo->archive_name);
184
185 entry = privateInfo->cur_file;
186 }
187
188 /* Extract the WAL segment size from the long page header */
190
191 if (!IsValidWalSegSize(longhdr->xlp_seg_size))
192 {
193 pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
194 "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
195 longhdr->xlp_seg_size),
196 privateInfo->archive_name, longhdr->xlp_seg_size);
197 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
198 exit(1);
199 }
200
201 privateInfo->segsize = longhdr->xlp_seg_size;
202
203 /*
204 * With the WAL segment size available, we can now initialize the
205 * dependent start and end segment numbers.
206 */
207 Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
208 XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
209 privateInfo->segsize);
210
211 if (!XLogRecPtrIsInvalid(privateInfo->endptr))
212 XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
213 privateInfo->segsize);
214
215 /*
216 * This WAL record was fetched before the filtering parameters
217 * (start_segno and end_segno) were fully initialized. Perform the
218 * relevance check against the user-provided range now; if the WAL falls
219 * outside this range, remove it from the hash table. Subsequent WAL will
220 * be filtered automatically by the archive streamer using the updated
221 * start_segno and end_segno values.
222 */
223 XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
224 if (privateInfo->timeline != timeline ||
225 privateInfo->start_segno > segno ||
226 privateInfo->end_segno < segno)
227 free_archive_wal_entry(entry->fname, privateInfo);
228}
static astreamer * astreamer_waldump_new(XLogDumpPrivate *privateInfo)
void free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
astreamer * astreamer_tar_parser_new(astreamer *next)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
#define ngettext(s, p, n)
Definition c.h:1272
@ PG_COMPRESSION_GZIP
Definition compression.h:24
@ PG_COMPRESSION_LZ4
Definition compression.h:25
@ PG_COMPRESSION_ZSTD
Definition compression.h:26
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
#define pg_log_error_detail(...)
Definition logging.h:109
int open_file_in_directory(const char *directory, const char *fname)
Definition pg_waldump.c:180
static int fd(const char *x, int i)
XLogRecPtr endptr
Definition pg_waldump.h:30
XLogRecPtr startptr
Definition pg_waldump.h:29
char * archive_dir
Definition pg_waldump.h:35
#define IsValidWalSegSize(size)
XLogLongPageHeaderData * XLogLongPageHeader
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsInvalid(r)
Definition xlogdefs.h:30

References XLogDumpPrivate::archive_dir, XLogDumpPrivate::archive_fd, XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_read_buf, XLogDumpPrivate::archive_streamer, XLogDumpPrivate::archive_wal_htab, Assert, astreamer_gzip_decompressor_new(), astreamer_lz4_decompressor_new(), astreamer_tar_parser_new(), astreamer_waldump_new(), astreamer_zstd_decompressor_new(), ArchivedWALFile::buf, XLogDumpPrivate::cur_file, StringInfoData::data, XLogDumpPrivate::end_segno, XLogDumpPrivate::endptr, fb(), fd(), ArchivedWALFile::fname, free_archive_wal_entry(), IsValidWalSegSize, StringInfoData::len, ngettext, open_file_in_directory(), PG_COMPRESSION_GZIP, PG_COMPRESSION_LZ4, PG_COMPRESSION_ZSTD, pg_fatal, pg_log_error, pg_log_error_detail, pg_malloc(), read_archive_file(), READ_CHUNK_SIZE, XLogDumpPrivate::segsize, XLogDumpPrivate::start_segno, XLogDumpPrivate::startptr, XLogDumpPrivate::timeline, XLByteToSeg, XLogFromFileName(), and XLogRecPtrIsInvalid.

Referenced by main().

◆ member_is_wal_file()

static bool member_is_wal_file ( astreamer_waldump * mystreamer,
astreamer_member * member,
char **  fname 
)
static

Definition at line 812 of file archive_waldump.c.

814{
815 int pathlen;
816 char pathname[MAXPGPATH];
817 char *filename;
818
819 /* We are only interested in normal files */
820 if (member->is_directory || member->is_link)
821 return false;
822
823 if (strlen(member->pathname) < XLOG_FNAME_LEN)
824 return false;
825
826 /*
827 * For a correct comparison, we must remove any '.' or '..' components
828 * from the member pathname. Similar to member_verify_header(), we prepend
829 * './' to the path so that canonicalize_path() can properly resolve and
830 * strip these references from the tar member name.
831 */
832 snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
833 canonicalize_path(pathname);
834 pathlen = strlen(pathname);
835
836 /* Skip files in subdirectories other than pg_wal/ */
837 if (pathlen > XLOG_FNAME_LEN &&
838 strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
839 return false;
840
841 /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
842 filename = pathname + (pathlen - XLOG_FNAME_LEN);
844 return false;
845
847
848 return true;
849}
char * pnstrdup(const char *in, Size len)
Definition mcxt.c:1792
static char * filename
Definition pg_dumpall.c:133
void canonicalize_path(char *path)
Definition path.c:337
static bool IsXLogFileName(const char *fname)
#define XLOGDIR
#define XLOG_FNAME_LEN

References canonicalize_path(), fb(), filename, astreamer_member::is_directory, astreamer_member::is_link, IsXLogFileName(), MAXPGPATH, astreamer_member::pathname, pnstrdup(), snprintf, XLOG_FNAME_LEN, and XLOGDIR.

Referenced by astreamer_waldump_content().

◆ perform_tmp_write()

static void perform_tmp_write ( const char * fname,
StringInfo  buf,
FILE * file 
)
static

Definition at line 653 of file archive_waldump.c.

654{
655 Assert(file);
656
657 errno = 0;
658 if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
659 {
660 /*
661 * If write didn't set errno, assume problem is no disk space
662 */
663 if (errno == 0)
664 errno = ENOSPC;
665 pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
666 }
667}
static char buf[DEFAULT_XLOG_SEG_SIZE]

References Assert, buf, fb(), pg_fatal, and TmpWalSegDir.

Referenced by get_archive_wal_entry().

◆ prepare_tmp_write()

static FILE * prepare_tmp_write ( const char * fname,
XLogDumpPrivate * privateInfo 
)
static

Definition at line 616 of file archive_waldump.c.

617{
618 char fpath[MAXPGPATH];
619 FILE *file;
620
621 /*
622 * Setup temporary directory to store WAL segments and set up an exit
623 * callback to remove it upon completion if not already.
624 */
625 if (unlikely(TmpWalSegDir == NULL))
626 {
627 setup_tmpwal_dir(privateInfo->archive_dir);
629 }
630
631 snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
632
633 /* Open the spill file for writing */
634 file = fopen(fpath, PG_BINARY_W);
635 if (file == NULL)
636 pg_fatal("could not create file \"%s\": %m", fpath);
637
638#ifndef WIN32
640 pg_fatal("could not set permissions on file \"%s\": %m",
641 fpath);
642#endif
643
644 pg_log_debug("spilling to temporary file \"%s\"", fpath);
645
646 return file;
647}
static void cleanup_tmpwal_dir_atexit(void)
static void setup_tmpwal_dir(const char *waldir)
#define unlikely(x)
Definition c.h:432
#define PG_BINARY_W
Definition c.h:1379
int pg_file_create_mode
Definition file_perm.c:19

References XLogDumpPrivate::archive_dir, cleanup_tmpwal_dir_atexit(), fb(), MAXPGPATH, PG_BINARY_W, pg_fatal, pg_file_create_mode, pg_log_debug, setup_tmpwal_dir(), snprintf, TmpWalSegDir, and unlikely.

Referenced by get_archive_wal_entry().

◆ read_archive_file()

static int read_archive_file ( XLogDumpPrivate * privateInfo,
Size  count 
)
static

Definition at line 545 of file archive_waldump.c.

546{
547 int rc;
548
549 /* The read request must not exceed the allocated buffer size. */
550 Assert(privateInfo->archive_read_buf_size >= count);
551
552 rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
553 if (rc < 0)
554 pg_fatal("could not read file \"%s\": %m",
555 privateInfo->archive_name);
556
557 /*
558 * Decompress (if required), and then parse the previously read contents
559 * of the tar file.
560 */
561 if (rc > 0)
562 astreamer_content(privateInfo->archive_streamer, NULL,
563 privateInfo->archive_read_buf, rc,
564 ASTREAMER_UNKNOWN);
565
566 return rc;
567}
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:135
#define read(a, b, c)
Definition win32.h:13

References XLogDumpPrivate::archive_fd, XLogDumpPrivate::archive_name, XLogDumpPrivate::archive_read_buf, XLogDumpPrivate::archive_streamer, Assert, astreamer_content(), ASTREAMER_UNKNOWN, fb(), pg_fatal, and read.

Referenced by get_archive_wal_entry(), init_archive_reader(), and read_archive_wal_page().

◆ read_archive_wal_page()

int read_archive_wal_page ( XLogDumpPrivate * privateInfo,
XLogRecPtr  targetPagePtr,
Size  count,
char * readBuff 
)

Definition at line 281 of file archive_waldump.c.

283{
284 char *p = readBuff;
285 Size nbytes = count;
287 int segsize = privateInfo->segsize;
288 XLogSegNo segno;
289 char fname[MAXFNAMELEN];
290 ArchivedWALFile *entry;
291
292 /* Identify the segment and locate its entry in the archive hash */
293 XLByteToSeg(targetPagePtr, segno, segsize);
294 XLogFileName(fname, privateInfo->timeline, segno, segsize);
295 entry = get_archive_wal_entry(fname, privateInfo);
296
297 while (nbytes > 0)
298 {
299 char *buf = entry->buf->data;
300 int bufLen = entry->buf->len;
303
304 /*
305 * Calculate the LSN range currently residing in the buffer.
306 *
307 * read_len tracks total bytes received for this segment (including
308 * already-discarded data), so endPtr is the LSN just past the last
309 * buffered byte, and startPtr is the LSN of the first buffered byte.
310 */
311 XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
313
314 /*
315 * Copy the requested WAL record if it exists in the buffer.
316 */
317 if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
318 {
319 int copyBytes;
320 int offset = recptr - startPtr;
321
322 /*
323 * Given startPtr <= recptr < endPtr and a total buffer size
324 * 'bufLen', the offset (recptr - startPtr) will always be less
325 * than 'bufLen'.
326 */
327 Assert(offset < bufLen);
328
329 copyBytes = Min(nbytes, bufLen - offset);
330 memcpy(p, buf + offset, copyBytes);
331
332 /* Update state for read */
333 recptr += copyBytes;
334 nbytes -= copyBytes;
335 p += copyBytes;
336 }
337 else
338 {
339 /*
340 * Before starting the actual decoding loop, pg_waldump tries to
341 * locate the first valid record from the user-specified start
342 * position, which might not be the start of a WAL record and
343 * could fall in the middle of a record that spans multiple pages.
344 * Consequently, the valid start position the decoder is looking
345 * for could be far away from that initial position.
346 *
347 * This may involve reading across multiple pages, and this
348 * pre-reading fetches data in multiple rounds from the archive
349 * streamer; normally, we would throw away existing buffer
350 * contents to fetch the next set of data, but that existing data
351 * might be needed once the main loop starts. Because previously
352 * read data cannot be re-read by the archive streamer, we delay
353 * resetting the buffer until the main decoding loop is entered.
354 *
355 * Once pg_waldump has entered the main loop, it may re-read the
356 * currently active page, but never an older one; therefore, any
357 * fully consumed WAL data preceding the current page can then be
358 * safely discarded.
359 */
360 if (privateInfo->decoding_started)
361 {
362 resetStringInfo(entry->buf);
363
364 /*
365 * Push back the partial page data for the current page to the
366 * buffer, ensuring a full page remains available for
367 * re-reading if requested.
368 */
369 if (p > readBuff)
370 {
371 Assert((count - nbytes) > 0);
372 appendBinaryStringInfo(entry->buf, readBuff, count - nbytes);
373 }
374 }
375
376 /*
377 * Now, fetch more data. Raise an error if the archive streamer
378 * has moved past our segment (meaning the WAL file in the archive
379 * is shorter than expected) or if reading the archive reached
380 * EOF.
381 */
382 if (privateInfo->cur_file != entry)
383 pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %lld of %lld bytes",
384 fname, privateInfo->archive_name,
385 (long long int) (count - nbytes),
386 (long long int) count);
387 if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
388 pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
389 privateInfo->archive_name, fname,
390 (long long int) (count - nbytes),
391 (long long int) count);
392 }
393 }
394
395 /*
396 * Should have successfully read all the requested bytes or reported a
397 * failure before this point.
398 */
399 Assert(nbytes == 0);
400
401 /*
402 * Return count unchanged; the caller expects this convention, matching
403 * the routine that reads WAL pages from physical files.
404 */
405 return count;
406}
static ArchivedWALFile * get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
#define Min(x, y)
Definition c.h:1093
size_t Size
Definition c.h:691
bool decoding_started
Definition pg_waldump.h:32
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define MAXFNAMELEN
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
uint64 XLogRecPtr
Definition xlogdefs.h:21

References appendBinaryStringInfo(), XLogDumpPrivate::archive_name, Assert, buf, ArchivedWALFile::buf, XLogDumpPrivate::cur_file, StringInfoData::data, XLogDumpPrivate::decoding_started, fb(), get_archive_wal_entry(), StringInfoData::len, MAXFNAMELEN, Min, pg_fatal, read_archive_file(), READ_CHUNK_SIZE, ArchivedWALFile::read_len, resetStringInfo(), XLogDumpPrivate::segsize, XLogDumpPrivate::timeline, XLByteToSeg, XLogFileName(), and XLogSegNoOffsetToRecPtr.

Referenced by TarWALDumpReadPage().

◆ setup_tmpwal_dir()

static void setup_tmpwal_dir ( const char * waldir)
static

Definition at line 573 of file archive_waldump.c.

574{
575 const char *tmpdir = getenv("TMPDIR");
576 char *template;
577
579
580 /*
581 * Use the directory specified by the TMPDIR environment variable. If it's
582 * not set, fall back to the provided WAL directory to store WAL files
583 * temporarily.
584 */
585 template = psprintf("%s/waldump_tmp-XXXXXX",
586 tmpdir ? tmpdir : waldir);
587 TmpWalSegDir = mkdtemp(template);
588
589 if (TmpWalSegDir == NULL)
590 pg_fatal("could not create directory \"%s\": %m", template);
591
593
594 pg_log_debug("created directory \"%s\"", TmpWalSegDir);
595}
char * mkdtemp(char *path)
Definition mkdtemp.c:286
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References Assert, canonicalize_path(), fb(), mkdtemp(), pg_fatal, pg_log_debug, psprintf(), and TmpWalSegDir.

Referenced by prepare_tmp_write().

Variable Documentation

◆ astreamer_waldump_ops

const astreamer_ops astreamer_waldump_ops
static
Initial value:
= {
}
static void astreamer_waldump_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_waldump_free(astreamer *streamer)
static void astreamer_waldump_finalize(astreamer *streamer)

Definition at line 111 of file archive_waldump.c.

111 {
112 .content = astreamer_waldump_content,
113 .finalize = astreamer_waldump_finalize,
114 .free = astreamer_waldump_free
115};

Referenced by astreamer_waldump_new().

◆ TmpWalSegDir