PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsummarizer.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * walsummarizer.c
4 *
5 * Background process to perform WAL summarization, if it is enabled.
6 * It continuously scans the write-ahead log and periodically emits a
7 * summary file which indicates which blocks in which relation forks
8 * were modified by WAL records in the LSN range covered by the summary
9 * file. See walsummary.c and blkreftable.c for more details on the
10 * naming and contents of WAL summary files.
11 *
12 * If configured to do so, this background process will also remove WAL
13 * summary files when the file timestamp is older than a configurable
14 * threshold (but only if the WAL has been removed first).
15 *
16 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
17 *
18 * IDENTIFICATION
19 * src/backend/postmaster/walsummarizer.c
20 *
21 *-------------------------------------------------------------------------
22 */
23#include "postgres.h"
24
25#include "access/timeline.h"
26#include "access/xlog.h"
28#include "access/xlogrecovery.h"
29#include "access/xlogutils.h"
30#include "backup/walsummary.h"
33#include "common/blkreftable.h"
34#include "libpq/pqsignal.h"
35#include "miscadmin.h"
36#include "pgstat.h"
41#include "storage/aio_subsys.h"
42#include "storage/fd.h"
43#include "storage/ipc.h"
44#include "storage/latch.h"
45#include "storage/lwlock.h"
46#include "storage/proc.h"
47#include "storage/procsignal.h"
48#include "storage/shmem.h"
49#include "utils/guc.h"
50#include "utils/memutils.h"
51#include "utils/wait_event.h"
52
53/*
54 * Data in shared memory related to WAL summarization.
55 */
56typedef struct
57{
58 /*
59 * These fields are protected by WALSummarizerLock.
60 *
61 * Until we've discovered what summary files already exist on disk and
62 * stored that information in shared memory, initialized is false and the
63 * other fields here contain no meaningful information. After that has
64 * been done, initialized is true.
65 *
66 * summarized_tli and summarized_lsn indicate the TLI and LSN at
67 * which the next summary file will start. Normally, these are the LSN and
68 * TLI at which the last file ended; in such case, lsn_is_exact is true.
69 * If, however, the LSN is just an approximation, then lsn_is_exact is
70 * false. This can happen if, for example, there are no existing WAL
71 * summary files at startup. In that case, we have to derive the position
72 * at which to start summarizing from the WAL files that exist on disk,
73 * and so the LSN might point to the start of the next file even though
74 * that might happen to be in the middle of a WAL record.
75 *
76 * summarizer_pgprocno is the proc number of the summarizer process, if
77 * one is running, or else INVALID_PROC_NUMBER.
78 *
79 * pending_lsn is used by the summarizer to advertise the ending LSN of a
80 * record it has recently read. It shouldn't ever be less than
81 * summarized_lsn, but might be greater, because the summarizer buffers
82 * data for a range of LSNs in memory before writing out a new file.
83 */
90
91 /*
92 * This field handles its own synchronization.
93 */
96
97/*
98 * Private data for our xlogreader's page read callback.
99 */
100typedef struct
101{
107
108/* Pointer to shared memory state. */
110
111/*
112 * When we reach end of WAL and need to read more, we sleep for a number of
113 * milliseconds that is an integer multiple of MS_PER_SLEEP_QUANTUM. This is
114 * the multiplier. It should vary between 1 and MAX_SLEEP_QUANTA, depending
115 * on system activity. See summarizer_wait_for_wal() for how we adjust this.
116 */
117static long sleep_quanta = 1;
118
119/*
120 * The sleep time will always be a multiple of 200ms and will not exceed
121 * thirty seconds (150 * 200 = 30 * 1000). Note that the timeout here needs
122 * to be substantially less than the maximum amount of time for which an
123 * incremental backup will wait for this process to catch up. Otherwise, an
124 * incremental backup might time out on an idle system just because we sleep
125 * for too long.
126 */
127#define MAX_SLEEP_QUANTA 150
128#define MS_PER_SLEEP_QUANTUM 200
129
130/*
131 * This is a count of the number of pages of WAL that we've read since the
132 * last time we waited for more WAL to appear.
133 */
135
136/*
137 * Most recent RedoRecPtr value observed by MaybeRemoveOldWalSummaries.
138 */
140
141/*
142 * GUC parameters
143 */
144bool summarize_wal = false;
146
147static void WalSummarizerShutdown(int code, Datum arg);
149static void ProcessWalSummarizerInterrupts(void);
150static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn,
151 bool exact, XLogRecPtr switch_lsn,
152 XLogRecPtr maximum_lsn);
154 BlockRefTable *brtab);
156 BlockRefTable *brtab);
158 BlockRefTable *brtab);
160 bool *new_fast_forward);
162 XLogRecPtr targetPagePtr,
163 int reqLen,
164 XLogRecPtr targetRecPtr,
165 char *cur_page);
166static void summarizer_wait_for_wal(void);
167static void MaybeRemoveOldWalSummaries(void);
168
169/*
170 * Amount of shared memory required for this module.
 *
 * Returns sizeof(WalSummarizerData); no additional space is requested
 * beyond that single structure.
 *
 * NOTE(review): the line carrying the function name is not visible in this
 * listing; presumably this is WalSummarizerShmemSize(), whose result is
 * passed to ShmemInitStruct() by the shared-memory init routine -- confirm.
171 */
172Size
174{
175 return sizeof(WalSummarizerData);
176}
177
178/*
179 * Create or attach to shared memory segment for this module.
180 */
181void
183{
184 bool found;
185
187 ShmemInitStruct("Wal Summarizer Ctl", WalSummarizerShmemSize(),
188 &found);
189
190 if (!found)
191 {
192 /*
193 * First time through, so initialize.
194 *
195 * We're just filling in dummy values here -- the real initialization
196 * will happen when GetOldestUnsummarizedLSN() is called for the first
197 * time.
198 */
206 }
207}
208
209/*
210 * Entry point for walsummarizer process.
211 */
212void
213WalSummarizerMain(const void *startup_data, size_t startup_data_len)
214{
215 sigjmp_buf local_sigjmp_buf;
216 MemoryContext context;
217
218 /*
219 * Within this function, 'current_lsn' and 'current_tli' refer to the
220 * point from which the next WAL summary file should start. 'exact' is
221 * true if 'current_lsn' is known to be the start of a WAL record or WAL
222 * segment, and false if it might be in the middle of a record someplace.
223 *
224 * 'switch_lsn' and 'switch_tli', if set, are the LSN at which we need to
225 * switch to a new timeline and the timeline to which we need to switch.
226 * If not set, we either haven't figured out the answers yet or we're
227 * already on the latest timeline.
228 */
229 XLogRecPtr current_lsn;
230 TimeLineID current_tli;
231 bool exact;
232 XLogRecPtr switch_lsn = InvalidXLogRecPtr;
233 TimeLineID switch_tli = 0;
234
235 Assert(startup_data_len == 0);
236
239
241 (errmsg_internal("WAL summarizer started")));
242
243 /*
244 * Properly accept or ignore signals the postmaster might send us
245 *
246 * We have no particular use for SIGINT at the moment, but it seems
247 * reasonable to treat it like SIGTERM.
248 */
252 /* SIGQUIT handler was already set up by InitPostmasterChild */
253 pqsignal(SIGALRM, SIG_IGN);
254 pqsignal(SIGPIPE, SIG_IGN);
256 pqsignal(SIGUSR2, SIG_IGN); /* not used */
257
258 /* Advertise ourselves. */
260 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
262 LWLockRelease(WALSummarizerLock);
263
264 /* Create and switch to a memory context that we can reset on error. */
266 "Wal Summarizer",
268 MemoryContextSwitchTo(context);
269
270 /*
271 * Reset some signals that are accepted by postmaster but not here
272 */
273 pqsignal(SIGCHLD, SIG_DFL);
274
275 /*
276 * If an exception is encountered, processing resumes here.
277 */
278 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
279 {
280 /* Since not using PG_TRY, must reset error stack by hand */
281 error_context_stack = NULL;
282
283 /* Prevent interrupts while cleaning up */
285
286 /* Report the error to the server log */
288
289 /* Release resources we might have acquired. */
295 AtEOXact_Files(false);
296 AtEOXact_HashTables(false);
297
298 /*
299 * Now return to normal top-level context and clear ErrorContext for
300 * next time.
301 */
302 MemoryContextSwitchTo(context);
304
305 /* Flush any leaked data in the top-level context */
306 MemoryContextReset(context);
307
308 /* Now we can allow interrupts again */
310
311 /*
312 * Sleep for 10 seconds before attempting to resume operations in
313 * order to avoid excessive logging.
314 *
315 * Many of the likely error conditions are things that will repeat
316 * every time. For example, if the WAL can't be read or the summary
317 * can't be written, only administrator action will cure the problem.
318 * So a really fast retry time doesn't seem to be especially
319 * beneficial, and it will clutter the logs.
320 */
321 (void) WaitLatch(NULL,
323 10000,
324 WAIT_EVENT_WAL_SUMMARIZER_ERROR);
325 }
326
327 /* We can now handle ereport(ERROR) */
328 PG_exception_stack = &local_sigjmp_buf;
329
330 /*
331 * Unblock signals (they were blocked when the postmaster forked us)
332 */
333 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
334
335 /*
336 * Fetch information about previous progress from shared memory, and ask
337 * GetOldestUnsummarizedLSN to reset pending_lsn to summarized_lsn. We
338 * might be recovering from an error, and if so, pending_lsn might have
339 * advanced past summarized_lsn, but any WAL we read previously has been
340 * lost and will need to be reread.
341 *
342 * If we discover that WAL summarization is not enabled, just exit.
343 */
344 current_lsn = GetOldestUnsummarizedLSN(&current_tli, &exact);
345 if (XLogRecPtrIsInvalid(current_lsn))
346 proc_exit(0);
347
348 /*
349 * Loop forever
350 */
351 for (;;)
352 {
353 XLogRecPtr latest_lsn;
354 TimeLineID latest_tli;
355 XLogRecPtr end_of_summary_lsn;
356
357 /* Flush any leaked data in the top-level context */
358 MemoryContextReset(context);
359
360 /* Process any signals received recently. */
362
363 /* If it's time to remove any old WAL summaries, do that now. */
365
366 /* Find the LSN and TLI up to which we can safely summarize. */
367 latest_lsn = GetLatestLSN(&latest_tli);
368
369 /*
370 * If we're summarizing a historic timeline and we haven't yet
371 * computed the point at which to switch to the next timeline, do that
372 * now.
373 *
374 * Note that if this is a standby, what was previously the current
375 * timeline could become historic at any time.
376 *
377 * We could try to make this more efficient by caching the results of
378 * readTimeLineHistory when latest_tli has not changed, but since we
379 * only have to do this once per timeline switch, we probably wouldn't
380 * save any significant amount of work in practice.
381 */
382 if (current_tli != latest_tli && XLogRecPtrIsInvalid(switch_lsn))
383 {
384 List *tles = readTimeLineHistory(latest_tli);
385
386 switch_lsn = tliSwitchPoint(current_tli, tles, &switch_tli);
388 errmsg_internal("switch point from TLI %u to TLI %u is at %X/%X",
389 current_tli, switch_tli, LSN_FORMAT_ARGS(switch_lsn)));
390 }
391
392 /*
393 * If we've reached the switch LSN, we can't summarize anything else
394 * on this timeline. Switch to the next timeline and go around again,
395 * backing up to the exact switch point if we passed it.
396 */
397 if (!XLogRecPtrIsInvalid(switch_lsn) && current_lsn >= switch_lsn)
398 {
399 /* Restart summarization from switch point. */
400 current_tli = switch_tli;
401 current_lsn = switch_lsn;
402
403 /* Next timeline and switch point, if any, not yet known. */
404 switch_lsn = InvalidXLogRecPtr;
405 switch_tli = 0;
406
407 /* Update (really, rewind, if needed) state in shared memory. */
408 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
409 WalSummarizerCtl->summarized_lsn = current_lsn;
410 WalSummarizerCtl->summarized_tli = current_tli;
412 WalSummarizerCtl->pending_lsn = current_lsn;
413 LWLockRelease(WALSummarizerLock);
414
415 continue;
416 }
417
418 /* Summarize WAL. */
419 end_of_summary_lsn = SummarizeWAL(current_tli,
420 current_lsn, exact,
421 switch_lsn, latest_lsn);
422 Assert(!XLogRecPtrIsInvalid(end_of_summary_lsn));
423 Assert(end_of_summary_lsn >= current_lsn);
424
425 /*
426 * Update state for next loop iteration.
427 *
428 * Next summary file should start from exactly where this one ended.
429 */
430 current_lsn = end_of_summary_lsn;
431 exact = true;
432
433 /* Update state in shared memory. */
434 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
435 WalSummarizerCtl->summarized_lsn = end_of_summary_lsn;
436 WalSummarizerCtl->summarized_tli = current_tli;
438 WalSummarizerCtl->pending_lsn = end_of_summary_lsn;
439 LWLockRelease(WALSummarizerLock);
440
441 /* Wake up anyone waiting for more summary files to be written. */
443 }
444}
445
446/*
447 * Get information about the state of the WAL summarizer.
 *
 * All four arguments are output parameters and are always written.
 * WALSummarizerLock is held in shared mode while shared memory is read.
 *
 * summarized_tli, summarized_lsn: copied from shared memory, or 0 and
 * InvalidXLogRecPtr when the shared state has not been initialized yet.
 *
 * pending_lsn: the shared pending_lsn while a summarizer is registered;
 * otherwise the same value as summarized_lsn (InvalidXLogRecPtr when the
 * shared state is uninitialized).
 *
 * summarizer_pid: the PID found in the registered summarizer's PGPROC
 * entry, or -1 if no summarizer is registered, the shared state is
 * uninitialized, or the stored PID is not positive.
448 */
449void
450GetWalSummarizerState(TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn,
451 XLogRecPtr *pending_lsn, int *summarizer_pid)
452{
453 LWLockAcquire(WALSummarizerLock, LW_SHARED);
 /*
  * NOTE(review): the condition guarding the following branch is not
  * visible in this listing; judging by the comment inside it, it tests
  * that the shared state is not yet initialized -- confirm.
  */
455 {
456 /*
457 * If initialized is false, the rest of the structure contents are
458 * undefined.
459 */
460 *summarized_tli = 0;
461 *summarized_lsn = InvalidXLogRecPtr;
462 *pending_lsn = InvalidXLogRecPtr;
463 *summarizer_pid = -1;
464 }
465 else
466 {
 /* Read the proc number once so both uses below see the same value. */
467 int summarizer_pgprocno = WalSummarizerCtl->summarizer_pgprocno;
468
469 *summarized_tli = WalSummarizerCtl->summarized_tli;
470 *summarized_lsn = WalSummarizerCtl->summarized_lsn;
471 if (summarizer_pgprocno == INVALID_PROC_NUMBER)
472 {
473 /*
474 * If the summarizer has exited, the fact that it had processed
475 * beyond summarized_lsn is irrelevant now.
476 */
477 *pending_lsn = WalSummarizerCtl->summarized_lsn;
478 *summarizer_pid = -1;
479 }
480 else
481 {
482 *pending_lsn = WalSummarizerCtl->pending_lsn;
483
484 /*
485 * We're not fussed about inexact answers here, since they could
486 * become stale instantly, so we don't bother taking the lock, but
487 * make sure that invalid PID values are normalized to -1.
488 */
489 *summarizer_pid = GetPGProcByNumber(summarizer_pgprocno)->pid;
490 if (*summarizer_pid <= 0)
491 *summarizer_pid = -1;
492 }
493 }
494 LWLockRelease(WALSummarizerLock);
495}
496
497/*
498 * Get the oldest LSN in this server's timeline history that has not yet been
499 * summarized, and update shared memory state as appropriate.
500 *
501 * If tli != NULL, *tli will be set to the TLI for the LSN that is returned.
502 *
503 * If lsn_is_exact != NULL, *lsn_is_exact will be set to true if the returned
504 * LSN is necessarily the start of a WAL record and false if it's just the
505 * beginning of a WAL segment.
506 */
508GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact)
509{
510 TimeLineID latest_tli;
511 int n;
512 List *tles;
513 XLogRecPtr unsummarized_lsn = InvalidXLogRecPtr;
514 TimeLineID unsummarized_tli = 0;
515 bool should_make_exact = false;
516 List *existing_summaries;
517 ListCell *lc;
518 bool am_wal_summarizer = AmWalSummarizerProcess();
519
520 /* If not summarizing WAL, do nothing. */
521 if (!summarize_wal)
522 return InvalidXLogRecPtr;
523
524 /*
525 * If we are not the WAL summarizer process, then we normally just want to
526 * read the values from shared memory. However, as an exception, if shared
527 * memory hasn't been initialized yet, then we need to do that so that we
528 * can read legal values and not remove any WAL too early.
529 */
530 if (!am_wal_summarizer)
531 {
532 LWLockAcquire(WALSummarizerLock, LW_SHARED);
533
535 {
536 unsummarized_lsn = WalSummarizerCtl->summarized_lsn;
537 if (tli != NULL)
539 if (lsn_is_exact != NULL)
540 *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
541 LWLockRelease(WALSummarizerLock);
542 return unsummarized_lsn;
543 }
544
545 LWLockRelease(WALSummarizerLock);
546 }
547
548 /*
549 * Find the oldest timeline on which WAL still exists, and the earliest
550 * segment for which it exists.
551 *
552 * Note that we do this every time the WAL summarizer process restarts or
553 * recovers from an error, in case the contents of pg_wal have changed
554 * under us e.g. if some files were removed, either manually - which
555 * shouldn't really happen, but might - or by postgres itself, if
556 * summarize_wal was turned off and then back on again.
557 */
558 (void) GetLatestLSN(&latest_tli);
559 tles = readTimeLineHistory(latest_tli);
560 for (n = list_length(tles) - 1; n >= 0; --n)
561 {
562 TimeLineHistoryEntry *tle = list_nth(tles, n);
563 XLogSegNo oldest_segno;
564
565 oldest_segno = XLogGetOldestSegno(tle->tli);
566 if (oldest_segno != 0)
567 {
568 /* Compute oldest LSN that still exists on disk. */
570 unsummarized_lsn);
571
572 unsummarized_tli = tle->tli;
573 break;
574 }
575 }
576
577 /*
578 * Don't try to summarize anything older than the end LSN of the newest
579 * summary file that exists for this timeline.
580 */
581 existing_summaries =
582 GetWalSummaries(unsummarized_tli,
584 foreach(lc, existing_summaries)
585 {
586 WalSummaryFile *ws = lfirst(lc);
587
588 if (ws->end_lsn > unsummarized_lsn)
589 {
590 unsummarized_lsn = ws->end_lsn;
591 should_make_exact = true;
592 }
593 }
594
595 /* It really should not be possible for us to find no WAL. */
596 if (unsummarized_tli == 0)
598 errcode(ERRCODE_INTERNAL_ERROR),
599 errmsg_internal("no WAL found on timeline %u", latest_tli));
600
601 /*
602 * If we're the WAL summarizer, we always want to store the values we just
603 * computed into shared memory, because those are the values we're going
604 * to use to drive our operation, and so they are the authoritative
605 * values. Otherwise, we only store values into shared memory if shared
606 * memory is uninitialized. Our values are not canonical in such a case,
607 * but it's better to have something than nothing, to guide WAL retention.
608 */
609 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
610 if (am_wal_summarizer || !WalSummarizerCtl->initialized)
611 {
613 WalSummarizerCtl->summarized_lsn = unsummarized_lsn;
614 WalSummarizerCtl->summarized_tli = unsummarized_tli;
615 WalSummarizerCtl->lsn_is_exact = should_make_exact;
616 WalSummarizerCtl->pending_lsn = unsummarized_lsn;
617 }
618 else
619 unsummarized_lsn = WalSummarizerCtl->summarized_lsn;
620
621 /* Also return the values to the caller as required. */
622 if (tli != NULL)
624 if (lsn_is_exact != NULL)
625 *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
626 LWLockRelease(WALSummarizerLock);
627
628 return unsummarized_lsn;
629}
630
631/*
632 * Wake up the WAL summarizer process.
633 *
634 * This might not work, because there's no guarantee that the WAL summarizer
635 * process was successfully started, and it also might have started but
636 * subsequently terminated. So, under normal circumstances, this will get the
637 * latch set, but there's no guarantee.
638 */
639void
641{
642 ProcNumber pgprocno;
643
644 if (WalSummarizerCtl == NULL)
645 return;
646
647 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
649 LWLockRelease(WALSummarizerLock);
650
651 if (pgprocno != INVALID_PROC_NUMBER)
653}
654
655/*
656 * Wait until WAL summarization reaches the given LSN, but time out with an
657 * error if the summarizer seems to be stuck.
658 *
659 * Returns immediately if summarize_wal is turned off while we wait. Caller
660 * is expected to handle this case, if necessary.
661 */
662void
664{
665 TimestampTz initial_time,
666 cycle_time,
667 current_time;
668 XLogRecPtr prior_pending_lsn = InvalidXLogRecPtr;
669 int deadcycles = 0;
670
671 initial_time = cycle_time = GetCurrentTimestamp();
672
673 while (1)
674 {
675 long timeout_in_ms = 10000;
676 XLogRecPtr summarized_lsn;
677 XLogRecPtr pending_lsn;
678
680
681 /* If WAL summarization is disabled while we're waiting, give up. */
682 if (!summarize_wal)
683 return;
684
685 /*
686 * If the LSN summarized on disk has reached the target value, stop.
687 */
688 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
689 summarized_lsn = WalSummarizerCtl->summarized_lsn;
690 pending_lsn = WalSummarizerCtl->pending_lsn;
691 LWLockRelease(WALSummarizerLock);
692
693 /* If WAL summarization has progressed sufficiently, stop waiting. */
694 if (summarized_lsn >= lsn)
695 break;
696
697 /* Recheck current time. */
698 current_time = GetCurrentTimestamp();
699
700 /* Have we finished the current cycle of waiting? */
701 if (TimestampDifferenceMilliseconds(cycle_time,
702 current_time) >= timeout_in_ms)
703 {
704 long elapsed_seconds;
705
706 /* Begin new wait cycle. */
707 cycle_time = TimestampTzPlusMilliseconds(cycle_time,
708 timeout_in_ms);
709
710 /*
711 * Keep track of the number of cycles during which there has been
712 * no progression of pending_lsn. If pending_lsn is not advancing,
713 * that means that not only are no new files appearing on disk,
714 * but we're not even incorporating new records into the in-memory
715 * state.
716 */
717 if (pending_lsn > prior_pending_lsn)
718 {
719 prior_pending_lsn = pending_lsn;
720 deadcycles = 0;
721 }
722 else
723 ++deadcycles;
724
725 /*
726 * If we've managed to wait for an entire minute without the WAL
727 * summarizer absorbing a single WAL record, error out; probably
728 * something is wrong.
729 *
730 * We could consider also erroring out if the summarizer is taking
731 * too long to catch up, but it's not clear what rate of progress
732 * would be acceptable and what would be too slow. So instead, we
733 * just try to error out in the case where there's no progress at
734 * all. That seems likely to catch a reasonable number of the
735 * things that can go wrong in practice (e.g. the summarizer
736 * process is completely hung, say because somebody hooked up a
737 * debugger to it or something) without giving up too quickly when
738 * the system is just slow.
739 */
740 if (deadcycles >= 6)
742 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
743 errmsg("WAL summarization is not progressing"),
744 errdetail("Summarization is needed through %X/%X, but is stuck at %X/%X on disk and %X/%X in memory.",
745 LSN_FORMAT_ARGS(lsn),
746 LSN_FORMAT_ARGS(summarized_lsn),
747 LSN_FORMAT_ARGS(pending_lsn))));
748
749
750 /*
751 * Otherwise, just let the user know what's happening.
752 */
753 elapsed_seconds =
755 current_time) / 1000;
757 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
758 errmsg_plural("still waiting for WAL summarization through %X/%X after %ld second",
759 "still waiting for WAL summarization through %X/%X after %ld seconds",
760 elapsed_seconds,
761 LSN_FORMAT_ARGS(lsn),
762 elapsed_seconds),
763 errdetail("Summarization has reached %X/%X on disk and %X/%X in memory.",
764 LSN_FORMAT_ARGS(summarized_lsn),
765 LSN_FORMAT_ARGS(pending_lsn))));
766 }
767
768 /*
769 * Align the wait time to prevent drift. This doesn't really matter,
770 * but we'd like the warnings about how long we've been waiting to say
771 * 10 seconds, 20 seconds, 30 seconds, 40 seconds ... without ever
772 * drifting to something that is not a multiple of ten.
773 */
774 timeout_in_ms -=
775 TimestampDifferenceMilliseconds(cycle_time, current_time);
776
777 /* Wait and see. */
779 timeout_in_ms,
780 WAIT_EVENT_WAL_SUMMARY_READY);
781 }
782
784}
785
786/*
787 * On exit, update shared memory to make it clear that we're no longer
788 * running.
789 */
790static void
792{
793 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
795 LWLockRelease(WALSummarizerLock);
796}
797
798/*
799 * Get the latest LSN that is eligible to be summarized, and set *tli to the
800 * corresponding timeline.
 *
 * Three cases are handled:
 *
 * 1. Not in recovery: the result is whatever GetFlushRecPtr(tli) reports,
 *    and that call also fills in *tli.
 *
 * 2. In recovery, but a WAL insertion timeline has already been set: the
 *    result is the replay position, and *tli is the insertion timeline.
 *
 * 3. In recovery otherwise: the result is the larger of the walreceiver
 *    flush position and the replay position, with *tli taken from whichever
 *    one was chosen. When the two are equal, the replay position and its
 *    timeline are used.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not visible in this listing; the body dereferences a 'tli' output pointer
 * unconditionally, so callers presumably must pass a non-NULL pointer.
801 */
802static XLogRecPtr
804{
805 if (!RecoveryInProgress())
806 {
807 /* Don't summarize WAL before it's flushed. */
808 return GetFlushRecPtr(tli);
809 }
810 else
811 {
812 XLogRecPtr flush_lsn;
813 TimeLineID flush_tli;
814 XLogRecPtr replay_lsn;
815 TimeLineID replay_tli;
816 TimeLineID insert_tli;
817
818 /*
819 * After the insert TLI has been set and before the control file has
820 * been updated to show the DB in production, RecoveryInProgress()
821 * will return true, because it's not yet safe for all backends to
822 * begin writing WAL. However, replay has already ceased, so from our
823 * point of view, recovery is already over. We should summarize up to
824 * where replay stopped and then prepare to resume at the start of the
825 * insert timeline.
826 */
827 if ((insert_tli = GetWALInsertionTimeLineIfSet()) != 0)
828 {
829 *tli = insert_tli;
830 return GetXLogReplayRecPtr(NULL);
831 }
832
833 /*
834 * What we really want to know is how much WAL has been flushed to
835 * disk, but the only flush position available is the one provided by
836 * the walreceiver, which may not be running, because this could be
837 * crash recovery or recovery via restore_command. So use either the
838 * WAL receiver's flush position or the replay position, whichever is
839 * further ahead, on the theory that if the WAL has been replayed then
840 * it must also have been flushed to disk.
841 */
842 flush_lsn = GetWalRcvFlushRecPtr(NULL, &flush_tli);
843 replay_lsn = GetXLogReplayRecPtr(&replay_tli);
 /* Strict comparison: on a tie the replay position and timeline win. */
844 if (flush_lsn > replay_lsn)
845 {
846 *tli = flush_tli;
847 return flush_lsn;
848 }
849 else
850 {
851 *tli = replay_tli;
852 return replay_lsn;
853 }
854 }
855}
856
857/*
858 * Interrupt handler for main loop of WAL summarizer process.
859 */
860static void
862{
865
867 {
868 ConfigReloadPending = false;
870 }
871
873 {
875 errmsg_internal("WAL summarizer shutting down"));
876 proc_exit(0);
877 }
878
879 /* Perform logging of memory contexts of this process */
882
883 /* Publish memory contexts of this process */
886}
887
888/*
889 * Summarize a range of WAL records on a single timeline.
890 *
891 * 'tli' is the timeline to be summarized.
892 *
893 * 'start_lsn' is the point at which we should start summarizing. If this
894 * value comes from the end LSN of the previous record as returned by the
895 * xlogreader machinery, 'exact' should be true; otherwise, 'exact' should
896 * be false, and this function will search forward for the start of a valid
897 * WAL record.
898 *
899 * 'switch_lsn' is the point at which we should switch to a later timeline,
900 * if we're summarizing a historic timeline.
901 *
902 * 'maximum_lsn' identifies the point beyond which we can't count on being
903 * able to read any more WAL. It should be the switch point when reading a
904 * historic timeline, or the most-recently-measured end of WAL when reading
905 * the current timeline.
906 *
907 * The return value is the LSN at which the WAL summary actually ends. Most
908 * often, a summary file ends because we notice that a checkpoint has
909 * occurred and reach the redo pointer of that checkpoint, but sometimes
910 * we stop for other reasons, such as a timeline switch.
911 */
912static XLogRecPtr
913SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact,
914 XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
915{
916 SummarizerReadLocalXLogPrivate *private_data;
918 XLogRecPtr summary_start_lsn;
919 XLogRecPtr summary_end_lsn = switch_lsn;
920 char temp_path[MAXPGPATH];
921 char final_path[MAXPGPATH];
922 WalSummaryIO io;
924 bool fast_forward = true;
925
926 /* Initialize private data for xlogreader. */
927 private_data = (SummarizerReadLocalXLogPrivate *)
929 private_data->tli = tli;
930 private_data->historic = !XLogRecPtrIsInvalid(switch_lsn);
931 private_data->read_upto = maximum_lsn;
932
933 /* Create xlogreader. */
936 .segment_open = &wal_segment_open,
937 .segment_close = &wal_segment_close),
938 private_data);
939 if (xlogreader == NULL)
941 (errcode(ERRCODE_OUT_OF_MEMORY),
942 errmsg("out of memory"),
943 errdetail("Failed while allocating a WAL reading processor.")));
944
945 /*
946 * When exact = false, we're starting from an arbitrary point in the WAL
947 * and must search forward for the start of the next record.
948 *
949 * When exact = true, start_lsn should be either the LSN where a record
950 * begins, or the LSN of a page where the page header is immediately
951 * followed by the start of a new record. XLogBeginRead should tolerate
952 * either case.
953 *
954 * We need to allow for both cases because the behavior of xlogreader
955 * varies. When a record spans two or more xlog pages, the ending LSN
956 * reported by xlogreader will be the starting LSN of the following
957 * record, but when an xlog page boundary falls between two records, the
958 * end LSN for the first will be reported as the first byte of the
959 * following page. We can't know until we read that page how large the
960 * header will be, but we'll have to skip over it to find the next record.
961 */
962 if (exact)
963 {
964 /*
965 * Even if start_lsn is the beginning of a page rather than the
966 * beginning of the first record on that page, we should still use it
967 * as the start LSN for the summary file. That's because we detect
968 * missing summary files by looking for cases where the end LSN of one
969 * file is less than the start LSN of the next file. When only a page
970 * header is skipped, nothing has been missed.
971 */
972 XLogBeginRead(xlogreader, start_lsn);
973 summary_start_lsn = start_lsn;
974 }
975 else
976 {
977 summary_start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
978 if (XLogRecPtrIsInvalid(summary_start_lsn))
979 {
980 /*
981 * If we hit end-of-WAL while trying to find the next valid
982 * record, we must be on a historic timeline that has no valid
983 * records that begin after start_lsn and before end of WAL.
984 */
985 if (private_data->end_of_wal)
986 {
988 errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
989 tli,
990 LSN_FORMAT_ARGS(start_lsn),
991 LSN_FORMAT_ARGS(private_data->read_upto)));
992
993 /*
994 * The timeline ends at or after start_lsn, without containing
995 * any records. Thus, we must make sure the main loop does not
996 * iterate. If start_lsn is the end of the timeline, then we
997 * won't actually emit an empty summary file, but otherwise,
998 * we must, to capture the fact that the LSN range in question
999 * contains no interesting WAL records.
1000 */
1001 summary_start_lsn = start_lsn;
1002 summary_end_lsn = private_data->read_upto;
1003 switch_lsn = xlogreader->EndRecPtr;
1004 }
1005 else
1006 ereport(ERROR,
1007 (errmsg("could not find a valid record after %X/%X",
1008 LSN_FORMAT_ARGS(start_lsn))));
1009 }
1010
1011 /* We shouldn't go backward. */
1012 Assert(summary_start_lsn >= start_lsn);
1013 }
1014
1015 /*
1016 * Main loop: read xlog records one by one.
1017 */
1018 while (1)
1019 {
1020 int block_id;
1021 char *errormsg;
1022 XLogRecord *record;
1023 uint8 rmid;
1024
1026
1027 /* We shouldn't go backward. */
1028 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1029
1030 /* Now read the next record. */
1031 record = XLogReadRecord(xlogreader, &errormsg);
1032 if (record == NULL)
1033 {
1034 if (private_data->end_of_wal)
1035 {
1036 /*
1037 * This timeline must be historic and must end before we were
1038 * able to read a complete record.
1039 */
1041 errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
1042 tli,
1044 LSN_FORMAT_ARGS(private_data->read_upto)));
1045 /* Summary ends at end of WAL. */
1046 summary_end_lsn = private_data->read_upto;
1047 break;
1048 }
1049 if (errormsg)
1050 ereport(ERROR,
1052 errmsg("could not read WAL from timeline %u at %X/%X: %s",
1054 errormsg)));
1055 else
1056 ereport(ERROR,
1058 errmsg("could not read WAL from timeline %u at %X/%X",
1060 }
1061
1062 /* We shouldn't go backward. */
1063 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1064
1065 if (!XLogRecPtrIsInvalid(switch_lsn) &&
1066 xlogreader->ReadRecPtr >= switch_lsn)
1067 {
1068 /*
1069 * Whoops! We've read a record that *starts* after the switch LSN,
1070 * contrary to our goal of reading only until we hit the first
1071 * record that ends at or after the switch LSN. Pretend we didn't
1072 * read it after all by bailing out of this loop right here,
1073 * before we do anything with this record.
1074 *
1075 * This can happen because the last record before the switch LSN
1076 * might be continued across multiple pages, and then we might
1077 * come to a page with XLP_FIRST_IS_OVERWRITE_CONTRECORD set. In
1078 * that case, the record that was continued across multiple pages
1079 * is incomplete and will be disregarded, and the read will
1080 * restart from the beginning of the page that is flagged
1081 * XLP_FIRST_IS_OVERWRITE_CONTRECORD.
1082 *
1083 * If this case occurs, we can fairly say that the current summary
1084 * file ends at the switch LSN exactly. The first record on the
1085 * page marked XLP_FIRST_IS_OVERWRITE_CONTRECORD will be
1086 * discovered when generating the next summary file.
1087 */
1088 summary_end_lsn = switch_lsn;
1089 break;
1090 }
1091
1092 /*
1093 * Certain types of records require special handling. Redo points and
1094 * shutdown checkpoints trigger creation of new summary files and can
1095 * also cause us to enter or exit "fast forward" mode. Other types of
1096 * records can require special updates to the block reference table.
1097 */
1098 rmid = XLogRecGetRmid(xlogreader);
1099 if (rmid == RM_XLOG_ID)
1100 {
1101 bool new_fast_forward;
1102
1103 /*
1104 * If we've already processed some WAL records when we hit a redo
1105 * point or shutdown checkpoint, then we stop summarization before
1106 * including this record in the current file, so that it will be
1107 * the first record in the next file.
1108 *
1109 * When we hit one of those record types as the first record in a
1110 * file, we adjust our notion of whether we're fast-forwarding.
1111 * Any WAL generated with wal_level=minimal must be skipped
1112 * without actually generating any summary file, because an
1113 * incremental backup that crosses such WAL would be unsafe.
1114 */
1115 if (SummarizeXlogRecord(xlogreader, &new_fast_forward))
1116 {
1117 if (xlogreader->ReadRecPtr > summary_start_lsn)
1118 {
1119 summary_end_lsn = xlogreader->ReadRecPtr;
1120 break;
1121 }
1122 else
1123 fast_forward = new_fast_forward;
1124 }
1125 }
1126 else if (!fast_forward)
1127 {
1128 /*
1129 * This switch handles record types that require extra updates to
1130 * the contents of the block reference table.
1131 */
1132 switch (rmid)
1133 {
1134 case RM_DBASE_ID:
1136 break;
1137 case RM_SMGR_ID:
1139 break;
1140 case RM_XACT_ID:
1142 break;
1143 }
1144 }
1145
1146 /*
1147 * If we're in fast-forward mode, we don't really need to do anything.
1148 * Otherwise, feed block references from xlog record to block
1149 * reference table.
1150 */
1151 if (!fast_forward)
1152 {
1153 for (block_id = 0; block_id <= XLogRecMaxBlockId(xlogreader);
1154 block_id++)
1155 {
1156 RelFileLocator rlocator;
1157 ForkNumber forknum;
1158 BlockNumber blocknum;
1159
1160 if (!XLogRecGetBlockTagExtended(xlogreader, block_id, &rlocator,
1161 &forknum, &blocknum, NULL))
1162 continue;
1163
1164 /*
1165 * As we do elsewhere, ignore the FSM fork, because it's not
1166 * fully WAL-logged.
1167 */
1168 if (forknum != FSM_FORKNUM)
1169 BlockRefTableMarkBlockModified(brtab, &rlocator, forknum,
1170 blocknum);
1171 }
1172 }
1173
1174 /* Update our notion of where this summary file ends. */
1175 summary_end_lsn = xlogreader->EndRecPtr;
1176
1177 /* Also update shared memory. */
1178 LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
1179 Assert(summary_end_lsn >= WalSummarizerCtl->summarized_lsn);
1180 WalSummarizerCtl->pending_lsn = summary_end_lsn;
1181 LWLockRelease(WALSummarizerLock);
1182
1183 /*
1184 * If we have a switch LSN and have reached it, stop before reading
1185 * the next record.
1186 */
1187 if (!XLogRecPtrIsInvalid(switch_lsn) &&
1188 xlogreader->EndRecPtr >= switch_lsn)
1189 break;
1190 }
1191
1192 /* Destroy xlogreader. */
1195
1196 /*
1197 * If a timeline switch occurs, we may fail to make any progress at all
1198 * before exiting the loop above. If that happens, we don't write a WAL
1199 * summary file at all. We can also skip writing a file if we're in
1200 * fast-forward mode.
1201 */
1202 if (summary_end_lsn > summary_start_lsn && !fast_forward)
1203 {
1204 /* Generate temporary and final path name. */
1205 snprintf(temp_path, MAXPGPATH,
1206 XLOGDIR "/summaries/temp.summary");
1207 snprintf(final_path, MAXPGPATH,
1208 XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
1209 tli,
1210 LSN_FORMAT_ARGS(summary_start_lsn),
1211 LSN_FORMAT_ARGS(summary_end_lsn));
1212
1213 /* Open the temporary file for writing. */
1214 io.filepos = 0;
1215 io.file = PathNameOpenFile(temp_path, O_WRONLY | O_CREAT | O_TRUNC);
1216 if (io.file < 0)
1217 ereport(ERROR,
1219 errmsg("could not create file \"%s\": %m", temp_path)));
1220
1221 /* Write the data. */
1223
1224 /* Close temporary file and shut down xlogreader. */
1225 FileClose(io.file);
1226
1227 /* Tell the user what we did. */
1229 errmsg_internal("summarized WAL on TLI %u from %X/%X to %X/%X",
1230 tli,
1231 LSN_FORMAT_ARGS(summary_start_lsn),
1232 LSN_FORMAT_ARGS(summary_end_lsn)));
1233
1234 /* Durably rename the new summary into place. */
1235 durable_rename(temp_path, final_path, ERROR);
1236 }
1237
1238 /* If we skipped a non-zero amount of WAL, log a debug message. */
1239 if (summary_end_lsn > summary_start_lsn && fast_forward)
1241 errmsg_internal("skipped summarizing WAL on TLI %u from %X/%X to %X/%X",
1242 tli,
1243 LSN_FORMAT_ARGS(summary_start_lsn),
1244 LSN_FORMAT_ARGS(summary_end_lsn)));
1245
1246 return summary_end_lsn;
1247}
1248
1249/*
1250 * Special handling for WAL records with RM_DBASE_ID.
1251 */
1252static void
1254{
1255 uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1256
1257 /*
1258 * We use relfilenode zero for a given database OID and tablespace OID to
1259 * indicate that all relations with that pair of IDs have been recreated
1260 * if they exist at all. Effectively, we're setting a limit block of 0 for
1261 * all such relfilenodes.
1262 *
1263 * Technically, this special handling is only needed in the case of
1264 * XLOG_DBASE_CREATE_FILE_COPY, because that can create a whole bunch of
1265 * relation files in a directory without logging anything specific to each
1266 * one. If we didn't mark the whole DB OID/TS OID combination in some way,
1267 * then a tablespace that was dropped after the reference backup and
1268 * recreated using the FILE_COPY method prior to the incremental backup
1269 * would look just like one that was never touched at all, which would be
1270 * catastrophic.
1271 *
1272 * But it seems best to adopt this treatment for all records that drop or
1273 * create a DB OID/TS OID combination. That's similar to how we treat the
1274 * limit block for individual relations, and it's an extra layer of safety
1275 * here. We can never lose data by marking more stuff as needing to be
1276 * backed up in full.
1277 */
1278 if (info == XLOG_DBASE_CREATE_FILE_COPY)
1279 {
1281 RelFileLocator rlocator;
1282
1283 xlrec =
1285 rlocator.spcOid = xlrec->tablespace_id;
1286 rlocator.dbOid = xlrec->db_id;
1287 rlocator.relNumber = 0;
1288 BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1289 }
1290 else if (info == XLOG_DBASE_CREATE_WAL_LOG)
1291 {
1293 RelFileLocator rlocator;
1294
1296 rlocator.spcOid = xlrec->tablespace_id;
1297 rlocator.dbOid = xlrec->db_id;
1298 rlocator.relNumber = 0;
1299 BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1300 }
1301 else if (info == XLOG_DBASE_DROP)
1302 {
1303 xl_dbase_drop_rec *xlrec;
1304 RelFileLocator rlocator;
1305 int i;
1306
1308 rlocator.dbOid = xlrec->db_id;
1309 rlocator.relNumber = 0;
1310 for (i = 0; i < xlrec->ntablespaces; ++i)
1311 {
1312 rlocator.spcOid = xlrec->tablespace_ids[i];
1313 BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1314 }
1315 }
1316}
1317
1318/*
1319 * Special handling for WAL records with RM_SMGR_ID.
1320 */
1321static void
1323{
1324 uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1325
1326 if (info == XLOG_SMGR_CREATE)
1327 {
1328 xl_smgr_create *xlrec;
1329
1330 /*
1331 * If a new relation fork is created on disk, there is no point
1332 * tracking anything about which blocks have been modified, because
1333 * the whole thing will be new. Hence, set the limit block for this
1334 * fork to 0.
1335 *
1336 * Ignore the FSM fork, which is not fully WAL-logged.
1337 */
1339
1340 if (xlrec->forkNum != FSM_FORKNUM)
1341 BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1342 xlrec->forkNum, 0);
1343 }
1344 else if (info == XLOG_SMGR_TRUNCATE)
1345 {
1346 xl_smgr_truncate *xlrec;
1347
1349
1350 /*
1351 * If a relation fork is truncated on disk, there is no point in
1352 * tracking anything about block modifications beyond the truncation
1353 * point.
1354 *
1355 * We ignore SMGR_TRUNCATE_FSM here because the FSM isn't fully
1356 * WAL-logged and thus we can't track modified blocks for it anyway.
1357 */
1358 if ((xlrec->flags & SMGR_TRUNCATE_HEAP) != 0)
1359 BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1360 MAIN_FORKNUM, xlrec->blkno);
1361 if ((xlrec->flags & SMGR_TRUNCATE_VM) != 0)
1362 BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1364 }
1365}
1366
1367/*
1368 * Special handling for WAL records with RM_XACT_ID.
1369 */
1370static void
1372{
1373 uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1374 uint8 xact_info = info & XLOG_XACT_OPMASK;
1375
1376 if (xact_info == XLOG_XACT_COMMIT ||
1377 xact_info == XLOG_XACT_COMMIT_PREPARED)
1378 {
1380 xl_xact_parsed_commit parsed;
1381 int i;
1382
1383 /*
1384 * Don't track modified blocks for any relations that were removed on
1385 * commit.
1386 */
1388 for (i = 0; i < parsed.nrels; ++i)
1389 {
1390 ForkNumber forknum;
1391
1392 for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
1393 if (forknum != FSM_FORKNUM)
1394 BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
1395 forknum, 0);
1396 }
1397 }
1398 else if (xact_info == XLOG_XACT_ABORT ||
1399 xact_info == XLOG_XACT_ABORT_PREPARED)
1400 {
1402 xl_xact_parsed_abort parsed;
1403 int i;
1404
1405 /*
1406 * Don't track modified blocks for any relations that were removed on
1407 * abort.
1408 */
1409 ParseAbortRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed);
1410 for (i = 0; i < parsed.nrels; ++i)
1411 {
1412 ForkNumber forknum;
1413
1414 for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
1415 if (forknum != FSM_FORKNUM)
1416 BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
1417 forknum, 0);
1418 }
1419 }
1420}
1421
1422/*
1423 * Special handling for WAL records with RM_XLOG_ID.
1424 *
1425 * The return value is true if WAL summarization should stop before this
1426 * record and false otherwise. When the return value is true,
1427 * *new_fast_forward indicates whether future processing should be done
1428 * in fast forward mode (i.e. read WAL without emitting summaries) or not.
1429 */
1430static bool
1432{
1433 uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1434 int record_wal_level;
1435
1436 if (info == XLOG_CHECKPOINT_REDO)
1437 {
1438 /* Payload is wal_level at the time record was written. */
1439 memcpy(&record_wal_level, XLogRecGetData(xlogreader), sizeof(int));
1440 }
1441 else if (info == XLOG_CHECKPOINT_SHUTDOWN)
1442 {
1443 CheckPoint rec_ckpt;
1444
1445 /* Extract wal_level at time record was written from payload. */
1446 memcpy(&rec_ckpt, XLogRecGetData(xlogreader), sizeof(CheckPoint));
1447 record_wal_level = rec_ckpt.wal_level;
1448 }
1449 else if (info == XLOG_PARAMETER_CHANGE)
1450 {
1451 xl_parameter_change xlrec;
1452
1453 /* Extract wal_level at time record was written from payload. */
1454 memcpy(&xlrec, XLogRecGetData(xlogreader),
1455 sizeof(xl_parameter_change));
1456 record_wal_level = xlrec.wal_level;
1457 }
1458 else if (info == XLOG_END_OF_RECOVERY)
1459 {
1460 xl_end_of_recovery xlrec;
1461
1462 /* Extract wal_level at time record was written from payload. */
1463 memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
1464 record_wal_level = xlrec.wal_level;
1465 }
1466 else
1467 {
1468 /* No special handling required. Return false. */
1469 return false;
1470 }
1471
1472 /*
1473 * Redo can only begin at an XLOG_CHECKPOINT_REDO or
1474 * XLOG_CHECKPOINT_SHUTDOWN record, so we want WAL summarization to begin
1475 * at those points. Hence, when those records are encountered, return
1476 * true, so that we stop just before summarizing either of those records.
1477 *
1478 * We also reach here if we just saw XLOG_END_OF_RECOVERY or
1479 * XLOG_PARAMETER_CHANGE. These are not places where recovery can start,
1480 * but they're still relevant here. A new timeline can begin with
1481 * XLOG_END_OF_RECOVERY, so we need to confirm the WAL level at that
1482 * point; and a restart can provoke XLOG_PARAMETER_CHANGE after an
1483 * intervening change to postgresql.conf, which might force us to stop
1484 * summarizing.
1485 */
1486 *new_fast_forward = (record_wal_level == WAL_LEVEL_MINIMAL);
1487 return true;
1488}
1489
1490/*
1491 * Similar to read_local_xlog_page, but limited to read from one particular
1492 * timeline. If the end of WAL is reached, it will wait for more if reading
1493 * from the current timeline, or give up if reading from a historic timeline.
1494 * In the latter case, it will also set private_data->end_of_wal = true.
1495 *
1496 * Caller must set private_data->tli to the TLI of interest,
1497 * private_data->read_upto to the lowest LSN that is not known to be safe
1498 * to read on that timeline, and private_data->historic to true if and only
1499 * if the timeline is not the current timeline. This function will update
1500 * private_data->read_upto and private_data->historic if more WAL appears
1501 * on the current timeline or if the current timeline becomes historic.
1502 */
1503static int
1505 XLogRecPtr targetPagePtr, int reqLen,
1506 XLogRecPtr targetRecPtr, char *cur_page)
1507{
1508 int count;
1509 WALReadError errinfo;
1510 SummarizerReadLocalXLogPrivate *private_data;
1511
1513
1514 private_data = (SummarizerReadLocalXLogPrivate *)
1515 state->private_data;
1516
1517 while (1)
1518 {
1519 if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto)
1520 {
1521 /*
1522 * more than one block available; read only that block, have
1523 * caller come back if they need more.
1524 */
1525 count = XLOG_BLCKSZ;
1526 break;
1527 }
1528 else if (targetPagePtr + reqLen > private_data->read_upto)
1529 {
1530 /* We don't seem to have enough data. */
1531 if (private_data->historic)
1532 {
1533 /*
1534 * This is a historic timeline, so there will never be any
1535 * more data than we have currently.
1536 */
1537 private_data->end_of_wal = true;
1538 return -1;
1539 }
1540 else
1541 {
1542 XLogRecPtr latest_lsn;
1543 TimeLineID latest_tli;
1544
1545 /*
1546 * This is - or at least was up until very recently - the
1547 * current timeline, so more data might show up. Delay here
1548 * so we don't tight-loop.
1549 */
1552
1553 /* Recheck end-of-WAL. */
1554 latest_lsn = GetLatestLSN(&latest_tli);
1555 if (private_data->tli == latest_tli)
1556 {
1557 /* Still the current timeline, update max LSN. */
1558 Assert(latest_lsn >= private_data->read_upto);
1559 private_data->read_upto = latest_lsn;
1560 }
1561 else
1562 {
1563 List *tles = readTimeLineHistory(latest_tli);
1564 XLogRecPtr switchpoint;
1565
1566 /*
1567 * The timeline we're scanning is no longer the latest
1568 * one. Figure out when it ended.
1569 */
1570 private_data->historic = true;
1571 switchpoint = tliSwitchPoint(private_data->tli, tles,
1572 NULL);
1573
1574 /*
1575 * Allow reads up to exactly the switch point.
1576 *
1577 * It's possible that this will cause read_upto to move
1578 * backwards, because we might have been promoted before
1579 * reaching the end of the previous timeline. In that
1580 * case, the next loop iteration will likely conclude that
1581 * we've reached end of WAL.
1582 */
1583 private_data->read_upto = switchpoint;
1584
1585 /* Debugging output. */
1587 errmsg_internal("timeline %u became historic, can read up to %X/%X",
1588 private_data->tli, LSN_FORMAT_ARGS(private_data->read_upto)));
1589 }
1590
1591 /* Go around and try again. */
1592 }
1593 }
1594 else
1595 {
1596 /* enough bytes available to satisfy the request */
1597 count = private_data->read_upto - targetPagePtr;
1598 break;
1599 }
1600 }
1601
1602 if (!WALRead(state, cur_page, targetPagePtr, count,
1603 private_data->tli, &errinfo))
1604 WALReadRaiseError(&errinfo);
1605
1606 /* Track that we read a page, for sleep time calculation. */
1608
1609 /* number of valid bytes in the buffer */
1610 return count;
1611}
1612
1613/*
1614 * Sleep for long enough that we believe it's likely that more WAL will
1615 * be available afterwards.
1616 */
1617static void
1619{
1621 {
1622 /*
1623 * No pages were read since the last sleep, so double the sleep time,
1624 * but not beyond the maximum allowable value.
1625 */
1627 }
1628 else if (pages_read_since_last_sleep > 1)
1629 {
1630 /*
1631 * Multiple pages were read since the last sleep, so reduce the sleep
1632 * time.
1633 *
1634 * A large burst of activity should be able to quickly reduce the
1635 * sleep time to the minimum, but we don't want a handful of extra WAL
1636 * records to provoke a strong reaction. We choose to reduce the sleep
1637 * time by 1 quantum for each page read beyond the first, which is a
1638 * fairly arbitrary way of trying to be reactive without overreacting.
1639 */
1641 sleep_quanta = 1;
1642 else
1644 }
1645
1646 /* Report pending statistics to the cumulative stats system. */
1647 pgstat_report_wal(false);
1648
1649 /* OK, now sleep. */
1650 (void) WaitLatch(MyLatch,
1653 WAIT_EVENT_WAL_SUMMARIZER_WAL);
1655
1656 /* Reset count of pages read. */
1658}
1659
1660/*
1661 * Remove WAL summaries whose mtimes are older than wal_summary_keep_time.
1662 */
1663static void
1665{
1666 XLogRecPtr redo_pointer = GetRedoRecPtr();
1667 List *wslist;
1668 time_t cutoff_time;
1669
1670 /* If WAL summary removal is disabled, don't do anything. */
1671 if (wal_summary_keep_time == 0)
1672 return;
1673
1674 /*
1675 * If the redo pointer has not advanced, don't do anything.
1676 *
1677 * This has the effect that we only try to remove old WAL summary files
1678 * once per checkpoint cycle.
1679 */
1680 if (redo_pointer == redo_pointer_at_last_summary_removal)
1681 return;
1683
1684 /*
1685 * Files should only be removed if the last modification time precedes the
1686 * cutoff time we compute here.
1687 */
1688 cutoff_time = time(NULL) - wal_summary_keep_time * SECS_PER_MINUTE;
1689
1690 /* Get all the summaries that currently exist. */
1692
1693 /* Loop until all summaries have been considered for removal. */
1694 while (wslist != NIL)
1695 {
1696 ListCell *lc;
1697 XLogSegNo oldest_segno;
1698 XLogRecPtr oldest_lsn = InvalidXLogRecPtr;
1699 TimeLineID selected_tli;
1700
1702
1703 /*
1704 * Pick a timeline for which some summary files still exist on disk,
1705 * and find the oldest LSN that still exists on disk for that
1706 * timeline.
1707 */
1708 selected_tli = ((WalSummaryFile *) linitial(wslist))->tli;
1709 oldest_segno = XLogGetOldestSegno(selected_tli);
1710 if (oldest_segno != 0)
1712 oldest_lsn);
1713
1714
1715 /* Consider each WAL file on the selected timeline in turn. */
1716 foreach(lc, wslist)
1717 {
1718 WalSummaryFile *ws = lfirst(lc);
1719
1721
1722 /* If it's not on this timeline, it's not time to consider it. */
1723 if (selected_tli != ws->tli)
1724 continue;
1725
1726 /*
1727 * If the WAL doesn't exist any more, we can remove it if the file
1728 * modification time is old enough.
1729 */
1730 if (XLogRecPtrIsInvalid(oldest_lsn) || ws->end_lsn <= oldest_lsn)
1731 RemoveWalSummaryIfOlderThan(ws, cutoff_time);
1732
1733 /*
1734 * Whether we removed the file or not, we need not consider it
1735 * again.
1736 */
1737 wslist = foreach_delete_current(wslist, lc);
1738 pfree(ws);
1739 }
1740 }
1741}
void pgaio_error_cleanup(void)
Definition: aio.c:1062
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
sigset_t UnBlockSig
Definition: pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
void BlockRefTableMarkBlockModified(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blknum)
Definition: blkreftable.c:297
void BlockRefTableSetLimitBlock(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber limit_block)
Definition: blkreftable.c:262
void WriteBlockRefTable(BlockRefTable *brtab, io_callback_fn write_callback, void *write_callback_arg)
Definition: blkreftable.c:474
void(*) BlockRefTable CreateEmptyBlockRefTable)(void)
uint32 BlockNumber
Definition: block.h:31
#define Min(x, y)
Definition: c.h:975
uint8_t uint8
Definition: c.h:500
size_t Size
Definition: c.h:576
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
int64 TimestampTz
Definition: timestamp.h:39
#define MINS_PER_HOUR
Definition: timestamp.h:129
#define SECS_PER_MINUTE
Definition: timestamp.h:128
#define HOURS_PER_DAY
Definition: timestamp.h:118
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_DROP
#define XLOG_DBASE_CREATE_FILE_COPY
void AtEOXact_HashTables(bool isCommit)
Definition: dynahash.c:1912
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1181
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
void EmitErrorReport(void)
Definition: elog.c:1692
int errcode_for_file_access(void)
Definition: elog.c:877
int errdetail(const char *fmt,...)
Definition: elog.c:1204
ErrorContextCallback * error_context_stack
Definition: elog.c:95
void FlushErrorState(void)
Definition: elog.c:1872
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
sigjmp_buf * PG_exception_stack
Definition: elog.c:97
#define WARNING
Definition: elog.h:36
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
void AtEOXact_Files(bool isCommit)
Definition: fd.c:3229
void FileClose(File file)
Definition: fd.c:1982
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1579
volatile sig_atomic_t LogMemoryContextPending
Definition: globals.c:41
volatile sig_atomic_t ProcSignalBarrierPending
Definition: globals.c:40
ProcNumber MyProcNumber
Definition: globals.c:91
volatile sig_atomic_t PublishMemoryContextPending
Definition: globals.c:42
struct Latch * MyLatch
Definition: globals.c:64
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
void LWLockReleaseAll(void)
Definition: lwlock.c:1953
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:414
void pfree(void *pointer)
Definition: mcxt.c:2150
void * palloc0(Size size)
Definition: mcxt.c:1973
void ProcessGetMemoryContextInterrupt(void)
Definition: mcxt.c:1436
MemoryContext TopMemoryContext
Definition: mcxt.c:165
void ProcessLogMemoryContextInterrupt(void)
Definition: mcxt.c:1384
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define AmWalSummarizerProcess()
Definition: miscadmin.h:392
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
@ B_WAL_SUMMARIZER
Definition: miscadmin.h:367
BackendType MyBackendType
Definition: miscinit.c:64
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void * arg
#define MAXPGPATH
#define XLOG_CHECKPOINT_REDO
Definition: pg_control.h:82
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:68
#define XLOG_PARAMETER_CHANGE
Definition: pg_control.h:74
#define XLOG_END_OF_RECOVERY
Definition: pg_control.h:77
while(p+4<=pend)
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
#define linitial(l)
Definition: pg_list.h:178
void pgstat_report_wal(bool force)
Definition: pgstat_wal.c:46
#define pqsignal
Definition: port.h:531
#define snprintf
Definition: port.h:239
uintptr_t Datum
Definition: postgres.h:69
#define GetPGProcByNumber(n)
Definition: proc.h:424
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int ProcNumber
Definition: procnumber.h:24
void ProcessProcSignalBarrier(void)
Definition: procsignal.c:498
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
ForkNumber
Definition: relpath.h:56
@ FSM_FORKNUM
Definition: relpath.h:59
@ VISIBILITYMAP_FORKNUM
Definition: relpath.h:60
@ MAIN_FORKNUM
Definition: relpath.h:58
#define MAX_FORKNUM
Definition: relpath.h:70
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1019
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
PROC_HDR * ProcGlobal
Definition: proc.c:79
#define SMGR_TRUNCATE_VM
Definition: storage_xlog.h:41
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
#define SMGR_TRUNCATE_HEAP
Definition: storage_xlog.h:40
int wal_level
Definition: pg_control.h:43
Definition: pg_list.h:54
Latch procLatch
Definition: proc.h:170
PGPROC * allProcs
Definition: proc.h:372
RelFileNumber relNumber
TimeLineID tli
Definition: timeline.h:27
XLogRecPtr summarized_lsn
Definition: walsummarizer.c:86
TimeLineID summarized_tli
Definition: walsummarizer.c:85
ConditionVariable summary_file_cv
Definition: walsummarizer.c:94
ProcNumber summarizer_pgprocno
Definition: walsummarizer.c:88
XLogRecPtr pending_lsn
Definition: walsummarizer.c:89
XLogRecPtr end_lsn
Definition: walsummary.h:30
TimeLineID tli
Definition: walsummary.h:31
off_t filepos
Definition: walsummary.h:24
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
void * private_data
Definition: xlogreader.h:196
Definition: regguts.h:323
Oid tablespace_ids[FLEXIBLE_ARRAY_MEMBER]
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileLocator rlocator
Definition: storage_xlog.h:35
RelFileLocator rlocator
Definition: storage_xlog.h:49
BlockNumber blkno
Definition: storage_xlog.h:48
RelFileLocator * xlocators
Definition: xact.h:422
RelFileLocator * xlocators
Definition: xact.h:389
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static XLogRecPtr redo_pointer_at_last_summary_removal
#define MAX_SLEEP_QUANTA
static long pages_read_since_last_sleep
Size WalSummarizerShmemSize(void)
static XLogRecPtr GetLatestLSN(TimeLineID *tli)
void WalSummarizerMain(const void *startup_data, size_t startup_data_len)
static bool SummarizeXlogRecord(XLogReaderState *xlogreader, bool *new_fast_forward)
static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
static WalSummarizerData * WalSummarizerCtl
static void ProcessWalSummarizerInterrupts(void)
static void SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
bool summarize_wal
void WaitForWalSummarization(XLogRecPtr lsn)
static void SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
#define MS_PER_SLEEP_QUANTUM
void GetWalSummarizerState(TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn, XLogRecPtr *pending_lsn, int *summarizer_pid)
static long sleep_quanta
int wal_summary_keep_time
static int summarizer_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void WalSummarizerShutdown(int code, Datum arg)
static void SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void MaybeRemoveOldWalSummaries(void)
void WakeupWalSummarizer(void)
XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact)
void WalSummarizerShmemInit(void)
static void summarizer_wait_for_wal(void)
void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
Definition: walsummary.c:230
List * GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Definition: walsummary.c:43
int WriteWalSummary(void *wal_summary_io, void *data, int length)
Definition: walsummary.c:294
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171
#define XLOG_XACT_COMMIT_PREPARED
Definition: xact.h:172
#define XLOG_XACT_COMMIT
Definition: xact.h:169
#define XLOG_XACT_OPMASK
Definition: xact.h:179
#define XLOG_XACT_ABORT
Definition: xact.h:171
#define XLOG_XACT_ABORT_PREPARED
Definition: xact.h:173
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
Definition: xactdesc.c:35
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
Definition: xactdesc.c:141
bool RecoveryInProgress(void)
Definition: xlog.c:6522
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6625
int wal_segment_size
Definition: xlog.c:143
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6687
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
Definition: xlog.c:3913
TimeLineID GetWALInsertionTimeLineIfSet(void)
Definition: xlog.c:6724
@ WAL_LEVEL_MINIMAL
Definition: xlog.h:74
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLOGDIR
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
Definition: xlogreader.c:2007
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:107
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1504
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:390
void XLogReaderFree(XLogReaderState *state)
Definition: xlogreader.c:162
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:1384
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:232
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetRmid(decoder)
Definition: xlogreader.h:411
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
#define XLogRecMaxBlockId(decoder)
Definition: xlogreader.h:418
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static XLogReaderState * xlogreader
Definition: xlogrecovery.c:189
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: xlogutils.c:806
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1011