PostgreSQL Source Code  git master
walsummarizer.c File Reference
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/walsummary.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "common/blkreftable.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "postmaster/walsummarizer.h"
#include "replication/walreceiver.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/wait_event.h"
Include dependency graph for walsummarizer.c:

Go to the source code of this file.

Data Structures

struct  WalSummarizerData
 
struct  SummarizerReadLocalXLogPrivate
 

Macros

#define MAX_SLEEP_QUANTA   150
 
#define MS_PER_SLEEP_QUANTUM   200
 

Functions

static void WalSummarizerShutdown (int code, Datum arg)
 
static XLogRecPtr GetLatestLSN (TimeLineID *tli)
 
static void HandleWalSummarizerInterrupts (void)
 
static XLogRecPtr SummarizeWAL (TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
 
static void SummarizeDbaseRecord (XLogReaderState *xlogreader, BlockRefTable *brtab)
 
static void SummarizeSmgrRecord (XLogReaderState *xlogreader, BlockRefTable *brtab)
 
static void SummarizeXactRecord (XLogReaderState *xlogreader, BlockRefTable *brtab)
 
static bool SummarizeXlogRecord (XLogReaderState *xlogreader)
 
static int summarizer_read_local_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void summarizer_wait_for_wal (void)
 
static void MaybeRemoveOldWalSummaries (void)
 
Size WalSummarizerShmemSize (void)
 
void WalSummarizerShmemInit (void)
 
void WalSummarizerMain (char *startup_data, size_t startup_data_len)
 
void GetWalSummarizerState (TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn, XLogRecPtr *pending_lsn, int *summarizer_pid)
 
XLogRecPtr GetOldestUnsummarizedLSN (TimeLineID *tli, bool *lsn_is_exact, bool reset_pending_lsn)
 
void SetWalSummarizerLatch (void)
 
XLogRecPtr WaitForWalSummarization (XLogRecPtr lsn, long timeout, XLogRecPtr *pending_lsn)
 

Variables

static WalSummarizerData * WalSummarizerCtl
 
static long sleep_quanta = 1
 
static long pages_read_since_last_sleep = 0
 
static XLogRecPtr redo_pointer_at_last_summary_removal = InvalidXLogRecPtr
 
bool summarize_wal = false
 
int wal_summary_keep_time = 10 * HOURS_PER_DAY * MINS_PER_HOUR
 

Macro Definition Documentation

◆ MAX_SLEEP_QUANTA

#define MAX_SLEEP_QUANTA   150

Definition at line 125 of file walsummarizer.c.

◆ MS_PER_SLEEP_QUANTUM

#define MS_PER_SLEEP_QUANTUM   200

Definition at line 126 of file walsummarizer.c.

Function Documentation

◆ GetLatestLSN()

static XLogRecPtr GetLatestLSN ( TimeLineID * tli)
static

Definition at line 702 of file walsummarizer.c.

703 {
704  if (!RecoveryInProgress())
705  {
706  /* Don't summarize WAL before it's flushed. */
707  return GetFlushRecPtr(tli);
708  }
709  else
710  {
711  XLogRecPtr flush_lsn;
712  TimeLineID flush_tli;
713  XLogRecPtr replay_lsn;
714  TimeLineID replay_tli;
715 
716  /*
717  * What we really want to know is how much WAL has been flushed to
718  * disk, but the only flush position available is the one provided by
719  * the walreceiver, which may not be running, because this could be
720  * crash recovery or recovery via restore_command. So use either the
721  * WAL receiver's flush position or the replay position, whichever is
722  * further ahead, on the theory that if the WAL has been replayed then
723  * it must also have been flushed to disk.
724  */
725  flush_lsn = GetWalRcvFlushRecPtr(NULL, &flush_tli);
726  replay_lsn = GetXLogReplayRecPtr(&replay_tli);
727  if (flush_lsn > replay_lsn)
728  {
729  *tli = flush_tli;
730  return flush_lsn;
731  }
732  else
733  {
734  *tli = replay_tli;
735  return replay_lsn;
736  }
737  }
738 }
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool RecoveryInProgress(void)
Definition: xlog.c:6290
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6455
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References GetFlushRecPtr(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), and RecoveryInProgress().

Referenced by GetOldestUnsummarizedLSN(), summarizer_read_local_xlog_page(), and WalSummarizerMain().

◆ GetOldestUnsummarizedLSN()

XLogRecPtr GetOldestUnsummarizedLSN ( TimeLineID * tli,
bool * lsn_is_exact,
bool  reset_pending_lsn 
)

Definition at line 494 of file walsummarizer.c.

496 {
497  TimeLineID latest_tli;
498  LWLockMode mode = reset_pending_lsn ? LW_EXCLUSIVE : LW_SHARED;
499  int n;
500  List *tles;
501  XLogRecPtr unsummarized_lsn = InvalidXLogRecPtr;
502  TimeLineID unsummarized_tli = 0;
503  bool should_make_exact = false;
504  List *existing_summaries;
505  ListCell *lc;
506 
507  /* If not summarizing WAL, do nothing. */
508  if (!summarize_wal)
509  return InvalidXLogRecPtr;
510 
511  /*
512  * Unless we need to reset the pending_lsn, we initially acquire the lock
513  * in shared mode and try to fetch the required information. If we acquire
514  * in shared mode and find that the data structure hasn't been
515  * initialized, we reacquire the lock in exclusive mode so that we can
516  * initialize it. However, if someone else does that first before we get
517  * the lock, then we can just return the requested information after all.
518  */
519  while (1)
520  {
521  LWLockAcquire(WALSummarizerLock, mode);
522 
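523  if (WalSummarizerCtl->initialized)
524  {
525  unsummarized_lsn = WalSummarizerCtl->summarized_lsn;
526  if (tli != NULL)
527  *tli = WalSummarizerCtl->summarized_tli;
528  if (lsn_is_exact != NULL)
529  *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
530  if (reset_pending_lsn)
531  WalSummarizerCtl->pending_lsn =
532  WalSummarizerCtl->summarized_lsn;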
533  LWLockRelease(WALSummarizerLock);
534  return unsummarized_lsn;
535  }
536 
537  if (mode == LW_EXCLUSIVE)
538  break;
539 
540  LWLockRelease(WALSummarizerLock);
541  mode = LW_EXCLUSIVE;
542  }
543 
544  /*
545  * The data structure needs to be initialized, and we are the first to
546  * obtain the lock in exclusive mode, so it's our job to do that
547  * initialization.
548  *
549  * So, find the oldest timeline on which WAL still exists, and the
550  * earliest segment for which it exists.
551  */
552  (void) GetLatestLSN(&latest_tli);
553  tles = readTimeLineHistory(latest_tli);
554  for (n = list_length(tles) - 1; n >= 0; --n)
555  {
556  TimeLineHistoryEntry *tle = list_nth(tles, n);
557  XLogSegNo oldest_segno;
558 
559  oldest_segno = XLogGetOldestSegno(tle->tli);
560  if (oldest_segno != 0)
561  {
562  /* Compute oldest LSN that still exists on disk. */
563  XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size,
564  unsummarized_lsn);
565 
566  unsummarized_tli = tle->tli;
567  break;
568  }
569  }
570 
571  /* It really should not be possible for us to find no WAL. */
572  if (unsummarized_tli == 0)
573  ereport(ERROR,
574  errcode(ERRCODE_INTERNAL_ERROR),
575  errmsg_internal("no WAL found on timeline %u", latest_tli));
576 
577  /*
578  * Don't try to summarize anything older than the end LSN of the newest
579  * summary file that exists for this timeline.
580  */
581  existing_summaries =
582  GetWalSummaries(unsummarized_tli,
584  foreach(lc, existing_summaries)
585  {
586  WalSummaryFile *ws = lfirst(lc);
587 
588  if (ws->end_lsn > unsummarized_lsn)
589  {
590  unsummarized_lsn = ws->end_lsn;
591  should_make_exact = true;
592  }
593  }
594 
595  /* Update shared memory with the discovered values. */
596  WalSummarizerCtl->initialized = true;
597  WalSummarizerCtl->summarized_lsn = unsummarized_lsn;
598  WalSummarizerCtl->summarized_tli = unsummarized_tli;
599  WalSummarizerCtl->lsn_is_exact = should_make_exact;
600  WalSummarizerCtl->pending_lsn = unsummarized_lsn;
601 
602  /* Also return them to the caller as required. */
603  if (tli != NULL)
604  *tli = WalSummarizerCtl->summarized_tli;
605  if (lsn_is_exact != NULL)
606  *lsn_is_exact = WalSummarizerCtl->lsn_is_exact;
607  LWLockRelease(WALSummarizerLock);
608 
609  return unsummarized_lsn;
610 }
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
int errcode(int sqlerrcode)
Definition: elog.c:859
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1170
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1783
LWLockMode
Definition: lwlock.h:113
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static PgChecksumMode mode
Definition: pg_checksums.c:56
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
Definition: pg_list.h:54
TimeLineID tli
Definition: timeline.h:27
XLogRecPtr summarized_lsn
Definition: walsummarizer.c:84
TimeLineID summarized_tli
Definition: walsummarizer.c:83
XLogRecPtr pending_lsn
Definition: walsummarizer.c:87
XLogRecPtr end_lsn
Definition: walsummary.h:30
static XLogRecPtr GetLatestLSN(TimeLineID *tli)
static WalSummarizerData * WalSummarizerCtl
bool summarize_wal
List * GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Definition: walsummary.c:43
int wal_segment_size
Definition: xlog.c:143
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
Definition: xlog.c:3763
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48

References WalSummaryFile::end_lsn, ereport, errcode(), errmsg_internal(), ERROR, GetLatestLSN(), GetWalSummaries(), WalSummarizerData::initialized, InvalidXLogRecPtr, lfirst, list_length(), list_nth(), WalSummarizerData::lsn_is_exact, LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), mode, WalSummarizerData::pending_lsn, readTimeLineHistory(), summarize_wal, WalSummarizerData::summarized_lsn, WalSummarizerData::summarized_tli, TimeLineHistoryEntry::tli, wal_segment_size, WalSummarizerCtl, XLogGetOldestSegno(), and XLogSegNoOffsetToRecPtr.

Referenced by KeepLogSeg(), and WalSummarizerMain().

◆ GetWalSummarizerState()

void GetWalSummarizerState ( TimeLineID * summarized_tli,
XLogRecPtr * summarized_lsn,
XLogRecPtr * pending_lsn,
int *  summarizer_pid 
)

Definition at line 433 of file walsummarizer.c.

435 {
436  LWLockAcquire(WALSummarizerLock, LW_SHARED);
437  if (!WalSummarizerCtl->initialized)
438  {
439  /*
440  * If initialized is false, the rest of the structure contents are
441  * undefined.
442  */
443  *summarized_tli = 0;
444  *summarized_lsn = InvalidXLogRecPtr;
445  *pending_lsn = InvalidXLogRecPtr;
446  *summarizer_pid = -1;
447  }
448  else
449  {
450  int summarizer_pgprocno = WalSummarizerCtl->summarizer_pgprocno;
451 
452  *summarized_tli = WalSummarizerCtl->summarized_tli;
453  *summarized_lsn = WalSummarizerCtl->summarized_lsn;
454  if (summarizer_pgprocno == INVALID_PROC_NUMBER)
455  {
456  /*
457  * If the summarizer has exited, the fact that it had processed
458  * beyond summarized_lsn is irrelevant now.
459  */
460  *pending_lsn = WalSummarizerCtl->summarized_lsn;
461  *summarizer_pid = -1;
462  }
463  else
464  {
465  *pending_lsn = WalSummarizerCtl->pending_lsn;
466 
467  /*
468  * We're not fussed about inexact answers here, since they could
469  * become stale instantly, so we don't bother taking the lock, but
470  * make sure that invalid PID values are normalized to -1.
471  */
472  *summarizer_pid = GetPGProcByNumber(summarizer_pgprocno)->pid;
473  if (*summarizer_pid <= 0)
474  *summarizer_pid = -1;
475  }
476  }
477  LWLockRelease(WALSummarizerLock);
478 }
#define GetPGProcByNumber(n)
Definition: proc.h:428
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
ProcNumber summarizer_pgprocno
Definition: walsummarizer.c:86

References GetPGProcByNumber, WalSummarizerData::initialized, INVALID_PROC_NUMBER, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), WalSummarizerData::pending_lsn, WalSummarizerData::summarized_lsn, WalSummarizerData::summarized_tli, WalSummarizerData::summarizer_pgprocno, and WalSummarizerCtl.

Referenced by pg_get_wal_summarizer_state().

◆ HandleWalSummarizerInterrupts()

static void HandleWalSummarizerInterrupts ( void  )
static

Definition at line 744 of file walsummarizer.c.

745 {
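746  if (ProcSignalBarrierPending)
747  ProcessProcSignalBarrier();
748 
749  if (ConfigReloadPending)
750  {
751  ConfigReloadPending = false;
752  ProcessConfigFile(PGC_SIGHUP);
753  }
754 
755  if (ShutdownRequestPending || !summarize_wal)
756  {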
757  ereport(DEBUG1,
758  errmsg_internal("WAL summarizer shutting down"));
759  proc_exit(0);
760  }
761 
762  /* Perform logging of memory contexts of this process */
763  if (LogMemoryContextPending)
764  ProcessLogMemoryContextInterrupt();
765 }
#define DEBUG1
Definition: elog.h:30
volatile sig_atomic_t LogMemoryContextPending
Definition: globals.c:39
volatile sig_atomic_t ProcSignalBarrierPending
Definition: globals.c:38
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104
void ProcessLogMemoryContextInterrupt(void)
Definition: mcxt.c:1288
void ProcessProcSignalBarrier(void)
Definition: procsignal.c:464

References ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), LogMemoryContextPending, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), ProcessLogMemoryContextInterrupt(), ProcessProcSignalBarrier(), ProcSignalBarrierPending, ShutdownRequestPending, and summarize_wal.

Referenced by MaybeRemoveOldWalSummaries(), summarizer_read_local_xlog_page(), SummarizeWAL(), and WalSummarizerMain().

◆ MaybeRemoveOldWalSummaries()

static void MaybeRemoveOldWalSummaries ( void  )
static

Definition at line 1459 of file walsummarizer.c.

1460 {
1461  XLogRecPtr redo_pointer = GetRedoRecPtr();
1462  List *wslist;
1463  time_t cutoff_time;
1464 
1465  /* If WAL summary removal is disabled, don't do anything. */
1466  if (wal_summary_keep_time == 0)
1467  return;
1468 
1469  /*
1470  * If the redo pointer has not advanced, don't do anything.
1471  *
1472  * This has the effect that we only try to remove old WAL summary files
1473  * once per checkpoint cycle.
1474  */
1475  if (redo_pointer == redo_pointer_at_last_summary_removal)
1476  return;
1477  redo_pointer_at_last_summary_removal = redo_pointer;
1478 
1479  /*
1480  * Files should only be removed if the last modification time precedes the
1481  * cutoff time we compute here.
1482  */
1483  cutoff_time = time(NULL) - wal_summary_keep_time * SECS_PER_MINUTE;
1484 
1485  /* Get all the summaries that currently exist. */
1486  wslist = GetWalSummaries(0, InvalidXLogRecPtr, InvalidXLogRecPtr);
1487 
1488  /* Loop until all summaries have been considered for removal. */
1489  while (wslist != NIL)
1490  {
1491  ListCell *lc;
1492  XLogSegNo oldest_segno;
1493  XLogRecPtr oldest_lsn = InvalidXLogRecPtr;
1494  TimeLineID selected_tli;
1495 
1496  HandleWalSummarizerInterrupts();
1497 
1498  /*
1499  * Pick a timeline for which some summary files still exist on disk,
1500  * and find the oldest LSN that still exists on disk for that
1501  * timeline.
1502  */
1503  selected_tli = ((WalSummaryFile *) linitial(wslist))->tli;
1504  oldest_segno = XLogGetOldestSegno(selected_tli);
1505  if (oldest_segno != 0)
1506  XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size,
1507  oldest_lsn);
1508 
1509 
1510  /* Consider each WAL file on the selected timeline in turn. */
1511  foreach(lc, wslist)
1512  {
1513  WalSummaryFile *ws = lfirst(lc);
1514 
1515  HandleWalSummarizerInterrupts();
1516 
1517  /* If it's not on this timeline, it's not time to consider it. */
1518  if (selected_tli != ws->tli)
1519  continue;
1520 
1521  /*
1522  * If the WAL doesn't exist any more, we can remove it if the file
1523  * modification time is old enough.
1524  */
1525  if (XLogRecPtrIsInvalid(oldest_lsn) || ws->end_lsn <= oldest_lsn)
1526  RemoveWalSummaryIfOlderThan(ws, cutoff_time);
1527 
1528  /*
1529  * Whether we removed the file or not, we need not consider it
1530  * again.
1531  */
1532  wslist = foreach_delete_current(wslist, lc);
1533  pfree(ws);
1534  }
1535  }
1536 }
#define SECS_PER_MINUTE
Definition: timestamp.h:128
void pfree(void *pointer)
Definition: mcxt.c:1520
#define NIL
Definition: pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define linitial(l)
Definition: pg_list.h:178
TimeLineID tli
Definition: walsummary.h:31
static XLogRecPtr redo_pointer_at_last_summary_removal
static void HandleWalSummarizerInterrupts(void)
int wal_summary_keep_time
void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
Definition: walsummary.c:230
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6393
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSummaryFile::end_lsn, foreach_delete_current, GetRedoRecPtr(), GetWalSummaries(), HandleWalSummarizerInterrupts(), InvalidXLogRecPtr, lfirst, linitial, NIL, pfree(), redo_pointer_at_last_summary_removal, RemoveWalSummaryIfOlderThan(), SECS_PER_MINUTE, WalSummaryFile::tli, wal_segment_size, wal_summary_keep_time, XLogGetOldestSegno(), XLogRecPtrIsInvalid, and XLogSegNoOffsetToRecPtr.

Referenced by WalSummarizerMain().

◆ SetWalSummarizerLatch()

void SetWalSummarizerLatch ( void  )

Definition at line 621 of file walsummarizer.c.

622 {
623  ProcNumber pgprocno;
624 
625  if (WalSummarizerCtl == NULL)
626  return;
627 
628  LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
629  pgprocno = WalSummarizerCtl->summarizer_pgprocno;
630  LWLockRelease(WALSummarizerLock);
631 
632  if (pgprocno != INVALID_PROC_NUMBER)
633  SetLatch(&ProcGlobal->allProcs[pgprocno].procLatch);
634 }
void SetLatch(Latch *latch)
Definition: latch.c:632
int ProcNumber
Definition: procnumber.h:24
PROC_HDR * ProcGlobal
Definition: proc.c:78
Latch procLatch
Definition: proc.h:165
PGPROC * allProcs
Definition: proc.h:380

References PROC_HDR::allProcs, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ProcGlobal, PGPROC::procLatch, SetLatch(), WalSummarizerData::summarizer_pgprocno, and WalSummarizerCtl.

Referenced by CreateCheckPoint().

◆ SummarizeDbaseRecord()

static void SummarizeDbaseRecord ( XLogReaderState * xlogreader,
BlockRefTable * brtab 
)
static

Definition at line 1093 of file walsummarizer.c.

1094 {
1095  uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1096 
1097  /*
1098  * We use relfilenode zero for a given database OID and tablespace OID to
1099  * indicate that all relations with that pair of IDs have been recreated
1100  * if they exist at all. Effectively, we're setting a limit block of 0 for
1101  * all such relfilenodes.
1102  *
1103  * Technically, this special handling is only needed in the case of
1104  * XLOG_DBASE_CREATE_FILE_COPY, because that can create a whole bunch of
1105  * relation files in a directory without logging anything specific to each
1106  * one. If we didn't mark the whole DB OID/TS OID combination in some way,
1107  * then a tablespace that was dropped after the reference backup and
1108  * recreated using the FILE_COPY method prior to the incremental backup
1109  * would look just like one that was never touched at all, which would be
1110  * catastrophic.
1111  *
1112  * But it seems best to adopt this treatment for all records that drop or
1113  * create a DB OID/TS OID combination. That's similar to how we treat the
1114  * limit block for individual relations, and it's an extra layer of safety
1115  * here. We can never lose data by marking more stuff as needing to be
1116  * backed up in full.
1117  */
1118  if (info == XLOG_DBASE_CREATE_FILE_COPY)
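1119  {
1120  xl_dbase_create_file_copy_rec *xlrec;
1121  RelFileLocator rlocator;
1122 
1123  xlrec =
1124  (xl_dbase_create_file_copy_rec *) XLogRecGetData(xlogreader);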
1125  rlocator.spcOid = xlrec->tablespace_id;
1126  rlocator.dbOid = xlrec->db_id;
1127  rlocator.relNumber = 0;
1128  BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1129  }
1130  else if (info == XLOG_DBASE_CREATE_WAL_LOG)
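1131  {
1132  xl_dbase_create_wal_log_rec *xlrec;
1133  RelFileLocator rlocator;
1134 
1135  xlrec = (xl_dbase_create_wal_log_rec *) XLogRecGetData(xlogreader);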
1136  rlocator.spcOid = xlrec->tablespace_id;
1137  rlocator.dbOid = xlrec->db_id;
1138  rlocator.relNumber = 0;
1139  BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1140  }
1141  else if (info == XLOG_DBASE_DROP)
1142  {
1143  xl_dbase_drop_rec *xlrec;
1144  RelFileLocator rlocator;
1145  int i;
1146 
1147  xlrec = (xl_dbase_drop_rec *) XLogRecGetData(xlogreader);
1148  rlocator.dbOid = xlrec->db_id;
1149  rlocator.relNumber = 0;
1150  for (i = 0; i < xlrec->ntablespaces; ++i)
1151  {
1152  rlocator.spcOid = xlrec->tablespace_ids[i];
1153  BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
1154  }
1155  }
1156 }
void BlockRefTableSetLimitBlock(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber limit_block)
Definition: blkreftable.c:262
unsigned char uint8
Definition: c.h:504
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_DROP
#define XLOG_DBASE_CREATE_FILE_COPY
int i
Definition: isn.c:73
@ MAIN_FORKNUM
Definition: relpath.h:50
RelFileNumber relNumber
Oid tablespace_ids[FLEXIBLE_ARRAY_MEMBER]
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
static XLogReaderState * xlogreader
Definition: xlogrecovery.c:188

References BlockRefTableSetLimitBlock(), xl_dbase_create_file_copy_rec::db_id, xl_dbase_create_wal_log_rec::db_id, xl_dbase_drop_rec::db_id, RelFileLocator::dbOid, i, MAIN_FORKNUM, xl_dbase_drop_rec::ntablespaces, RelFileLocator::relNumber, RelFileLocator::spcOid, xl_dbase_create_file_copy_rec::tablespace_id, xl_dbase_create_wal_log_rec::tablespace_id, xl_dbase_drop_rec::tablespace_ids, XLOG_DBASE_CREATE_FILE_COPY, XLOG_DBASE_CREATE_WAL_LOG, XLOG_DBASE_DROP, xlogreader, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

Referenced by SummarizeWAL().

◆ summarizer_read_local_xlog_page()

static int summarizer_read_local_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1296 of file walsummarizer.c.

1299 {
1300  int count;
1301  WALReadError errinfo;
1302  SummarizerReadLocalXLogPrivate *private_data;
1303 
1304  HandleWalSummarizerInterrupts();
1305 
1306  private_data = (SummarizerReadLocalXLogPrivate *)
1307  state->private_data;
1308 
1309  while (1)
1310  {
1311  if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto)
1312  {
1313  /*
1314  * more than one block available; read only that block, have
1315  * caller come back if they need more.
1316  */
1317  count = XLOG_BLCKSZ;
1318  break;
1319  }
1320  else if (targetPagePtr + reqLen > private_data->read_upto)
1321  {
1322  /* We don't seem to have enough data. */
1323  if (private_data->historic)
1324  {
1325  /*
1326  * This is a historic timeline, so there will never be any
1327  * more data than we have currently.
1328  */
1329  private_data->end_of_wal = true;
1330  return -1;
1331  }
1332  else
1333  {
1334  XLogRecPtr latest_lsn;
1335  TimeLineID latest_tli;
1336 
1337  /*
1338  * This is - or at least was up until very recently - the
1339  * current timeline, so more data might show up. Delay here
1340  * so we don't tight-loop.
1341  */
1342  HandleWalSummarizerInterrupts();
1343  summarizer_wait_for_wal();
1344 
1345  /* Recheck end-of-WAL. */
1346  latest_lsn = GetLatestLSN(&latest_tli);
1347  if (private_data->tli == latest_tli)
1348  {
1349  /* Still the current timeline, update max LSN. */
1350  Assert(latest_lsn >= private_data->read_upto);
1351  private_data->read_upto = latest_lsn;
1352  }
1353  else
1354  {
1355  List *tles = readTimeLineHistory(latest_tli);
1356  XLogRecPtr switchpoint;
1357 
1358  /*
1359  * The timeline we're scanning is no longer the latest
1360  * one. Figure out when it ended.
1361  */
1362  private_data->historic = true;
1363  switchpoint = tliSwitchPoint(private_data->tli, tles,
1364  NULL);
1365 
1366  /*
1367  * Allow reads up to exactly the switch point.
1368  *
1369  * It's possible that this will cause read_upto to move
1370  * backwards, because walreceiver might have read a
1371  * partial record and flushed it to disk, and we'd view
1372  * that data as safe to read. However, the
1373  * XLOG_END_OF_RECOVERY record will be written at the end
1374  * of the last complete WAL record, not at the end of the
1375  * WAL that we've flushed to disk.
1376  *
1377  * So switchpoint < private->read_upto is possible here,
1378  * but switchpoint < state->EndRecPtr should not be.
1379  */
1380  Assert(switchpoint >= state->EndRecPtr);
1381  private_data->read_upto = switchpoint;
1382 
1383  /* Debugging output. */
1384  ereport(DEBUG1,
1385  errmsg("timeline %u became historic, can read up to %X/%X",
1386  private_data->tli, LSN_FORMAT_ARGS(private_data->read_upto)));
1387  }
1388 
1389  /* Go around and try again. */
1390  }
1391  }
1392  else
1393  {
1394  /* enough bytes available to satisfy the request */
1395  count = private_data->read_upto - targetPagePtr;
1396  break;
1397  }
1398  }
1399 
1400  if (!WALRead(state, cur_page, targetPagePtr, count,
1401  private_data->tli, &errinfo))
1402  WALReadRaiseError(&errinfo);
1403 
1404  /* Track that we read a page, for sleep time calculation. */
1405  ++pages_read_since_last_sleep;
1406 
1407  /* number of valid bytes in the buffer */
1408  return count;
1409 }
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
#define Assert(condition)
Definition: c.h:858
int errmsg(const char *fmt,...)
Definition: elog.c:1072
while(p+4<=pend)
Definition: regguts.h:323
static long pages_read_since_last_sleep
static void summarizer_wait_for_wal(void)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1503
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1020

References Assert, DEBUG1, SummarizerReadLocalXLogPrivate::end_of_wal, ereport, errmsg(), GetLatestLSN(), HandleWalSummarizerInterrupts(), SummarizerReadLocalXLogPrivate::historic, LSN_FORMAT_ARGS, pages_read_since_last_sleep, SummarizerReadLocalXLogPrivate::read_upto, readTimeLineHistory(), summarizer_wait_for_wal(), SummarizerReadLocalXLogPrivate::tli, tliSwitchPoint(), WALRead(), WALReadRaiseError(), and while().

Referenced by SummarizeWAL().

◆ summarizer_wait_for_wal()

static void summarizer_wait_for_wal ( void  )
static

Definition at line 1416 of file walsummarizer.c.

1417 {
1418  if (pages_read_since_last_sleep == 0)
1419  {
1420  /*
1421  * No pages were read since the last sleep, so double the sleep time,
1422  * but not beyond the maximum allowable value.
1423  */
1424  sleep_quanta = Min(sleep_quanta * 2, MAX_SLEEP_QUANTA);
1425  }
1426  else if (pages_read_since_last_sleep > 1)
1427  {
1428  /*
1429  * Multiple pages were read since the last sleep, so reduce the sleep
1430  * time.
1431  *
1432  * A large burst of activity should be able to quickly reduce the
1433  * sleep time to the minimum, but we don't want a handful of extra WAL
1434  * records to provoke a strong reaction. We choose to reduce the sleep
1435  * time by 1 quantum for each page read beyond the first, which is a
1436  * fairly arbitrary way of trying to be reactive without overreacting.
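1437  */
1438  if (pages_read_since_last_sleep > sleep_quanta - 1)
1439  sleep_quanta = 1;
1440  else
1441  sleep_quanta -= pages_read_since_last_sleep;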
1442  }
1443 
1444  /* OK, now sleep. */
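1445  (void) WaitLatch(MyLatch,
1446  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1447  sleep_quanta * MS_PER_SLEEP_QUANTUM,
1448  WAIT_EVENT_WAL_SUMMARIZER_WAL);
1449  ResetLatch(MyLatch);
1450 
1451  /* Reset count of pages read. */
1452  pages_read_since_last_sleep = 0;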
1453 }
#define Min(x, y)
Definition: c.h:1004
struct Latch * MyLatch
Definition: globals.c:60
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define MAX_SLEEP_QUANTA
#define MS_PER_SLEEP_QUANTUM
static long sleep_quanta

References MAX_SLEEP_QUANTA, Min, MS_PER_SLEEP_QUANTUM, MyLatch, pages_read_since_last_sleep, ResetLatch(), sleep_quanta, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by summarizer_read_local_xlog_page().

◆ SummarizeSmgrRecord()

static void SummarizeSmgrRecord ( XLogReaderState * xlogreader,
BlockRefTable * brtab 
)
static

Definition at line 1162 of file walsummarizer.c.

1163 {
1164  uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
1165 
1166  if (info == XLOG_SMGR_CREATE)
1167  {
1168  xl_smgr_create *xlrec;
1169 
1170  /*
1171  * If a new relation fork is created on disk, there is no point
1172  * tracking anything about which blocks have been modified, because
1173  * the whole thing will be new. Hence, set the limit block for this
1174  * fork to 0.
1175  *
1176  * Ignore the FSM fork, which is not fully WAL-logged.
1177  */
1178  xlrec = (xl_smgr_create *) XLogRecGetData(xlogreader);
1179 
1180  if (xlrec->forkNum != FSM_FORKNUM)
1181  BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1182  xlrec->forkNum, 0);
1183  }
1184  else if (info == XLOG_SMGR_TRUNCATE)
1185  {
1186  xl_smgr_truncate *xlrec;
1187 
1189 
1190  /*
1191  * If a relation fork is truncated on disk, there is no point in
1192  * tracking anything about block modifications beyond the truncation
1193  * point.
1194  *
1195  * We ignore SMGR_TRUNCATE_FSM here because the FSM isn't fully
1196  * WAL-logged and thus we can't track modified blocks for it anyway.
1197  */
1198  if ((xlrec->flags & SMGR_TRUNCATE_HEAP) != 0)
1199  BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1200  MAIN_FORKNUM, xlrec->blkno);
1201  if ((xlrec->flags & SMGR_TRUNCATE_VM) != 0)
1202  BlockRefTableSetLimitBlock(brtab, &xlrec->rlocator,
1203  VISIBILITYMAP_FORKNUM, xlrec->blkno);
1204  }
1205 }
@ FSM_FORKNUM
Definition: relpath.h:51
@ VISIBILITYMAP_FORKNUM
Definition: relpath.h:52
#define SMGR_TRUNCATE_VM
Definition: storage_xlog.h:41
#define XLOG_SMGR_CREATE
Definition: storage_xlog.h:30
#define XLOG_SMGR_TRUNCATE
Definition: storage_xlog.h:31
#define SMGR_TRUNCATE_HEAP
Definition: storage_xlog.h:40
ForkNumber forkNum
Definition: storage_xlog.h:36
RelFileLocator rlocator
Definition: storage_xlog.h:35
RelFileLocator rlocator
Definition: storage_xlog.h:49
BlockNumber blkno
Definition: storage_xlog.h:48

References xl_smgr_truncate::blkno, BlockRefTableSetLimitBlock(), xl_smgr_truncate::flags, xl_smgr_create::forkNum, FSM_FORKNUM, MAIN_FORKNUM, xl_smgr_create::rlocator, xl_smgr_truncate::rlocator, SMGR_TRUNCATE_HEAP, SMGR_TRUNCATE_VM, VISIBILITYMAP_FORKNUM, XLOG_SMGR_CREATE, XLOG_SMGR_TRUNCATE, xlogreader, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

Referenced by SummarizeWAL().

◆ SummarizeWAL()

static XLogRecPtr SummarizeWAL ( TimeLineID  tli,
XLogRecPtr  start_lsn,
bool  exact,
XLogRecPtr  switch_lsn,
XLogRecPtr  maximum_lsn 
)
static

Definition at line 792 of file walsummarizer.c.

794 {
795  SummarizerReadLocalXLogPrivate *private_data;
797  XLogRecPtr summary_start_lsn;
798  XLogRecPtr summary_end_lsn = switch_lsn;
799  char temp_path[MAXPGPATH];
800  char final_path[MAXPGPATH];
801  WalSummaryIO io;
803 
804  /* Initialize private data for xlogreader. */
805  private_data = (SummarizerReadLocalXLogPrivate *)
807  private_data->tli = tli;
808  private_data->historic = !XLogRecPtrIsInvalid(switch_lsn);
809  private_data->read_upto = maximum_lsn;
810 
811  /* Create xlogreader. */
814  .segment_open = &wal_segment_open,
815  .segment_close = &wal_segment_close),
816  private_data);
817  if (xlogreader == NULL)
818  ereport(ERROR,
819  (errcode(ERRCODE_OUT_OF_MEMORY),
820  errmsg("out of memory"),
821  errdetail("Failed while allocating a WAL reading processor.")));
822 
823  /*
824  * When exact = false, we're starting from an arbitrary point in the WAL
825  * and must search forward for the start of the next record.
826  *
827  * When exact = true, start_lsn should be either the LSN where a record
828  * begins, or the LSN of a page where the page header is immediately
829  * followed by the start of a new record. XLogBeginRead should tolerate
830  * either case.
831  *
832  * We need to allow for both cases because the behavior of xlogreader
833  * varies. When a record spans two or more xlog pages, the ending LSN
834  * reported by xlogreader will be the starting LSN of the following
835  * record, but when an xlog page boundary falls between two records, the
836  * end LSN for the first will be reported as the first byte of the
837  * following page. We can't know until we read that page how large the
838  * header will be, but we'll have to skip over it to find the next record.
839  */
840  if (exact)
841  {
842  /*
843  * Even if start_lsn is the beginning of a page rather than the
844  * beginning of the first record on that page, we should still use it
845  * as the start LSN for the summary file. That's because we detect
846  * missing summary files by looking for cases where the end LSN of one
847  * file is less than the start LSN of the next file. When only a page
848  * header is skipped, nothing has been missed.
849  */
850  XLogBeginRead(xlogreader, start_lsn);
851  summary_start_lsn = start_lsn;
852  }
853  else
854  {
855  summary_start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
856  if (XLogRecPtrIsInvalid(summary_start_lsn))
857  {
858  /*
859  * If we hit end-of-WAL while trying to find the next valid
860  * record, we must be on a historic timeline that has no valid
861  * records that begin after start_lsn and before end of WAL.
862  */
863  if (private_data->end_of_wal)
864  {
865  ereport(DEBUG1,
866  errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
867  tli,
868  LSN_FORMAT_ARGS(start_lsn),
869  LSN_FORMAT_ARGS(private_data->read_upto)));
870 
871  /*
872  * The timeline ends at or after start_lsn, without containing
873  * any records. Thus, we must make sure the main loop does not
874  * iterate. If start_lsn is the end of the timeline, then we
875  * won't actually emit an empty summary file, but otherwise,
876  * we must, to capture the fact that the LSN range in question
877  * contains no interesting WAL records.
878  */
879  summary_start_lsn = start_lsn;
880  summary_end_lsn = private_data->read_upto;
881  switch_lsn = xlogreader->EndRecPtr;
882  }
883  else
884  ereport(ERROR,
885  (errmsg("could not find a valid record after %X/%X",
886  LSN_FORMAT_ARGS(start_lsn))));
887  }
888 
889  /* We shouldn't go backward. */
890  Assert(summary_start_lsn >= start_lsn);
891  }
892 
893  /*
894  * Main loop: read xlog records one by one.
895  */
896  while (1)
897  {
898  int block_id;
899  char *errormsg;
900  XLogRecord *record;
901  bool stop_requested = false;
902 
904 
905  /* We shouldn't go backward. */
906  Assert(summary_start_lsn <= xlogreader->EndRecPtr);
907 
908  /* Now read the next record. */
909  record = XLogReadRecord(xlogreader, &errormsg);
910  if (record == NULL)
911  {
912  if (private_data->end_of_wal)
913  {
914  /*
915  * This timeline must be historic and must end before we were
916  * able to read a complete record.
917  */
918  ereport(DEBUG1,
919  errmsg_internal("could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
920  tli,
922  LSN_FORMAT_ARGS(private_data->read_upto)));
923  /* Summary ends at end of WAL. */
924  summary_end_lsn = private_data->read_upto;
925  break;
926  }
927  if (errormsg)
928  ereport(ERROR,
930  errmsg("could not read WAL from timeline %u at %X/%X: %s",
932  errormsg)));
933  else
934  ereport(ERROR,
936  errmsg("could not read WAL from timeline %u at %X/%X",
938  }
939 
940  /* We shouldn't go backward. */
941  Assert(summary_start_lsn <= xlogreader->EndRecPtr);
942 
943  if (!XLogRecPtrIsInvalid(switch_lsn) &&
944  xlogreader->ReadRecPtr >= switch_lsn)
945  {
946  /*
947  * Whoops! We've read a record that *starts* at or after the switch LSN,
948  * contrary to our goal of reading only until we hit the first
949  * record that ends at or after the switch LSN. Pretend we didn't
950  * read it after all by bailing out of this loop right here,
951  * before we do anything with this record.
952  *
953  * This can happen because the last record before the switch LSN
954  * might be continued across multiple pages, and then we might
955  * come to a page with XLP_FIRST_IS_OVERWRITE_CONTRECORD set. In
956  * that case, the record that was continued across multiple pages
957  * is incomplete and will be disregarded, and the read will
958  * restart from the beginning of the page that is flagged
959  * XLP_FIRST_IS_OVERWRITE_CONTRECORD.
960  *
961  * If this case occurs, we can fairly say that the current summary
962  * file ends at the switch LSN exactly. The first record on the
963  * page marked XLP_FIRST_IS_OVERWRITE_CONTRECORD will be
964  * discovered when generating the next summary file.
965  */
966  summary_end_lsn = switch_lsn;
967  break;
968  }
969 
970  /* Special handling for particular types of WAL records. */
971  switch (XLogRecGetRmid(xlogreader))
972  {
973  case RM_DBASE_ID:
975  break;
976  case RM_SMGR_ID:
978  break;
979  case RM_XACT_ID:
981  break;
982  case RM_XLOG_ID:
983  stop_requested = SummarizeXlogRecord(xlogreader);
984  break;
985  default:
986  break;
987  }
988 
989  /*
990  * If we've been told that it's time to end this WAL summary file, do
991  * so. As an exception, if there's nothing included in this WAL
992  * summary file yet, then stopping doesn't make any sense, and we
993  * should wait until the next stop point instead.
994  */
995  if (stop_requested && xlogreader->ReadRecPtr > summary_start_lsn)
996  {
997  summary_end_lsn = xlogreader->ReadRecPtr;
998  break;
999  }
1000 
1001  /* Feed block references from xlog record to block reference table. */
1002  for (block_id = 0; block_id <= XLogRecMaxBlockId(xlogreader);
1003  block_id++)
1004  {
1005  RelFileLocator rlocator;
1006  ForkNumber forknum;
1007  BlockNumber blocknum;
1008 
1009  if (!XLogRecGetBlockTagExtended(xlogreader, block_id, &rlocator,
1010  &forknum, &blocknum, NULL))
1011  continue;
1012 
1013  /*
1014  * As we do elsewhere, ignore the FSM fork, because it's not fully
1015  * WAL-logged.
1016  */
1017  if (forknum != FSM_FORKNUM)
1018  BlockRefTableMarkBlockModified(brtab, &rlocator, forknum,
1019  blocknum);
1020  }
1021 
1022  /* Update our notion of where this summary file ends. */
1023  summary_end_lsn = xlogreader->EndRecPtr;
1024 
1025  /* Also update shared memory. */
1026  LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
1027  Assert(summary_end_lsn >= WalSummarizerCtl->pending_lsn);
1028  Assert(summary_end_lsn >= WalSummarizerCtl->summarized_lsn);
1029  WalSummarizerCtl->pending_lsn = summary_end_lsn;
1030  LWLockRelease(WALSummarizerLock);
1031 
1032  /*
1033  * If we have a switch LSN and have reached it, stop before reading
1034  * the next record.
1035  */
1036  if (!XLogRecPtrIsInvalid(switch_lsn) &&
1037  xlogreader->EndRecPtr >= switch_lsn)
1038  break;
1039  }
1040 
1041  /* Destroy xlogreader. */
1044 
1045  /*
1046  * If a timeline switch occurs, we may fail to make any progress at all
1047  * before exiting the loop above. If that happens, we don't write a WAL
1048  * summary file at all.
1049  */
1050  if (summary_end_lsn > summary_start_lsn)
1051  {
1052  /* Generate temporary and final path name. */
1053  snprintf(temp_path, MAXPGPATH,
1054  XLOGDIR "/summaries/temp.summary");
1055  snprintf(final_path, MAXPGPATH,
1056  XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
1057  tli,
1058  LSN_FORMAT_ARGS(summary_start_lsn),
1059  LSN_FORMAT_ARGS(summary_end_lsn));
1060 
1061  /* Open the temporary file for writing. */
1062  io.filepos = 0;
1063  io.file = PathNameOpenFile(temp_path, O_WRONLY | O_CREAT | O_TRUNC);
1064  if (io.file < 0)
1065  ereport(ERROR,
1067  errmsg("could not create file \"%s\": %m", temp_path)));
1068 
1069  /* Write the data. */
1070  WriteBlockRefTable(brtab, WriteWalSummary, &io);
1071 
1072  /* Close temporary file. */
1073  FileClose(io.file);
1074 
1075  /* Tell the user what we did. */
1076  ereport(DEBUG1,
1077  errmsg("summarized WAL on TLI %u from %X/%X to %X/%X",
1078  tli,
1079  LSN_FORMAT_ARGS(summary_start_lsn),
1080  LSN_FORMAT_ARGS(summary_end_lsn)));
1081 
1082  /* Durably rename the new summary into place. */
1083  durable_rename(temp_path, final_path, ERROR);
1084  }
1085 
1086  return summary_end_lsn;
1087 }
void BlockRefTableMarkBlockModified(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blknum)
Definition: blkreftable.c:297
void WriteBlockRefTable(BlockRefTable *brtab, io_callback_fn write_callback, void *write_callback_arg)
Definition: blkreftable.c:474
BlockRefTable * CreateEmptyBlockRefTable(void)
uint32 BlockNumber
Definition: block.h:31
int errcode_for_file_access(void)
Definition: elog.c:882
int errdetail(const char *fmt,...)
Definition: elog.c:1205
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
void FileClose(File file)
Definition: fd.c:1978
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
void * palloc0(Size size)
Definition: mcxt.c:1346
#define MAXPGPATH
#define snprintf
Definition: port.h:238
ForkNumber
Definition: relpath.h:48
off_t filepos
Definition: walsummary.h:24
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
void * private_data
Definition: xlogreader.h:196
static bool SummarizeXlogRecord(XLogReaderState *xlogreader)
static void SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static int summarizer_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
int WriteWalSummary(void *wal_summary_io, void *data, int length)
Definition: walsummary.c:294
#define XLOGDIR
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
Definition: xlogreader.c:1997
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:389
void XLogReaderFree(XLogReaderState *state)
Definition: xlogreader.c:161
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:106
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:1383
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231
#define XLogRecGetRmid(decoder)
Definition: xlogreader.h:411
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
#define XLogRecMaxBlockId(decoder)
Definition: xlogreader.h:418
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: xlogutils.c:817

References Assert, BlockRefTableMarkBlockModified(), DEBUG1, durable_rename(), SummarizerReadLocalXLogPrivate::end_of_wal, XLogReaderState::EndRecPtr, ereport, errcode(), errcode_for_file_access(), errdetail(), errmsg(), errmsg_internal(), ERROR, WalSummaryIO::file, FileClose(), WalSummaryIO::filepos, FSM_FORKNUM, HandleWalSummarizerInterrupts(), SummarizerReadLocalXLogPrivate::historic, LSN_FORMAT_ARGS, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAXPGPATH, palloc0(), PathNameOpenFile(), WalSummarizerData::pending_lsn, pfree(), XLogReaderState::private_data, SummarizerReadLocalXLogPrivate::read_upto, XLogReaderState::ReadRecPtr, snprintf, WalSummarizerData::summarized_lsn, SummarizeDbaseRecord(), summarizer_read_local_xlog_page(), SummarizeSmgrRecord(), SummarizeXactRecord(), SummarizeXlogRecord(), SummarizerReadLocalXLogPrivate::tli, wal_segment_close(), wal_segment_open(), wal_segment_size, WalSummarizerCtl, WriteBlockRefTable(), WriteWalSummary(), XL_ROUTINE, XLogBeginRead(), XLOGDIR, XLogFindNextRecord(), xlogreader, XLogReaderAllocate(), XLogReaderFree(), XLogReadRecord(), XLogRecGetBlockTagExtended(), XLogRecGetRmid, XLogRecMaxBlockId, and XLogRecPtrIsInvalid.

Referenced by WalSummarizerMain().

◆ SummarizeXactRecord()

static void SummarizeXactRecord ( XLogReaderState xlogreader,
BlockRefTable brtab 
)
static

Definition at line 1211 of file walsummarizer.c.

1212 {
1214  uint8 xact_info = info & XLOG_XACT_OPMASK;
1215 
1216  if (xact_info == XLOG_XACT_COMMIT ||
1217  xact_info == XLOG_XACT_COMMIT_PREPARED)
1218  {
1220  xl_xact_parsed_commit parsed;
1221  int i;
1222 
1223  /*
1224  * Don't track modified blocks for any relations that were removed on
1225  * commit.
1226  */
1227  ParseCommitRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed);
1228  for (i = 0; i < parsed.nrels; ++i)
1229  {
1230  ForkNumber forknum;
1231 
1232  for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
1233  if (forknum != FSM_FORKNUM)
1234  BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
1235  forknum, 0);
1236  }
1237  }
1238  else if (xact_info == XLOG_XACT_ABORT ||
1239  xact_info == XLOG_XACT_ABORT_PREPARED)
1240  {
1242  xl_xact_parsed_abort parsed;
1243  int i;
1244 
1245  /*
1246  * Don't track modified blocks for any relations that were removed on
1247  * abort.
1248  */
1249  ParseAbortRecord(XLogRecGetInfo(xlogreader), xlrec, &parsed);
1250  for (i = 0; i < parsed.nrels; ++i)
1251  {
1252  ForkNumber forknum;
1253 
1254  for (forknum = 0; forknum <= MAX_FORKNUM; ++forknum)
1255  if (forknum != FSM_FORKNUM)
1256  BlockRefTableSetLimitBlock(brtab, &parsed.xlocators[i],
1257  forknum, 0);
1258  }
1259  }
1260 }
#define MAX_FORKNUM
Definition: relpath.h:62
RelFileLocator * xlocators
Definition: xact.h:416
RelFileLocator * xlocators
Definition: xact.h:383
#define XLOG_XACT_COMMIT_PREPARED
Definition: xact.h:172
#define XLOG_XACT_COMMIT
Definition: xact.h:169
#define XLOG_XACT_OPMASK
Definition: xact.h:179
#define XLOG_XACT_ABORT
Definition: xact.h:171
#define XLOG_XACT_ABORT_PREPARED
Definition: xact.h:173
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
Definition: xactdesc.c:35
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
Definition: xactdesc.c:141

References BlockRefTableSetLimitBlock(), FSM_FORKNUM, i, MAX_FORKNUM, xl_xact_parsed_commit::nrels, xl_xact_parsed_abort::nrels, ParseAbortRecord(), ParseCommitRecord(), xl_xact_parsed_commit::xlocators, xl_xact_parsed_abort::xlocators, XLOG_XACT_ABORT, XLOG_XACT_ABORT_PREPARED, XLOG_XACT_COMMIT, XLOG_XACT_COMMIT_PREPARED, XLOG_XACT_OPMASK, xlogreader, XLogRecGetData, XLogRecGetInfo, and XLR_INFO_MASK.

Referenced by SummarizeWAL().

◆ SummarizeXlogRecord()

static bool SummarizeXlogRecord ( XLogReaderState xlogreader)
static

Definition at line 1266 of file walsummarizer.c.

1267 {
1269 
1270  if (info == XLOG_CHECKPOINT_REDO || info == XLOG_CHECKPOINT_SHUTDOWN)
1271  {
1272  /*
1273  * This is an LSN at which redo might begin, so we'd like
1274  * summarization to stop just before this WAL record.
1275  */
1276  return true;
1277  }
1278 
1279  return false;
1280 }
#define XLOG_CHECKPOINT_REDO
Definition: pg_control.h:81
#define XLOG_CHECKPOINT_SHUTDOWN
Definition: pg_control.h:67

References XLOG_CHECKPOINT_REDO, XLOG_CHECKPOINT_SHUTDOWN, xlogreader, XLogRecGetInfo, and XLR_INFO_MASK.

Referenced by SummarizeWAL().

◆ WaitForWalSummarization()

XLogRecPtr WaitForWalSummarization ( XLogRecPtr  lsn,
long  timeout,
XLogRecPtr pending_lsn 
)

Definition at line 646 of file walsummarizer.c.

647 {
650  XLogRecPtr summarized_lsn;
651 
653  Assert(timeout > 0);
654 
655  while (1)
656  {
658  long remaining_timeout;
659 
660  /*
661  * If the LSN summarized on disk has reached the target value, stop.
662  */
663  LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
664  summarized_lsn = WalSummarizerCtl->summarized_lsn;
665  *pending_lsn = WalSummarizerCtl->pending_lsn;
666  LWLockRelease(WALSummarizerLock);
667  if (summarized_lsn >= lsn)
668  break;
669 
670  /* Timeout reached? If yes, stop. */
672  remaining_timeout = TimestampDifferenceMilliseconds(now, deadline);
673  if (remaining_timeout <= 0)
674  break;
675 
676  /* Wait and see. */
678  remaining_timeout,
679  WAIT_EVENT_WAL_SUMMARY_READY);
680  }
681 
682  return summarized_lsn;
683 }
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1766
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1654
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1618
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
static time_t start_time
Definition: pg_ctl.c:94
ConditionVariable summary_file_cv
Definition: walsummarizer.c:92
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85

References Assert, ConditionVariableTimedSleep(), GetCurrentTimestamp(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), now(), WalSummarizerData::pending_lsn, start_time, WalSummarizerData::summarized_lsn, WalSummarizerData::summary_file_cv, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, WalSummarizerCtl, and XLogRecPtrIsInvalid.

Referenced by PrepareForIncrementalBackup().

◆ WalSummarizerMain()

void WalSummarizerMain ( char *  startup_data,
size_t  startup_data_len 
)

Definition at line 210 of file walsummarizer.c.

211 {
212  sigjmp_buf local_sigjmp_buf;
214 
215  /*
216  * Within this function, 'current_lsn' and 'current_tli' refer to the
217  * point from which the next WAL summary file should start. 'exact' is
218  * true if 'current_lsn' is known to be the start of a WAL record or WAL
219  * segment, and false if it might be in the middle of a record someplace.
220  *
221  * 'switch_lsn' and 'switch_tli', if set, are the LSN at which we need to
222  * switch to a new timeline and the timeline to which we need to switch.
223  * If not set, we either haven't figured out the answers yet or we're
224  * already on the latest timeline.
225  */
226  XLogRecPtr current_lsn;
227  TimeLineID current_tli;
228  bool exact;
229  XLogRecPtr switch_lsn = InvalidXLogRecPtr;
230  TimeLineID switch_tli = 0;
231 
232  Assert(startup_data_len == 0);
233 
236 
237  ereport(DEBUG1,
238  (errmsg_internal("WAL summarizer started")));
239 
240  /*
241  * Properly accept or ignore signals the postmaster might send us
242  *
243  * We have no particular use for SIGINT at the moment, but it seems
244  * reasonable to treat it like SIGTERM.
245  */
249  /* SIGQUIT handler was already set up by InitPostmasterChild */
253  pqsignal(SIGUSR2, SIG_IGN); /* not used */
254 
255  /* Advertise ourselves. */
257  LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
259  LWLockRelease(WALSummarizerLock);
260 
261  /* Create and switch to a memory context that we can reset on error. */
263  "Wal Summarizer",
266 
267  /*
268  * Reset some signals that are accepted by postmaster but not here
269  */
271 
272  /*
273  * If an exception is encountered, processing resumes here.
274  */
275  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
276  {
277  /* Since not using PG_TRY, must reset error stack by hand */
278  error_context_stack = NULL;
279 
280  /* Prevent interrupts while cleaning up */
281  HOLD_INTERRUPTS();
282 
283  /* Report the error to the server log */
284  EmitErrorReport();
285 
286  /* Release resources we might have acquired. */
291  AtEOXact_Files(false);
292  AtEOXact_HashTables(false);
293 
294  /*
295  * Now return to normal top-level context and clear ErrorContext for
296  * next time.
297  */
299  FlushErrorState();
300 
301  /* Flush any leaked data in the top-level context */
303 
304  /* Now we can allow interrupts again */
306 
307  /*
308  * Sleep for 10 seconds before attempting to resume operations in
309  * order to avoid excessive logging.
310  *
311  * Many of the likely error conditions are things that will repeat
312  * every time. For example, if the WAL can't be read or the summary
313  * can't be written, only administrator action will cure the problem.
314  * So a really fast retry time doesn't seem to be especially
315  * beneficial, and it will clutter the logs.
316  */
317  (void) WaitLatch(MyLatch,
319  10000,
320  WAIT_EVENT_WAL_SUMMARIZER_ERROR);
321  }
322 
323  /* We can now handle ereport(ERROR) */
324  PG_exception_stack = &local_sigjmp_buf;
325 
326  /*
327  * Unblock signals (they were blocked when the postmaster forked us)
328  */
329  sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
330 
331  /*
332  * Fetch information about previous progress from shared memory, and ask
333  * GetOldestUnsummarizedLSN to reset pending_lsn to summarized_lsn. We
334  * might be recovering from an error, and if so, pending_lsn might have
335  * advanced past summarized_lsn, but any WAL we read previously has been
336  * lost and will need to be reread.
337  *
338  * If we discover that WAL summarization is not enabled, just exit.
339  */
340  current_lsn = GetOldestUnsummarizedLSN(&current_tli, &exact, true);
341  if (XLogRecPtrIsInvalid(current_lsn))
342  proc_exit(0);
343 
344  /*
345  * Loop forever
346  */
347  for (;;)
348  {
349  XLogRecPtr latest_lsn;
350  TimeLineID latest_tli;
351  XLogRecPtr end_of_summary_lsn;
352 
353  /* Flush any leaked data in the top-level context */
355 
356  /* Process any signals received recently. */
358 
359  /* If it's time to remove any old WAL summaries, do that now. */
361 
362  /* Find the LSN and TLI up to which we can safely summarize. */
363  latest_lsn = GetLatestLSN(&latest_tli);
364 
365  /*
366  * If we're summarizing a historic timeline and we haven't yet
367  * computed the point at which to switch to the next timeline, do that
368  * now.
369  *
370  * Note that if this is a standby, what was previously the current
371  * timeline could become historic at any time.
372  *
373  * We could try to make this more efficient by caching the results of
374  * readTimeLineHistory when latest_tli has not changed, but since we
375  * only have to do this once per timeline switch, we probably wouldn't
376  * save any significant amount of work in practice.
377  */
378  if (current_tli != latest_tli && XLogRecPtrIsInvalid(switch_lsn))
379  {
380  List *tles = readTimeLineHistory(latest_tli);
381 
382  switch_lsn = tliSwitchPoint(current_tli, tles, &switch_tli);
383  ereport(DEBUG1,
384  errmsg("switch point from TLI %u to TLI %u is at %X/%X",
385  current_tli, switch_tli, LSN_FORMAT_ARGS(switch_lsn)));
386  }
387 
388  /*
389  * If we've reached the switch LSN, we can't summarize anything else
390  * on this timeline. Switch to the next timeline and go around again.
391  */
392  if (!XLogRecPtrIsInvalid(switch_lsn) && current_lsn >= switch_lsn)
393  {
394  current_tli = switch_tli;
395  switch_lsn = InvalidXLogRecPtr;
396  switch_tli = 0;
397  continue;
398  }
399 
400  /* Summarize WAL. */
401  end_of_summary_lsn = SummarizeWAL(current_tli,
402  current_lsn, exact,
403  switch_lsn, latest_lsn);
404  Assert(!XLogRecPtrIsInvalid(end_of_summary_lsn));
405  Assert(end_of_summary_lsn >= current_lsn);
406 
407  /*
408  * Update state for next loop iteration.
409  *
410  * Next summary file should start from exactly where this one ended.
411  */
412  current_lsn = end_of_summary_lsn;
413  exact = true;
414 
415  /* Update state in shared memory. */
416  LWLockAcquire(WALSummarizerLock, LW_EXCLUSIVE);
417  Assert(WalSummarizerCtl->pending_lsn <= end_of_summary_lsn);
418  WalSummarizerCtl->summarized_lsn = end_of_summary_lsn;
419  WalSummarizerCtl->summarized_tli = current_tli;
421  WalSummarizerCtl->pending_lsn = end_of_summary_lsn;
422  LWLockRelease(WALSummarizerLock);
423 
424  /* Wake up anyone waiting for more summary files to be written. */
426  }
427 }
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:44
sigset_t UnBlockSig
Definition: pqsignal.c:22
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void AtEOXact_HashTables(bool isCommit)
Definition: dynahash.c:1869
void EmitErrorReport(void)
Definition: elog.c:1672
ErrorContextCallback * error_context_stack
Definition: elog.c:94
void FlushErrorState(void)
Definition: elog.c:1836
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
void AtEOXact_Files(bool isCommit)
Definition: fd.c:3165
ProcNumber MyProcNumber
Definition: globals.c:87
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
void LWLockReleaseAll(void)
Definition: lwlock.c:1878
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext TopMemoryContext
Definition: mcxt.c:149
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
@ B_WAL_SUMMARIZER
Definition: miscadmin.h:360
BackendType MyBackendType
Definition: miscinit.c:63
pqsigfunc pqsignal(int signo, pqsigfunc func)
uintptr_t Datum
Definition: postgres.h:64
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
tree context
Definition: radixtree.h:1833
MemoryContextSwitchTo(old_ctx)
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1002
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact, bool reset_pending_lsn)
static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
static void WalSummarizerShutdown(int code, Datum arg)
static void MaybeRemoveOldWalSummaries(void)
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGALRM
Definition: win32_port.h:174
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AtEOXact_Files(), AtEOXact_HashTables(), AuxiliaryProcessMainCommon(), B_WAL_SUMMARIZER, ConditionVariableBroadcast(), ConditionVariableCancelSleep(), context, DEBUG1, EmitErrorReport(), ereport, errmsg(), errmsg_internal(), error_context_stack, FlushErrorState(), GetLatestLSN(), GetOldestUnsummarizedLSN(), HandleWalSummarizerInterrupts(), HOLD_INTERRUPTS, InvalidXLogRecPtr, LSN_FORMAT_ARGS, WalSummarizerData::lsn_is_exact, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), MaybeRemoveOldWalSummaries(), MemoryContextReset(), MemoryContextSwitchTo(), MyBackendType, MyLatch, MyProcNumber, on_shmem_exit(), WalSummarizerData::pending_lsn, PG_exception_stack, pgstat_report_wait_end(), pqsignal(), proc_exit(), procsignal_sigusr1_handler(), readTimeLineHistory(), ReleaseAuxProcessResources(), RESUME_INTERRUPTS, SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, WalSummarizerData::summarized_lsn, WalSummarizerData::summarized_tli, WalSummarizerData::summarizer_pgprocno, SummarizeWAL(), WalSummarizerData::summary_file_cv, tliSwitchPoint(), TopMemoryContext, UnBlockSig, WaitLatch(), WalSummarizerCtl, WalSummarizerShutdown(), WL_EXIT_ON_PM_DEATH, WL_TIMEOUT, and XLogRecPtrIsInvalid.

◆ WalSummarizerShmemInit()

void WalSummarizerShmemInit ( void  )

Definition at line 179 of file walsummarizer.c.

180 {
181  bool found;
182 
184  ShmemInitStruct("Wal Summarizer Ctl", WalSummarizerShmemSize(),
185  &found);
186 
187  if (!found)
188  {
189  /*
190  * First time through, so initialize.
191  *
192  * We're just filling in dummy values here -- the real initialization
193  * will happen when GetOldestUnsummarizedLSN() is called for the first
194  * time.
195  */
196  WalSummarizerCtl->initialized = false;
203  }
204 }
void ConditionVariableInit(ConditionVariable *cv)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
Size WalSummarizerShmemSize(void)

References ConditionVariableInit(), WalSummarizerData::initialized, INVALID_PROC_NUMBER, InvalidXLogRecPtr, WalSummarizerData::lsn_is_exact, WalSummarizerData::pending_lsn, ShmemInitStruct(), WalSummarizerData::summarized_lsn, WalSummarizerData::summarized_tli, WalSummarizerData::summarizer_pgprocno, WalSummarizerData::summary_file_cv, WalSummarizerCtl, and WalSummarizerShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSummarizerShmemSize()

Size WalSummarizerShmemSize ( void  )

Definition at line 170 of file walsummarizer.c.

171 {
172  return sizeof(WalSummarizerData);
173 }

Referenced by CalculateShmemSize(), and WalSummarizerShmemInit().

◆ WalSummarizerShutdown()

static void WalSummarizerShutdown ( int  code,
Datum  arg 
)
static

Variable Documentation

◆ pages_read_since_last_sleep

long pages_read_since_last_sleep = 0
static

Definition at line 132 of file walsummarizer.c.

Referenced by summarizer_read_local_xlog_page(), and summarizer_wait_for_wal().

◆ redo_pointer_at_last_summary_removal

XLogRecPtr redo_pointer_at_last_summary_removal = InvalidXLogRecPtr
static

Definition at line 137 of file walsummarizer.c.

Referenced by MaybeRemoveOldWalSummaries().

◆ sleep_quanta

long sleep_quanta = 1
static

Definition at line 115 of file walsummarizer.c.

Referenced by summarizer_wait_for_wal().

◆ summarize_wal

◆ wal_summary_keep_time

int wal_summary_keep_time = 10 * HOURS_PER_DAY * MINS_PER_HOUR

Definition at line 143 of file walsummarizer.c.

Referenced by MaybeRemoveOldWalSummaries().

◆ WalSummarizerCtl