125 #define MAX_SLEEP_QUANTA 150
126 #define MS_PER_SLEEP_QUANTUM 200
158 bool *new_fast_forward);
213 sigjmp_buf local_sigjmp_buf;
233 Assert(startup_data_len == 0);
276 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
321 WAIT_EVENT_WAL_SUMMARIZER_ERROR);
397 current_tli = switch_tli;
398 current_lsn = switch_lsn;
418 switch_lsn, latest_lsn);
420 Assert(end_of_summary_lsn >= current_lsn);
427 current_lsn = end_of_summary_lsn;
460 *summarizer_pid = -1;
475 *summarizer_pid = -1;
487 if (*summarizer_pid <= 0)
488 *summarizer_pid = -1;
512 bool should_make_exact =
false;
513 List *existing_summaries;
527 if (!am_wal_summarizer)
536 if (lsn_is_exact != NULL)
539 return unsummarized_lsn;
563 if (oldest_segno != 0)
569 unsummarized_tli = tle->
tli;
581 foreach(lc, existing_summaries)
585 if (ws->
end_lsn > unsummarized_lsn)
587 unsummarized_lsn = ws->
end_lsn;
588 should_make_exact =
true;
593 if (unsummarized_tli == 0)
595 errcode(ERRCODE_INTERNAL_ERROR),
621 if (lsn_is_exact != NULL)
625 return unsummarized_lsn;
672 long timeout_in_ms = 10000;
691 if (summarized_lsn >= lsn)
699 current_time) >= timeout_in_ms)
701 long elapsed_seconds;
714 if (pending_lsn > prior_pending_lsn)
716 prior_pending_lsn = pending_lsn;
739 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
740 errmsg(
"WAL summarization is not progressing"),
741 errdetail(
"Summarization is needed through %X/%X, but is stuck at %X/%X on disk and %X/%X in memory.",
752 current_time) / 1000;
754 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
755 errmsg_plural(
"still waiting for WAL summarization through %X/%X after %ld second",
756 "still waiting for WAL summarization through %X/%X after %ld seconds",
760 errdetail(
"Summarization has reached %X/%X on disk and %X/%X in memory.",
777 WAIT_EVENT_WAL_SUMMARY_READY);
841 if (flush_lsn > replay_lsn)
917 bool fast_forward =
true;
922 private_data->
tli = tli;
934 (
errcode(ERRCODE_OUT_OF_MEMORY),
936 errdetail(
"Failed while allocating a WAL reading processor.")));
966 summary_start_lsn = start_lsn;
981 errmsg_internal(
"could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
994 summary_start_lsn = start_lsn;
995 summary_end_lsn = private_data->
read_upto;
1000 (
errmsg(
"could not find a valid record after %X/%X",
1005 Assert(summary_start_lsn >= start_lsn);
1021 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1034 errmsg_internal(
"could not read WAL from timeline %u at %X/%X: end of WAL at %X/%X",
1039 summary_end_lsn = private_data->
read_upto;
1045 errmsg(
"could not read WAL from timeline %u at %X/%X: %s",
1051 errmsg(
"could not read WAL from timeline %u at %X/%X",
1056 Assert(summary_start_lsn <= xlogreader->EndRecPtr);
1081 summary_end_lsn = switch_lsn;
1092 if (rmid == RM_XLOG_ID)
1094 bool new_fast_forward;
1116 fast_forward = new_fast_forward;
1119 else if (!fast_forward)
1154 &forknum, &blocknum, NULL))
1195 if (summary_end_lsn > summary_start_lsn && !fast_forward)
1199 XLOGDIR "/summaries/temp.summary");
1201 XLOGDIR "/summaries/%08X%08X%08X%08X%08X.summary",
1212 errmsg(
"could not create file \"%s\": %m", temp_path)));
1232 if (summary_end_lsn > summary_start_lsn && fast_forward)
1234 errmsg_internal(
"skipped summarizing WAL on TLI %u from %X/%X to %X/%X",
1239 return summary_end_lsn;
1385 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1407 for (forknum = 0; forknum <=
MAX_FORKNUM; ++forknum)
1427 int record_wal_level;
1508 state->private_data;
1512 if (targetPagePtr + XLOG_BLCKSZ <= private_data->read_upto)
1518 count = XLOG_BLCKSZ;
1521 else if (targetPagePtr + reqLen > private_data->
read_upto)
1548 if (private_data->
tli == latest_tli)
1590 count = private_data->
read_upto - targetPagePtr;
1596 private_data->
tli, &errinfo))
1643 WAIT_EVENT_WAL_SUMMARIZER_WAL);
1684 while (wslist !=
NIL)
1700 if (oldest_segno != 0)
1713 if (selected_tli != ws->
tli)
void AuxiliaryProcessMainCommon(void)
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
List * readTimeLineHistory(TimeLineID targetTLI)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
void BlockRefTableMarkBlockModified(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blknum)
void BlockRefTableSetLimitBlock(BlockRefTable *brtab, const RelFileLocator *rlocator, ForkNumber forknum, BlockNumber limit_block)
void WriteBlockRefTable(BlockRefTable *brtab, io_callback_fn write_callback, void *write_callback_arg)
void(*) BlockRefTable CreateEmptyBlockRefTable)(void)
#define Assert(condition)
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_CREATE_FILE_COPY
void AtEOXact_HashTables(bool isCommit)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
void AtEOXact_Files(bool isCommit)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
volatile sig_atomic_t LogMemoryContextPending
volatile sig_atomic_t ProcSignalBarrierPending
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
void MemoryContextReset(MemoryContext context)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
void ProcessLogMemoryContextInterrupt(void)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define AmWalSummarizerProcess()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
BackendType MyBackendType
#define XLOG_CHECKPOINT_REDO
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_PARAMETER_CHANGE
#define XLOG_END_OF_RECOVERY
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
static void * list_nth(const List *list, int n)
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define GetPGProcByNumber(n)
#define INVALID_PROC_NUMBER
void ProcessProcSignalBarrier(void)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
MemoryContextSwitchTo(old_ctx)
void ReleaseAuxProcessResources(bool isCommit)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define XLOG_SMGR_TRUNCATE
#define SMGR_TRUNCATE_HEAP
XLogRecPtr summarized_lsn
TimeLineID summarized_tli
ConditionVariable summary_file_cv
ProcNumber summarizer_pgprocno
Oid tablespace_ids[FLEXIBLE_ARRAY_MEMBER]
RelFileLocator * xlocators
RelFileLocator * xlocators
#define TimestampTzPlusMilliseconds(tz, ms)
static void pgstat_report_wait_end(void)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
void SetWalSummarizerLatch(void)
static XLogRecPtr redo_pointer_at_last_summary_removal
static long pages_read_since_last_sleep
void WalSummarizerMain(char *startup_data, size_t startup_data_len)
Size WalSummarizerShmemSize(void)
static XLogRecPtr GetLatestLSN(TimeLineID *tli)
static bool SummarizeXlogRecord(XLogReaderState *xlogreader, bool *new_fast_forward)
static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact, XLogRecPtr switch_lsn, XLogRecPtr maximum_lsn)
static WalSummarizerData * WalSummarizerCtl
static void SummarizeXactRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
void WaitForWalSummarization(XLogRecPtr lsn)
static void SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void HandleWalSummarizerInterrupts(void)
#define MS_PER_SLEEP_QUANTUM
void GetWalSummarizerState(TimeLineID *summarized_tli, XLogRecPtr *summarized_lsn, XLogRecPtr *pending_lsn, int *summarizer_pid)
int wal_summary_keep_time
static int summarizer_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
static void WalSummarizerShutdown(int code, Datum arg)
static void SummarizeSmgrRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
static void MaybeRemoveOldWalSummaries(void)
XLogRecPtr GetOldestUnsummarizedLSN(TimeLineID *tli, bool *lsn_is_exact)
void WalSummarizerShmemInit(void)
static void summarizer_wait_for_wal(void)
void RemoveWalSummaryIfOlderThan(WalSummaryFile *ws, time_t cutoff_time)
List * GetWalSummaries(TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
int WriteWalSummary(void *wal_summary_io, void *data, int length)
#define XLOG_XACT_COMMIT_PREPARED
#define XLOG_XACT_ABORT_PREPARED
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
bool RecoveryInProgress(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
TimeLineID GetWALInsertionTimeLineIfSet(void)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
bool XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum, Buffer *prefetch_buffer)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReaderFree(XLogReaderState *state)
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
#define XLogRecMaxBlockId(decoder)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
static XLogReaderState * xlogreader
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
void WALReadRaiseError(WALReadError *errinfo)