127#define MAX_SLEEP_QUANTA 150
128#define MS_PER_SLEEP_QUANTUM 200
539 if (lsn_is_exact !=
NULL)
624 if (lsn_is_exact !=
NULL)
694 if (summarized_lsn >= lsn)
743 errmsg(
"WAL summarization is not progressing"),
744 errdetail(
"Summarization is needed through %X/%08X, but is stuck at %X/%08X on disk and %X/%08X in memory.",
758 errmsg_plural(
"still waiting for WAL summarization through %X/%08X after %ld second",
759 "still waiting for WAL summarization through %X/%08X after %ld seconds",
763 errdetail(
"Summarization has reached %X/%08X on disk and %X/%08X in memory.",
920 bool fast_forward =
true;
924 private_data->
tli = tli;
938 errdetail(
"Failed while allocating a WAL reading processor.")));
983 errmsg_internal(
"could not read WAL from timeline %u at %X/%08X: end of WAL at %X/%08X",
1002 errmsg(
"could not find a valid record after %X/%08X",
1036 errmsg_internal(
"could not read WAL from timeline %u at %X/%08X: end of WAL at %X/%08X",
1047 errmsg(
"could not read WAL from timeline %u at %X/%08X: %s",
1053 errmsg(
"could not read WAL from timeline %u at %X/%08X",
1121 else if (!fast_forward)
1156 &forknum, &blocknum,
NULL))
1201 XLOGDIR "/summaries/temp.summary");
1203 XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
1236 errmsg_internal(
"skipped summarizing WAL on TLI %u from %X/%08X to %X/%08X",
1305 for (
i = 0;
i <
xlrec->ntablespaces; ++
i)
1387 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1409 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1510 state->private_data;
1582 errmsg_internal(
"timeline %u became historic, can read up to %X/%08X",
void pgaio_error_cleanup(void)
void AuxiliaryProcessMainCommon(void)
List * readTimeLineHistory(TimeLineID targetTLI)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
void BlockRefTableMarkBlockModified(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blknum)
void BlockRefTableSetLimitBlock(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber limit_block)
void WriteBlockRefTable(BlockRefTable *brtab, io_callback_fn write_callback, void *write_callback_arg)
void(*) BlockRefTable CreateEmptyBlockRefTable)(void)
#define Assert(condition)
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_CREATE_FILE_COPY
void AtEOXact_HashTables(bool isCommit)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
void AtEOXact_Files(bool isCommit)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
#define palloc0_object(type)
volatile sig_atomic_t LogMemoryContextPending
volatile sig_atomic_t ProcSignalBarrierPending
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void ProcessLogMemoryContextInterrupt(void)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define AmWalSummarizerProcess()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
BackendType MyBackendType
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define XLOG_CHECKPOINT_REDO
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_PARAMETER_CHANGE
#define XLOG_END_OF_RECOVERY
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
static void * list_nth(const List *list, int n)
void pgstat_report_wal(bool force)
#define GetPGProcByNumber(n)
#define INVALID_PROC_NUMBER
void ProcessProcSignalBarrier(void)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void ReleaseAuxProcessResources(bool isCommit)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define XLOG_SMGR_TRUNCATE
#define SMGR_TRUNCATE_HEAP
XLogRecPtr summarized_lsn
TimeLineID summarized_tli
ConditionVariable summary_file_cv
ProcNumber summarizer_pgprocno
#define TimestampTzPlusMilliseconds(tz, ms)
static void pgstat_report_wait_end(void)
#define WL_EXIT_ON_PM_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static XLogRecPtr redo_pointer_at_last_summary_removal
static long pages_read_since_last_sleep
Size WalSummarizerShmemSize(void)
static XLogRecPtr GetLatestLSN(TimeLineID *tli)
void WalSummarizerMain(const void *startup_data, size_t startup_data_len)
static bool SummarizeXlogRecord(XLogReaderState *xlogreader, bool *new_fast_forward)
static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
static WalSummarizerData * WalSummarizerCtl
static void ProcessWalSummarizerInterrupts(void)
static void SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
void WaitForWalSummarization(XLogRecPtr lsn)
static void SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
#define MS_PER_SLEEP_QUANTUM
void GetWalSummarizerState(TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn, XLogRecPtr *pending_lsn, int *summarizer_pid)
int wal_summary_keep_time
static int summarizer_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void WalSummarizerShutdown(int code, Datum arg)
static void SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void MaybeRemoveOldWalSummaries(void)
void WakeupWalSummarizer(void)
XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact)
void WalSummarizerShmemInit(void)
static void summarizer_wait_for_wal(void)
void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
List * GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
int WriteWalSummary(void *wal_summary_io, void *data, int length)
#define XLOG_XACT_COMMIT_PREPARED
#define XLOG_XACT_ABORT_PREPARED
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
bool RecoveryInProgress(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
TimeLineID GetWALInsertionTimeLineIfSet(void)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
#define XLogRecMaxBlockId(decoder)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
void WALReadRaiseError(WALReadError *errinfo)