127#define MAX_SLEEP_QUANTA 150
128#define MS_PER_SLEEP_QUANTUM 200
160 bool *new_fast_forward);
215 sigjmp_buf local_sigjmp_buf;
235 Assert(startup_data_len == 0);
278 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
324 WAIT_EVENT_WAL_SUMMARIZER_ERROR);
400 current_tli = switch_tli;
401 current_lsn = switch_lsn;
421 switch_lsn, latest_lsn);
423 Assert(end_of_summary_lsn >= current_lsn);
430 current_lsn = end_of_summary_lsn;
463 *summarizer_pid = -1;
478 *summarizer_pid = -1;
490 if (*summarizer_pid <= 0)
491 *summarizer_pid = -1;
515 bool should_make_exact =
false;
516 List *existing_summaries;
530 if (!am_wal_summarizer)
539 if (lsn_is_exact != NULL)
542 return unsummarized_lsn;
566 if (oldest_segno != 0)
572 unsummarized_tli = tle->
tli;
584 foreach(lc, existing_summaries)
588 if (ws->
end_lsn > unsummarized_lsn)
590 unsummarized_lsn = ws->
end_lsn;
591 should_make_exact =
true;
596 if (unsummarized_tli == 0)
598 errcode(ERRCODE_INTERNAL_ERROR),
624 if (lsn_is_exact != NULL)
628 return unsummarized_lsn;
675 long timeout_in_ms = 10000;
694 if (summarized_lsn >= lsn)
702 current_time) >= timeout_in_ms)
704 long elapsed_seconds;
717 if (pending_lsn > prior_pending_lsn)
719 prior_pending_lsn = pending_lsn;
742 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
743 errmsg(
"WAL summarization is not progressing"),
744 errdetail(
"Summarization is needed through %X/%X, but is stuck at %X/%X on disk and %X/%X in memory.",
755 current_time) / 1000;
757 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
758 errmsg_plural(
"still waiting for WAL summarization through %X/%X after %ld second",
759 "still waiting for WAL summarization through %X/%X after %ld seconds",
763 errdetail(
"Summarization has reached %X/%X on disk and %X/%X in memory.",
780 WAIT_EVENT_WAL_SUMMARY_READY);
844 if (flush_lsn > replay_lsn)
924 bool fast_forward =
true;
929 private_data->
tli = tli;
941 (
errcode(ERRCODE_OUT_OF_MEMORY),
943 errdetail(
"Failed while allocating a WAL reading processor.")));
973 summary_start_lsn = start_lsn;
988 errmsg_internal(
"could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
1001 summary_start_lsn = start_lsn;
1002 summary_end_lsn = private_data->
read_upto;
1007 (
errmsg(
"could not find a valid record after %X/%X",
1012 Assert(summary_start_lsn >= start_lsn);
1028 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1041 errmsg_internal(
"could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
1046 summary_end_lsn = private_data->
read_upto;
1052 errmsg(
"could not read WAL from timeline %u at %X/%X: %s",
1058 errmsg(
"could not read WAL from timeline %u at %X/%X",
1063 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1088 summary_end_lsn = switch_lsn;
1099 if (rmid == RM_XLOG_ID)
1101 bool new_fast_forward;
1123 fast_forward = new_fast_forward;
1126 else if (!fast_forward)
1161 &forknum, &blocknum, NULL))
1202 if (summary_end_lsn > summary_start_lsn && !fast_forward)
1206 XLOGDIR "/summaries/temp.summary");
1208 XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
1219 errmsg(
"could not create file \"%s\": %m", temp_path)));
1239 if (summary_end_lsn > summary_start_lsn && fast_forward)
1241 errmsg_internal(
"skipped summarizing WAL on TLI %u from %X/%X to %X/%X",
1246 return summary_end_lsn;
1392 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1414 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1434 int record_wal_level;
1515 state->private_data;
1519 if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto)
1525 count = XLOG_BLCKSZ;
1528 else if (targetPagePtr + reqLen > private_data->
read_upto)
1555 if (private_data->
tli == latest_tli)
1597 count = private_data->
read_upto - targetPagePtr;
1603 private_data->
tli, &errinfo))
1653 WAIT_EVENT_WAL_SUMMARIZER_WAL);
1694 while (wslist !=
NIL)
1710 if (oldest_segno != 0)
1723 if (selected_tli != ws->
tli)
void pgaio_error_cleanup(void)
void AuxiliaryProcessMainCommon(void)
List * readTimeLineHistory(TimeLineID targetTLI)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
void BlockRefTableMarkBlockModified(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blknum)
void BlockRefTableSetLimitBlock(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber limit_block)
void WriteBlockRefTable(BlockRefTable *brtab, io_callback_fn write_callback, void *write_callback_arg)
void(*) BlockRefTable CreateEmptyBlockRefTable)(void)
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_CREATE_FILE_COPY
void AtEOXact_HashTables(bool isCommit)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
void AtEOXact_Files(bool isCommit)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
volatile sig_atomic_t LogMemoryContextPending
volatile sig_atomic_t ProcSignalBarrierPending
volatile sig_atomic_t PublishMemoryContextPending
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
void * palloc0(Size size)
void ProcessGetMemoryContextInterrupt(void)
MemoryContext TopMemoryContext
void ProcessLogMemoryContextInterrupt(void)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define AmWalSummarizerProcess()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
BackendType MyBackendType
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define XLOG_CHECKPOINT_REDO
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_PARAMETER_CHANGE
#define XLOG_END_OF_RECOVERY
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
static void * list_nth(const List *list, int n)
void pgstat_report_wal(bool force)
#define GetPGProcByNumber(n)
#define INVALID_PROC_NUMBER
void ProcessProcSignalBarrier(void)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void ReleaseAuxProcessResources(bool isCommit)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define XLOG_SMGR_TRUNCATE
#define SMGR_TRUNCATE_HEAP
XLogRecPtr summarized_lsn
TimeLineID summarized_tli
ConditionVariable summary_file_cv
ProcNumber summarizer_pgprocno
Oid tablespace_ids[FLEXIBLE_ARRAY_MEMBER]
RelFileLocator * xlocators
RelFileLocator * xlocators
#define TimestampTzPlusMilliseconds(tz, ms)
static void pgstat_report_wait_end(void)
#define WL_EXIT_ON_PM_DEATH
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static XLogRecPtr redo_pointer_at_last_summary_removal
static long pages_read_since_last_sleep
Size WalSummarizerShmemSize(void)
static XLogRecPtr GetLatestLSN(TimeLineID *tli)
void WalSummarizerMain(const void *startup_data, size_t startup_data_len)
static bool SummarizeXlogRecord(XLogReaderState *xlogreader, bool *new_fast_forward)
static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
static WalSummarizerData * WalSummarizerCtl
static void ProcessWalSummarizerInterrupts(void)
static void SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
void WaitForWalSummarization(XLogRecPtr lsn)
static void SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
#define MS_PER_SLEEP_QUANTUM
void GetWalSummarizerState(TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn, XLogRecPtr *pending_lsn, int *summarizer_pid)
int wal_summary_keep_time
static int summarizer_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void WalSummarizerShutdown(int code, Datum arg)
static void SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void MaybeRemoveOldWalSummaries(void)
void WakeupWalSummarizer(void)
XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact)
void WalSummarizerShmemInit(void)
static void summarizer_wait_for_wal(void)
void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
List * GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
int WriteWalSummary(void *wal_summary_io, void *data, int length)
#define XLOG_XACT_COMMIT_PREPARED
#define XLOG_XACT_ABORT_PREPARED
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
bool RecoveryInProgress(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
TimeLineID GetWALInsertionTimeLineIfSet(void)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
#define XLogRecMaxBlockId(decoder)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
void WALReadRaiseError(WALReadError *errinfo)