xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  * PostgreSQL write-ahead log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/backend/access/transam/xlog.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <math.h>
19 #include <time.h>
20 #include <fcntl.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <unistd.h>
24 
25 #include "access/clog.h"
26 #include "access/commit_ts.h"
27 #include "access/heaptoast.h"
28 #include "access/multixact.h"
29 #include "access/rewriteheap.h"
30 #include "access/subtrans.h"
31 #include "access/timeline.h"
32 #include "access/transam.h"
33 #include "access/twophase.h"
34 #include "access/xact.h"
35 #include "access/xlog_internal.h"
36 #include "access/xloginsert.h"
37 #include "access/xlogreader.h"
38 #include "access/xlogutils.h"
39 #include "catalog/catversion.h"
40 #include "catalog/pg_control.h"
41 #include "catalog/pg_database.h"
42 #include "commands/tablespace.h"
43 #include "common/controldata_utils.h"
44 #include "miscadmin.h"
45 #include "pg_trace.h"
46 #include "pgstat.h"
47 #include "port/atomics.h"
48 #include "postmaster/bgwriter.h"
49 #include "postmaster/startup.h"
50 #include "postmaster/walwriter.h"
51 #include "replication/basebackup.h"
52 #include "replication/logical.h"
53 #include "replication/origin.h"
54 #include "replication/slot.h"
55 #include "replication/snapbuild.h"
56 #include "replication/walreceiver.h"
57 #include "replication/walsender.h"
58 #include "storage/bufmgr.h"
59 #include "storage/fd.h"
60 #include "storage/ipc.h"
61 #include "storage/large_object.h"
62 #include "storage/latch.h"
63 #include "storage/pmsignal.h"
64 #include "storage/predicate.h"
65 #include "storage/proc.h"
66 #include "storage/procarray.h"
67 #include "storage/reinit.h"
68 #include "storage/smgr.h"
69 #include "storage/spin.h"
70 #include "storage/sync.h"
71 #include "utils/builtins.h"
72 #include "utils/guc.h"
73 #include "utils/memutils.h"
74 #include "utils/ps_status.h"
75 #include "utils/relmapper.h"
76 #include "utils/snapmgr.h"
77 #include "utils/timestamp.h"
78 
79 extern uint32 bootstrap_data_checksum_version;
80 
81 /* Unsupported old recovery command file names (relative to $PGDATA) */
82 #define RECOVERY_COMMAND_FILE "recovery.conf"
83 #define RECOVERY_COMMAND_DONE "recovery.done"
84 
85 /* User-settable parameters */
86 int max_wal_size_mb = 1024; /* 1 GB */
87 int min_wal_size_mb = 80; /* 80 MB */
88 int wal_keep_segments = 0;
89 int XLOGbuffers = -1;
90 int XLogArchiveTimeout = 0;
91 int XLogArchiveMode = ARCHIVE_MODE_OFF;
92 char *XLogArchiveCommand = NULL;
93 bool EnableHotStandby = false;
94 bool fullPageWrites = true;
95 bool wal_log_hints = false;
96 bool wal_compression = false;
97 char *wal_consistency_checking_string = NULL;
98 bool *wal_consistency_checking = NULL;
99 bool wal_init_zero = true;
100 bool wal_recycle = true;
101 bool log_checkpoints = false;
102 int sync_method = DEFAULT_SYNC_METHOD;
103 int wal_level = WAL_LEVEL_MINIMAL;
104 int CommitDelay = 0; /* precommit delay in microseconds */
105 int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
106 int wal_retrieve_retry_interval = 5000;
107 
108 #ifdef WAL_DEBUG
109 bool XLOG_DEBUG = false;
110 #endif
111 
113 
114 /*
115  * Number of WAL insertion locks to use. A higher value allows more insertions
116  * to happen concurrently, but adds some CPU overhead to flushing the WAL,
117  * which needs to iterate all the locks.
118  */
119 #define NUM_XLOGINSERT_LOCKS 8
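/*
 * For illustration (the choice rule is the one used in WALInsertLockAcquire()
 * further down): a backend's first lock pick is
 * MyProc->pgprocno % NUM_XLOGINSERT_LOCKS, so with the default of 8 locks a
 * backend with pgprocno 42 starts with lock 2 and only migrates to another
 * lock when it hits contention.
 */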
120 
121 /*
122  * Max distance from last checkpoint, before triggering a new xlog-based
123  * checkpoint.
124  */
125 int CheckPointSegments;
126 
127 /* Estimated distance between checkpoints, in bytes */
128 static double CheckPointDistanceEstimate = 0;
129 static double PrevCheckPointDistance = 0;
130 
131 /*
132  * GUC support
133  */
134 const struct config_enum_entry sync_method_options[] = {
135  {"fsync", SYNC_METHOD_FSYNC, false},
136 #ifdef HAVE_FSYNC_WRITETHROUGH
137  {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
138 #endif
139 #ifdef HAVE_FDATASYNC
140  {"fdatasync", SYNC_METHOD_FDATASYNC, false},
141 #endif
142 #ifdef OPEN_SYNC_FLAG
143  {"open_sync", SYNC_METHOD_OPEN, false},
144 #endif
145 #ifdef OPEN_DATASYNC_FLAG
146  {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
147 #endif
148  {NULL, 0, false}
149 };
150 
151 
152 /*
153  * Although only "on", "off", and "always" are documented,
154  * we accept all the likely variants of "on" and "off".
155  */
156 const struct config_enum_entry archive_mode_options[] = {
157  {"always", ARCHIVE_MODE_ALWAYS, false},
158  {"on", ARCHIVE_MODE_ON, false},
159  {"off", ARCHIVE_MODE_OFF, false},
160  {"true", ARCHIVE_MODE_ON, true},
161  {"false", ARCHIVE_MODE_OFF, true},
162  {"yes", ARCHIVE_MODE_ON, true},
163  {"no", ARCHIVE_MODE_OFF, true},
164  {"1", ARCHIVE_MODE_ON, true},
165  {"0", ARCHIVE_MODE_OFF, true},
166  {NULL, 0, false}
167 };
168 
169 const struct config_enum_entry recovery_target_action_options[] = {
170  {"pause", RECOVERY_TARGET_ACTION_PAUSE, false},
171  {"promote", RECOVERY_TARGET_ACTION_PROMOTE, false},
172  {"shutdown", RECOVERY_TARGET_ACTION_SHUTDOWN, false},
173  {NULL, 0, false}
174 };
175 
176 /*
177  * Statistics for current checkpoint are collected in this global struct.
178  * Because only the checkpointer or a stand-alone backend can perform
179  * checkpoints, this will be unused in normal backends.
180  */
181 CheckpointStatsData CheckpointStats;
182 
183 /*
184  * ThisTimeLineID will be the same in all backends --- it identifies the
185  * current WAL timeline for the database system.
186  */
187 TimeLineID ThisTimeLineID = 0;
188 
189 /*
190  * Are we doing recovery from XLOG?
191  *
192  * This is only ever true in the startup process; it should be read as meaning
193  * "this process is replaying WAL records", rather than "the system is in
194  * recovery mode". It should be examined primarily by functions that need
195  * to act differently when called from a WAL redo function (e.g., to skip WAL
196  * logging). To check whether the system is in recovery regardless of which
197  * process you're running in, use RecoveryInProgress() but only after shared
198  * memory startup and lock initialization.
199  */
200 bool InRecovery = false;
201 
202 /* Are we in Hot Standby mode? Only valid in startup process, see xlog.h */
203 HotStandbyState standbyState = STANDBY_DISABLED;
204 
205 static XLogRecPtr LastRec;
206 
207 /* Local copy of WalRcv->receivedUpto */
208 static XLogRecPtr receivedUpto = 0;
209 static TimeLineID receiveTLI = 0;
210 
211 /*
212  * During recovery, lastFullPageWrites keeps track of full_page_writes that
213  * the replayed WAL records indicate. It's initialized with full_page_writes
214  * that the recovery starting checkpoint record indicates, and then updated
215  * each time XLOG_FPW_CHANGE record is replayed.
216  */
217 static bool lastFullPageWrites;
218 
219 /*
220  * Local copy of SharedRecoveryInProgress variable. True actually means "not
221  * known, need to check the shared state".
222  */
223 static bool LocalRecoveryInProgress = true;
224 
225 /*
226  * Local copy of SharedHotStandbyActive variable. False actually means "not
227  * known, need to check the shared state".
228  */
229 static bool LocalHotStandbyActive = false;
230 
231 /*
232  * Local state for XLogInsertAllowed():
233  * 1: unconditionally allowed to insert XLOG
234  * 0: unconditionally not allowed to insert XLOG
235  * -1: must check RecoveryInProgress(); disallow until it is false
236  * Most processes start with -1 and transition to 1 after seeing that recovery
237  * is not in progress. But we can also force the value for special cases.
238  * The coding in XLogInsertAllowed() depends on the first two of these states
239  * being numerically the same as bool true and false.
240  */
241 static int LocalXLogInsertAllowed = -1;
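/*
 * A minimal sketch of how this three-state flag is typically consumed
 * (a hedged reconstruction for illustration; see XLogInsertAllowed() for
 * the authoritative logic):
 *
 *		if (LocalXLogInsertAllowed >= 0)
 *			return (bool) LocalXLogInsertAllowed;	-- fast path: state 0 or 1
 *		if (RecoveryInProgress())
 *			return false;			-- state -1: still replaying WAL
 *		LocalXLogInsertAllowed = 1;	-- recovery over; cache the answer
 *		return true;
 */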
242 
243 /*
244  * When ArchiveRecoveryRequested is set, archive recovery was requested,
245  * ie. signal files were present. When InArchiveRecovery is set, we are
246  * currently recovering using offline XLOG archives. These variables are only
247  * valid in the startup process.
248  *
249  * When ArchiveRecoveryRequested is true, but InArchiveRecovery is false, we're
250  * currently performing crash recovery using only XLOG files in pg_wal, but
251  * will switch to using offline XLOG archives as soon as we reach the end of
252  * WAL in pg_wal.
253 */
254 bool ArchiveRecoveryRequested = false;
255 bool InArchiveRecovery = false;
256 
257 static bool standby_signal_file_found = false;
258 static bool recovery_signal_file_found = false;
259 
260 /* Was the last xlog file restored from archive, or local? */
261 static bool restoredFromArchive = false;
262 
263 /* Buffers of size BLCKSZ, dedicated to consistency checks */
264 static char *replay_image_masked = NULL;
265 static char *master_image_masked = NULL;
266 
267 /* options formerly taken from recovery.conf for archive recovery */
268 char *recoveryRestoreCommand = NULL;
269 char *recoveryEndCommand = NULL;
270 char *archiveCleanupCommand = NULL;
271 RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET;
272 bool recoveryTargetInclusive = true;
273 int recoveryTargetAction = RECOVERY_TARGET_ACTION_PAUSE;
274 TransactionId recoveryTargetXid;
275 char *recovery_target_time_string;
276 static TimestampTz recoveryTargetTime;
277 const char *recoveryTargetName;
278 XLogRecPtr recoveryTargetLSN;
279 int recovery_min_apply_delay = 0;
280 
281 /* options formerly taken from recovery.conf for XLOG streaming */
282 bool StandbyModeRequested = false;
283 char *PrimaryConnInfo = NULL;
284 char *PrimarySlotName = NULL;
285 char *PromoteTriggerFile = NULL;
286 
287 /* are we currently in standby mode? */
288 bool StandbyMode = false;
289 
290 /* whether request for fast promotion has been made yet */
291 static bool fast_promote = false;
292 
293 /*
294  * if recoveryStopsBefore/After returns true, it saves information about the
295  * stop point here
296  */
297 static TransactionId recoveryStopXid;
298 static TimestampTz recoveryStopTime;
299 static XLogRecPtr recoveryStopLSN;
300 static char recoveryStopName[MAXFNAMELEN];
301 static bool recoveryStopAfter;
302 
303 /*
304  * During normal operation, the only timeline we care about is ThisTimeLineID.
305  * During recovery, however, things are more complicated. To simplify life
306  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
307  * scan through the WAL history (that is, it is the line that was active when
308  * the currently-scanned WAL record was generated). We also need these
309  * timeline values:
310  *
311  * recoveryTargetTimeLineGoal: what the user requested, if any
312  *
313  * recoveryTargetTLIRequested: numeric value of requested timeline, if constant
314  *
315  * recoveryTargetTLI: the currently understood target timeline; changes
316  *
317  * expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
318  * its known parents, newest first (so recoveryTargetTLI is always the
319  * first list member). Only these TLIs are expected to be seen in the WAL
320  * segments we read, and indeed only these TLIs will be considered as
321  * candidate WAL files to open at all.
322  *
323  * curFileTLI: the TLI appearing in the name of the current input WAL file.
324  * (This is not necessarily the same as ThisTimeLineID, because we could
325  * be scanning data that was copied from an ancestor timeline when the current
326  * file was created.) During a sequential scan we do not allow this value
327  * to decrease.
328  */
329 RecoveryTargetTimeLineGoal recoveryTargetTimeLineGoal = RECOVERY_TARGET_TIMELINE_LATEST;
330 TimeLineID recoveryTargetTLIRequested = 0;
331 TimeLineID recoveryTargetTLI = 0;
332 static List *expectedTLEs;
333 static TimeLineID curFileTLI;
334 
335 /*
336  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
337  * current backend. It is updated for all inserts. XactLastRecEnd points to
338  * end+1 of the last record, and is reset when we end a top-level transaction,
339  * or start a new one; so it can be used to tell if the current transaction has
340  * created any XLOG records.
341  *
342  * While in parallel mode, this may not be fully up to date. When committing,
343  * a transaction can assume this covers all xlog records written either by the
344  * user backend or by any parallel worker which was present at any point during
345  * the transaction. But when aborting, or when still in parallel mode, other
346  * parallel backends may have written WAL records at later LSNs than the value
347  * stored here. The parallel leader advances its own copy, when necessary,
348  * in WaitForParallelWorkersToFinish.
349  */
350 XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
351 XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
352 XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
353 
354 /*
355  * RedoRecPtr is this backend's local copy of the REDO record pointer
356  * (which is almost but not quite the same as a pointer to the most recent
357  * CHECKPOINT record). We update this from the shared-memory copy,
358  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
359  * hold an insertion lock). See XLogInsertRecord for details. We are also
360  * allowed to update from XLogCtl->RedoRecPtr if we hold the info_lck;
361  * see GetRedoRecPtr. A freshly spawned backend obtains the value during
362  * InitXLOGAccess.
363  */
364 static XLogRecPtr RedoRecPtr;
365 
366 /*
367  * doPageWrites is this backend's local copy of (forcePageWrites ||
368  * fullPageWrites). It is used together with RedoRecPtr to decide whether
369  * a full-page image of a page needs to be taken.
370  */
371 static bool doPageWrites;
372 
373 /* Has the recovery code requested a walreceiver wakeup? */
374 static bool doRequestWalReceiverReply;
375 
376 /*
377  * RedoStartLSN points to the checkpoint's REDO location which is specified
378  * in a backup label file, backup history file or control file. In standby
379  * mode, XLOG streaming usually starts from the position where an invalid
380  * record was found. But if we fail to read even the initial checkpoint
381  * record, we use the REDO location instead of the checkpoint location as
382  * the start position of XLOG streaming. Otherwise we would have to jump
383  * backwards to the REDO location after reading the checkpoint record,
384  * because the REDO record can precede the checkpoint record.
385  */
386 static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
387 
388 /*----------
389  * Shared-memory data structures for XLOG control
390  *
391  * LogwrtRqst indicates a byte position that we need to write and/or fsync
392  * the log up to (all records before that point must be written or fsynced).
393  * LogwrtResult indicates the byte positions we have already written/fsynced.
394  * These structs are identical but are declared separately to indicate their
395  * slightly different functions.
396  *
397  * To read XLogCtl->LogwrtResult, you must hold either info_lck or
398  * WALWriteLock. To update it, you need to hold both locks. The point of
399  * this arrangement is that the value can be examined by code that already
400  * holds WALWriteLock without needing to grab info_lck as well. In addition
401  * to the shared variable, each backend has a private copy of LogwrtResult,
402  * which is updated when convenient.
403  *
404  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
405  * (protected by info_lck), but we don't need to cache any copies of it.
406  *
407  * info_lck is only held long enough to read/update the protected variables,
408  * so it's a plain spinlock. The other locks are held longer (potentially
409  * over I/O operations), so we use LWLocks for them. These locks are:
410  *
411  * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
412  * It is only held while initializing and changing the mapping. If the
413  * contents of the buffer being replaced haven't been written yet, the mapping
414  * lock is released while the write is done, and reacquired afterwards.
415  *
416  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
417  * XLogFlush).
418  *
419  * ControlFileLock: must be held to read/update control file or create
420  * new log file.
421  *
422  * CheckpointLock: must be held to do a checkpoint or restartpoint (ensures
423  * only one checkpointer at a time; currently, with all checkpoints done by
424  * the checkpointer, this is just pro forma).
425  *
426  *----------
427  */
428 
429 typedef struct XLogwrtRqst
430 {
431  XLogRecPtr Write; /* last byte + 1 to write out */
432  XLogRecPtr Flush; /* last byte + 1 to flush */
433 } XLogwrtRqst;
434 
435 typedef struct XLogwrtResult
436 {
437  XLogRecPtr Write; /* last byte + 1 written out */
438  XLogRecPtr Flush; /* last byte + 1 flushed */
439 } XLogwrtResult;
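/*
 * Illustrative sketch of the read/update protocol described above; the same
 * idiom appears verbatim in XLogInsertRecord() further down in this file:
 *
 *		SpinLockAcquire(&XLogCtl->info_lck);
 *		if (XLogCtl->LogwrtRqst.Write < EndPos)
 *			XLogCtl->LogwrtRqst.Write = EndPos;	-- update request under info_lck
 *		LogwrtResult = XLogCtl->LogwrtResult;	-- refresh private copy
 *		SpinLockRelease(&XLogCtl->info_lck);
 */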
440 
441 /*
442  * Inserting to WAL is protected by a small fixed number of WAL insertion
443  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
444  * matter which one. To lock out other concurrent insertions, you must hold
445  * all of them. Each WAL insertion lock consists of a lightweight lock, plus an
446  * indicator of how far the insertion has progressed (insertingAt).
447  *
448  * The insertingAt values are read when a process wants to flush WAL from
449  * the in-memory buffers to disk, to check that all the insertions to the
450  * region the process is about to write out have finished. You could simply
451  * wait for all currently in-progress insertions to finish, but the
452  * insertingAt indicator allows you to ignore insertions to later in the WAL,
453  * so that you only wait for the insertions that are modifying the buffers
454  * you're about to write out.
455  *
456  * This isn't just an optimization. If all the WAL buffers are dirty, an
457  * inserter that's holding a WAL insert lock might need to evict an old WAL
458  * buffer, which requires flushing the WAL. If it's possible for an inserter
459  * to block on another inserter unnecessarily, deadlock can arise when two
460  * inserters holding a WAL insert lock wait for each other to finish their
461  * insertion.
462  *
463  * Small WAL records that don't cross a page boundary never update the value;
464  * the WAL record is just copied to the page and the lock is released. But
465  * to avoid the deadlock-scenario explained above, the indicator is always
466  * updated before sleeping while holding an insertion lock.
467  *
468  * lastImportantAt contains the LSN of the last important WAL record inserted
469  * using a given lock. This value is used to detect if there has been
470  * important WAL activity since the last time some action, like a checkpoint,
471  * was performed, so that the action can be skipped if there was none. The LSN is
472  * updated for all insertions, unless the XLOG_MARK_UNIMPORTANT flag was
473  * set. lastImportantAt is never cleared, only overwritten by the LSN of newer
474  * records. Tracking the WAL activity directly in WALInsertLock has the
475  * advantage of not needing any additional locks to update the value.
476  */
477 typedef struct
478 {
479  LWLock lock;
480  XLogRecPtr insertingAt;
481  XLogRecPtr lastImportantAt;
482 } WALInsertLock;
483 
484 /*
485  * All the WAL insertion locks are allocated as an array in shared memory. We
486  * force the array stride to be a power of 2, which saves a few cycles in
487  * indexing, but more importantly also ensures that individual slots don't
488  * cross cache line boundaries. (Of course, we have to also ensure that the
489  * array start address is suitably aligned.)
490  */
491 typedef union WALInsertLockPadded
492 {
493  WALInsertLock l;
494  char pad[PG_CACHE_LINE_SIZE];
495 } WALInsertLockPadded;
496 
497 /*
498  * State of an exclusive backup, necessary to control concurrent activities
499  * across sessions when working on exclusive backups.
500  *
501  * EXCLUSIVE_BACKUP_NONE means that there is no exclusive backup actually
502  * running, to be more precise pg_start_backup() is not being executed for
503  * an exclusive backup and there is no exclusive backup in progress.
504  * EXCLUSIVE_BACKUP_STARTING means that pg_start_backup() is starting an
505  * exclusive backup.
506  * EXCLUSIVE_BACKUP_IN_PROGRESS means that pg_start_backup() has finished
507  * running and an exclusive backup is in progress. pg_stop_backup() is
508  * needed to finish it.
509  * EXCLUSIVE_BACKUP_STOPPING means that pg_stop_backup() is stopping an
510  * exclusive backup.
511  */
512 typedef enum ExclusiveBackupState
513 {
514  EXCLUSIVE_BACKUP_NONE = 0,
515  EXCLUSIVE_BACKUP_STARTING,
516  EXCLUSIVE_BACKUP_IN_PROGRESS,
517  EXCLUSIVE_BACKUP_STOPPING
518 } ExclusiveBackupState;
519 
520 /*
521  * Session status of running backup, used for sanity checks in SQL-callable
522  * functions to start and stop backups.
523  */
524 static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
525 
526 /*
527  * Shared state data for WAL insertion.
528  */
529 typedef struct XLogCtlInsert
530 {
531  slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
532 
533  /*
534  * CurrBytePos is the end of reserved WAL. The next record will be
535  * inserted at that position. PrevBytePos is the start position of the
536  * previously inserted (or rather, reserved) record - it is copied to the
537  * prev-link of the next record. These are stored as "usable byte
538  * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
539  */
540  uint64 CurrBytePos;
541  uint64 PrevBytePos;
542 
543  /*
544  * Make sure the above heavily-contended spinlock and byte positions are
545  * on their own cache line. In particular, the RedoRecPtr and full page
546  * write variables below should be on a different cache line. They are
547  * read on every WAL insertion, but updated rarely, and we don't want
548  * those reads to steal the cache line containing Curr/PrevBytePos.
549  */
550  char pad[PG_CACHE_LINE_SIZE];
551 
552  /*
553  * fullPageWrites is the master copy used by all backends to determine
554  * whether to write full-page images to WAL, instead of using the process-local copy.
555  * This is required because, when full_page_writes is changed by SIGHUP,
556  * we must WAL-log it before it actually affects WAL-logging by backends.
557  * The checkpointer sets it at startup or after SIGHUP.
558  *
559  * To read these fields, you must hold an insertion lock. To modify them,
560  * you must hold ALL the locks.
561  */
562  XLogRecPtr RedoRecPtr; /* current redo point for insertions */
563  bool forcePageWrites; /* forcing full-page writes for PITR? */
564  bool fullPageWrites;
565 
566  /*
567  * exclusiveBackupState indicates the state of an exclusive backup (see
568  * comments of ExclusiveBackupState for more details). nonExclusiveBackups
569  * is a counter indicating the number of streaming base backups currently
570  * in progress. forcePageWrites is set to true when either of these is
571  * non-zero. lastBackupStart is the latest checkpoint redo location used
572  * as a starting point for an online backup.
573  */
574  ExclusiveBackupState exclusiveBackupState;
575  int nonExclusiveBackups;
576  XLogRecPtr lastBackupStart;
577 
578  /*
579  * WAL insertion locks.
580  */
581  WALInsertLockPadded *WALInsertLocks;
582 } XLogCtlInsert;
583 
584 /*
585  * Total shared-memory state for XLOG.
586  */
587 typedef struct XLogCtlData
588 {
589  XLogCtlInsert Insert;
590 
591  /* Protected by info_lck: */
592  XLogwrtRqst LogwrtRqst;
593  XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
594  FullTransactionId ckptFullXid; /* nextFullXid of latest checkpoint */
595  XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
596  XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
597 
598  XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
599 
600  /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
601  XLogRecPtr unloggedLSN;
602  slock_t ulsn_lck;
603 
604  /* Time and LSN of last xlog segment switch. Protected by WALWriteLock. */
605  pg_time_t lastSegSwitchTime;
606  XLogRecPtr lastSegSwitchLSN;
607 
608  /*
609  * Protected by info_lck and WALWriteLock (you must hold either lock to
610  * read it, but both to update)
611  */
612  XLogwrtResult LogwrtResult;
613 
614  /*
615  * Latest initialized page in the cache (last byte position + 1).
616  *
617  * To change the identity of a buffer (and InitializedUpTo), you need to
618  * hold WALBufMappingLock. To change the identity of a buffer that's
619  * still dirty, the old page needs to be written out first, and for that
620  * you need WALWriteLock, and you need to ensure that there are no
621  * in-progress insertions to the page by calling
622  * WaitXLogInsertionsToFinish().
623  */
624  XLogRecPtr InitializedUpTo;
625 
626  /*
627  * These values do not change after startup, although the pointed-to pages
628  * and xlblocks values certainly do. xlblocks values are protected by
629  * WALBufMappingLock.
630  */
631  char *pages; /* buffers for unwritten XLOG pages */
632  XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
633  int XLogCacheBlck; /* highest allocated xlog buffer index */
634 
635  /*
636  * Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
637  * If we created a new timeline when the system was started up,
638  * PrevTimeLineID is the old timeline's ID that we forked off from.
639  * Otherwise it's equal to ThisTimeLineID.
640  */
641  TimeLineID ThisTimeLineID;
642  TimeLineID PrevTimeLineID;
643 
644  /*
645  * SharedRecoveryInProgress indicates if we're still in crash or archive
646  * recovery. Protected by info_lck.
647  */
648  bool SharedRecoveryInProgress;
649 
650  /*
651  * SharedHotStandbyActive indicates if we allow hot standby queries to be
652  * run. Protected by info_lck.
653  */
654  bool SharedHotStandbyActive;
655 
656  /*
657  * WalWriterSleeping indicates whether the WAL writer is currently in
658  * low-power mode (and hence should be nudged if an async commit occurs).
659  * Protected by info_lck.
660  */
661  bool WalWriterSleeping;
662 
663  /*
664  * recoveryWakeupLatch is used to wake up the startup process to continue
665  * WAL replay, if it is waiting for WAL to arrive or for the failover
666  * trigger file to appear.
667  */
668  Latch recoveryWakeupLatch;
669 
670  /*
671  * During recovery, we keep a copy of the latest checkpoint record here.
672  * lastCheckPointRecPtr points to start of checkpoint record and
673  * lastCheckPointEndPtr points to end+1 of checkpoint record. Used by the
674  * checkpointer when it wants to create a restartpoint.
675  *
676  * Protected by info_lck.
677  */
678  XLogRecPtr lastCheckPointRecPtr;
679  XLogRecPtr lastCheckPointEndPtr;
680  CheckPoint lastCheckPoint;
681 
682  /*
683  * lastReplayedEndRecPtr points to end+1 of the last record successfully
684  * replayed. When we're currently replaying a record, ie. in a redo
685  * function, replayEndRecPtr points to the end+1 of the record being
686  * replayed, otherwise it's equal to lastReplayedEndRecPtr.
687  */
688  XLogRecPtr lastReplayedEndRecPtr;
689  TimeLineID lastReplayedTLI;
690  XLogRecPtr replayEndRecPtr;
691  TimeLineID replayEndTLI;
692  /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
693  TimestampTz recoveryLastXTime;
694 
695  /*
696  * timestamp of when we started replaying the current chunk of WAL data,
697  * only relevant for replication or archive recovery
698  */
699  TimestampTz currentChunkStartTime;
700  /* Are we requested to pause recovery? */
701  bool recoveryPause;
702 
703  /*
704  * lastFpwDisableRecPtr points to the start of the last replayed
705  * XLOG_FPW_CHANGE record that instructs full_page_writes is disabled.
706  */
707  XLogRecPtr lastFpwDisableRecPtr;
708 
709  slock_t info_lck; /* locks shared variables shown above */
710 } XLogCtlData;
711 
712 static XLogCtlData *XLogCtl = NULL;
713 
714 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
715 static WALInsertLockPadded *WALInsertLocks = NULL;
716 
717 /*
718  * We maintain an image of pg_control in shared memory.
719  */
720 static ControlFileData *ControlFile = NULL;
721 
722 /*
723  * Calculate the amount of space left on the page after 'endptr'. Beware
724  * multiple evaluation!
725  */
726 #define INSERT_FREESPACE(endptr) \
727  (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
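/*
 * Worked example, assuming XLOG_BLCKSZ = 8192: INSERT_FREESPACE(16384) is 0,
 * because the pointer sits exactly on a page boundary, while
 * INSERT_FREESPACE(16400) is 8192 - 16 = 8176. Because 'endptr' is expanded
 * three times, never pass an expression with side effects.
 */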
728 
729 /* Macro to advance to next buffer index. */
730 #define NextBufIdx(idx) \
731  (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
732 
733 /*
734  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
735  * would hold if it was in cache, the page containing 'recptr'.
736  */
737 #define XLogRecPtrToBufIdx(recptr) \
738  (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
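/*
 * Worked example, assuming XLOG_BLCKSZ = 8192 and 512 WAL buffers
 * (XLogCacheBlck = 511): recptr 0x1000000 lies on page 0x1000000 / 8192 =
 * 2048, and 2048 % 512 = 0, so that page maps to buffer index 0.
 */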
739 
740 /*
741  * These are the number of bytes in a WAL page usable for WAL data.
742  */
743 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
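/*
 * On a typical 64-bit build SizeOfXLogShortPHD is MAXALIGN(20) = 24 bytes,
 * so with XLOG_BLCKSZ = 8192 this works out to 8168 usable bytes per page.
 */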
744 
745 /* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
746 #define ConvertToXSegs(x, segsize) \
747  (x / ((segsize) / (1024 * 1024)))
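/*
 * Example: with the default max_wal_size_mb = 1024 and a 16 MB segment size,
 * ConvertToXSegs(1024, 16 * 1024 * 1024) yields 1024 / 16 = 64 segments.
 */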
748 
749 /* The number of bytes in a WAL segment usable for WAL data. */
750 static int UsableBytesInSegment;
751 
752 /*
753  * Private, possibly out-of-date copy of shared LogwrtResult.
754  * See discussion above.
755  */
756 static XLogwrtResult LogwrtResult = {0, 0};
757 
758 /*
759  * Codes indicating where we got a WAL file from during recovery, or where
760  * to attempt to get one.
761  */
762 typedef enum
763 {
764  XLOG_FROM_ANY = 0, /* request to read WAL from any source */
765  XLOG_FROM_ARCHIVE, /* restored using restore_command */
766  XLOG_FROM_PG_WAL, /* existing file in pg_wal */
767  XLOG_FROM_STREAM /* streamed from master */
768 } XLogSource;
769 
770 /* human-readable names for XLogSources, for debugging output */
771 static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
772 
773 /*
774  * openLogFile is -1 or a kernel FD for an open log file segment.
775  * openLogSegNo identifies the segment. These variables are only used to
776  * write the XLOG, and so will normally refer to the active segment.
777  */
778 static int openLogFile = -1;
779 static XLogSegNo openLogSegNo = 0;
780 
781 /*
782  * These variables are used similarly to the ones above, but for reading
783  * the XLOG. Note, however, that readOff generally represents the offset
784  * of the page just read, not the seek position of the FD itself, which
785  * will be just past that page. readLen indicates how much of the current
786  * page has been read into readBuf, and readSource indicates where we got
787  * the currently open file from.
788  */
789 static int readFile = -1;
790 static XLogSegNo readSegNo = 0;
791 static uint32 readOff = 0;
792 static uint32 readLen = 0;
793 static XLogSource readSource = 0; /* XLOG_FROM_* code */
794 
795 /*
796  * Keeps track of which source we're currently reading from. This is
797  * different from readSource in that this is always set, even when we don't
798  * currently have a WAL file open. If lastSourceFailed is set, our last
799  * attempt to read from currentSource failed, and we should try another source
800  * next.
801  */
802 static XLogSource currentSource = 0; /* XLOG_FROM_* code */
803 static bool lastSourceFailed = false;
804 
805 typedef struct XLogPageReadPrivate
806 {
807  int emode;
808  bool fetching_ckpt; /* are we fetching a checkpoint record? */
809  bool randAccess;
810 } XLogPageReadPrivate;
811 
812 /*
813  * These variables track when we last obtained some WAL data to process,
814  * and where we got it from. (XLogReceiptSource is initially the same as
815  * readSource, but readSource gets reset to zero when we don't have data
816  * to process right now. It is also different from currentSource, which
817  * also changes when we try to read from a source and fail, while
818  * XLogReceiptSource tracks where we last successfully read some WAL.)
819  */
820 static TimestampTz XLogReceiptTime = 0;
821 static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
822 
823 /* State information for XLOG reading */
824 static XLogRecPtr ReadRecPtr; /* start of last record read */
825 static XLogRecPtr EndRecPtr; /* end+1 of last record read */
826 
827 /*
828  * Local copies of equivalent fields in the control file. When running
829  * crash recovery, minRecoveryPoint is set to InvalidXLogRecPtr as we
830  * expect to replay all the WAL available, and updateMinRecoveryPoint is
831  * switched to false to prevent any updates while replaying records.
832  * Those values are kept consistent as long as crash recovery runs.
833  */
834 static XLogRecPtr minRecoveryPoint;
835 static TimeLineID minRecoveryPointTLI;
836 static bool updateMinRecoveryPoint = true;
837 
838 /*
839  * Have we reached a consistent database state? In crash recovery, we have
840  * to replay all the WAL, so reachedConsistency is never set. During archive
841  * recovery, the database is consistent once minRecoveryPoint is reached.
842  */
843 bool reachedConsistency = false;
844 
845 static bool InRedo = false;
846 
847 /* Have we launched bgwriter during recovery? */
848 static bool bgwriterLaunched = false;
849 
850 /* For WALInsertLockAcquire/Release functions */
851 static int MyLockNo = 0;
852 static bool holdingAllLocks = false;
853 
854 #ifdef WAL_DEBUG
855 static MemoryContext walDebugCxt = NULL;
856 #endif
857 
858 static void readRecoverySignalFile(void);
859 static void validateRecoveryParameters(void);
860 static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
861 static bool recoveryStopsBefore(XLogReaderState *record);
862 static bool recoveryStopsAfter(XLogReaderState *record);
863 static void recoveryPausesHere(void);
864 static bool recoveryApplyDelay(XLogReaderState *record);
865 static void SetLatestXTime(TimestampTz xtime);
866 static void SetCurrentChunkStartTime(TimestampTz xtime);
867 static void CheckRequiredParameterValues(void);
868 static void XLogReportParameters(void);
869 static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
870  TimeLineID prevTLI);
871 static void LocalSetXLogInsertAllowed(void);
872 static void CreateEndOfRecoveryRecord(void);
873 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
874 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
875 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
876 
877 static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
878 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
879 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
880 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
881  bool find_free, XLogSegNo max_segno,
882  bool use_lock);
883 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
884  int source, bool notfoundOk);
885 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
886 static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
887  int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
888 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
889  bool fetching_ckpt, XLogRecPtr tliRecPtr);
890 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
891 static void XLogFileClose(void);
892 static void PreallocXlogFiles(XLogRecPtr endptr);
893 static void RemoveTempXlogFiles(void);
894 static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr);
895 static void RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr);
896 static void UpdateLastRemovedPtr(char *filename);
897 static void ValidateXLOGDirectoryStructure(void);
898 static void CleanupBackupHistory(void);
899 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
900 static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
901  int emode, bool fetching_ckpt);
902 static void CheckRecoveryConsistency(void);
903 static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
904  XLogRecPtr RecPtr, int whichChkpt, bool report);
905 static bool rescanLatestTimeLine(void);
906 static void WriteControlFile(void);
907 static void ReadControlFile(void);
908 static char *str_time(pg_time_t tnow);
909 static bool CheckForStandbyTrigger(void);
910 
911 #ifdef WAL_DEBUG
912 static void xlog_outrec(StringInfo buf, XLogReaderState *record);
913 #endif
914 static void xlog_outdesc(StringInfo buf, XLogReaderState *record);
915 static void pg_start_backup_callback(int code, Datum arg);
916 static void pg_stop_backup_callback(int code, Datum arg);
917 static bool read_backup_label(XLogRecPtr *checkPointLoc,
918  bool *backupEndRequired, bool *backupFromStandby);
919 static bool read_tablespace_map(List **tablespaces);
920 
921 static void rm_redo_error_callback(void *arg);
922 static int get_sync_bit(int method);
923 
924 static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
925  XLogRecData *rdata,
926  XLogRecPtr StartPos, XLogRecPtr EndPos);
927 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
928  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
929 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
930  XLogRecPtr *PrevPtr);
931 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
932 static char *GetXLogBuffer(XLogRecPtr ptr);
933 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
934 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
935 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
936 static void checkXLogConsistency(XLogReaderState *record);
937 
938 static void WALInsertLockAcquire(void);
939 static void WALInsertLockAcquireExclusive(void);
940 static void WALInsertLockRelease(void);
941 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
942 
943 /*
944  * Insert an XLOG record represented by an already-constructed chain of data
945  * chunks. This is a low-level routine; to construct the WAL record header
946  * and data, use the higher-level routines in xloginsert.c.
947  *
948  * If 'fpw_lsn' is valid, it is the oldest LSN among the pages that this
949  * WAL record applies to, that were not included in the record as full page
950  * images. If fpw_lsn <= RedoRecPtr, the function does not perform the
951  * insertion and returns InvalidXLogRecPtr. The caller can then recalculate
952  * which pages need a full-page image, and retry. If fpw_lsn is invalid, the
953  * record is always inserted.
954  *
955  * 'flags' gives more in-depth control on the record being inserted. See
956  * XLogSetRecordFlags() for details.
957  *
958  * The first XLogRecData in the chain must be for the record header, and its
959  * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and
960  * xl_crc fields in the header, the rest of the header must already be filled
961  * by the caller.
962  *
963  * Returns XLOG pointer to end of record (beginning of next record).
964  * This can be used as LSN for data pages affected by the logged action.
965  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
966  * before the data page can be written out. This implements the basic
967  * WAL rule "write the log before the data".)
968  */
969 XLogRecPtr
970 XLogInsertRecord(XLogRecData *rdata,
971  XLogRecPtr fpw_lsn,
972  uint8 flags)
973 {
974  XLogCtlInsert *Insert = &XLogCtl->Insert;
975  pg_crc32c rdata_crc;
976  bool inserted;
977  XLogRecord *rechdr = (XLogRecord *) rdata->data;
978  uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
979  bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
980  info == XLOG_SWITCH);
981  XLogRecPtr StartPos;
982  XLogRecPtr EndPos;
983  bool prevDoPageWrites = doPageWrites;
984 
985  /* we assume that all of the record header is in the first chunk */
986  Assert(rdata->len >= SizeOfXLogRecord);
987 
988  /* cross-check on whether we should be here or not */
989  if (!XLogInsertAllowed())
990  elog(ERROR, "cannot make new WAL entries during recovery");
991 
992  /*----------
993  *
994  * We have now done all the preparatory work we can without holding a
995  * lock or modifying shared state. From here on, inserting the new WAL
996  * record to the shared WAL buffer cache is a two-step process:
997  *
998  * 1. Reserve the right amount of space from the WAL. The current head of
999  * reserved space is kept in Insert->CurrBytePos, and is protected by
1000  * insertpos_lck.
1001  *
1002  * 2. Copy the record to the reserved WAL space. This involves finding the
1003  * correct WAL buffer containing the reserved space, and copying the
1004  * record in place. This can be done concurrently in multiple processes.
1005  *
1006  * To keep track of which insertions are still in-progress, each concurrent
1007  * inserter acquires an insertion lock. In addition to just indicating that
1008  * an insertion is in progress, the lock tells others how far the inserter
1009  * has progressed. There is a small fixed number of insertion locks,
1010  * determined by NUM_XLOGINSERT_LOCKS. When an inserter crosses a page
1011  * boundary, it updates the value stored in the lock to show how far it has
1012  * inserted, to allow the previous buffer to be flushed.
1013  *
1014  * Holding onto an insertion lock also protects RedoRecPtr and
1015  * fullPageWrites from changing until the insertion is finished.
1016  *
1017  * Step 2 can usually be done completely in parallel. If the required WAL
1018  * page is not initialized yet, you have to grab WALBufMappingLock to
1019  * initialize it, but the WAL writer tries to do that ahead of insertions
1020  * to prevent that from happening in the critical path.
1021  *
1022  *----------
1023  */
1024  START_CRIT_SECTION();
1025  if (isLogSwitch)
1026  WALInsertLockAcquireExclusive();
1027  else
1028  WALInsertLockAcquire();
1029 
1030  /*
1031  * Check to see if my copy of RedoRecPtr is out of date. If so, may have
1032  * to go back and have the caller recompute everything. This can only
1033  * happen just after a checkpoint, so it's better to be slow in this case
1034  * and fast otherwise.
1035  *
1036  * Also check to see if fullPageWrites or forcePageWrites was just turned
1037  * on; if we weren't already doing full-page writes then go back and
1038  * recompute.
1039  *
1040  * If we aren't doing full-page writes then RedoRecPtr doesn't actually
1041  * affect the contents of the XLOG record, so we'll update our local copy
1042  * but not force a recomputation. (If doPageWrites was just turned off,
1043  * we could recompute the record without full pages, but we choose not to
1044  * bother.)
1045  */
1046  if (RedoRecPtr != Insert->RedoRecPtr)
1047  {
1048  Assert(RedoRecPtr < Insert->RedoRecPtr);
1049  RedoRecPtr = Insert->RedoRecPtr;
1050  }
1051  doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
1052 
1053  if (doPageWrites &&
1054  (!prevDoPageWrites ||
1055  (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
1056  {
1057  /*
1058  * Oops, some buffer now needs to be backed up that the caller didn't
1059  * back up. Start over.
1060  */
1061  WALInsertLockRelease();
1062  END_CRIT_SECTION();
1063  return InvalidXLogRecPtr;
1064  }
1065 
1066  /*
1067  * Reserve space for the record in the WAL. This also sets the xl_prev
1068  * pointer.
1069  */
1070  if (isLogSwitch)
1071  inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
1072  else
1073  {
1074  ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
1075  &rechdr->xl_prev);
1076  inserted = true;
1077  }
1078 
1079  if (inserted)
1080  {
1081  /*
1082  * Now that xl_prev has been filled in, calculate CRC of the record
1083  * header.
1084  */
1085  rdata_crc = rechdr->xl_crc;
1086  COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
1087  FIN_CRC32C(rdata_crc);
1088  rechdr->xl_crc = rdata_crc;
1089 
1090  /*
1091  * All the record data, including the header, is now ready to be
1092  * inserted. Copy the record in the space reserved.
1093  */
1094  CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
1095  StartPos, EndPos);
1096 
1097  /*
1098  * Unless record is flagged as not important, update LSN of last
1099  * important record in the current slot. When holding all locks, just
1100  * update the first one.
1101  */
1102  if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
1103  {
1104  int lockno = holdingAllLocks ? 0 : MyLockNo;
1105 
1106  WALInsertLocks[lockno].l.lastImportantAt = StartPos;
1107  }
1108  }
1109  else
1110  {
1111  /*
1112  * This was an xlog-switch record, but the current insert location was
1113  * already exactly at the beginning of a segment, so there was no need
1114  * to do anything.
1115  */
1116  }
1117 
1118  /*
1119  * Done! Let others know that we're finished.
1120  */
1121  WALInsertLockRelease();
1122 
1123  MarkCurrentTransactionIdLoggedIfAny();
1124 
1125  END_CRIT_SECTION();
1126 
1127  /*
1128  * Update shared LogwrtRqst.Write, if we crossed page boundary.
1129  */
1130  if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
1131  {
1132  SpinLockAcquire(&XLogCtl->info_lck);
1133  /* advance global request to include new block(s) */
1134  if (XLogCtl->LogwrtRqst.Write < EndPos)
1135  XLogCtl->LogwrtRqst.Write = EndPos;
1136  /* update local result copy while I have the chance */
1137  LogwrtResult = XLogCtl->LogwrtResult;
1138  SpinLockRelease(&XLogCtl->info_lck);
1139  }
1140 
1141  /*
1142  * If this was an XLOG_SWITCH record, flush the record and the empty
1143  * padding space that fills the rest of the segment, and perform
1144  * end-of-segment actions (eg, notifying archiver).
1145  */
1146  if (isLogSwitch)
1147  {
1148  TRACE_POSTGRESQL_WAL_SWITCH();
1149  XLogFlush(EndPos);
1150 
1151  /*
1152  * Even though we reserved the rest of the segment for us, which is
1153  * reflected in EndPos, we return a pointer to just the end of the
1154  * xlog-switch record.
1155  */
1156  if (inserted)
1157  {
1158  EndPos = StartPos + SizeOfXLogRecord;
1159  if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
1160  {
1161  uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size);
1162 
1163  if (offset == EndPos % XLOG_BLCKSZ)
1164  EndPos += SizeOfXLogLongPHD;
1165  else
1166  EndPos += SizeOfXLogShortPHD;
1167  }
1168  }
1169  }
1170 
1171 #ifdef WAL_DEBUG
1172  if (XLOG_DEBUG)
1173  {
1174  static XLogReaderState *debug_reader = NULL;
1175  StringInfoData buf;
1176  StringInfoData recordBuf;
1177  char *errormsg = NULL;
1178  MemoryContext oldCxt;
1179 
1180  oldCxt = MemoryContextSwitchTo(walDebugCxt);
1181 
1182  initStringInfo(&buf);
1183  appendStringInfo(&buf, "INSERT @ %X/%X: ",
1184  (uint32) (EndPos >> 32), (uint32) EndPos);
1185 
1186  /*
1187  * We have to piece together the WAL record data from the XLogRecData
1188  * entries, so that we can pass it to the rm_desc function as one
1189  * contiguous chunk.
1190  */
1191  initStringInfo(&recordBuf);
1192  for (; rdata != NULL; rdata = rdata->next)
1193  appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
1194 
1195  if (!debug_reader)
1196  debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
1197  NULL, NULL);
1198 
1199  if (!debug_reader)
1200  {
1201  appendStringInfoString(&buf, "error decoding record: out of memory");
1202  }
1203  else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
1204  &errormsg))
1205  {
1206  appendStringInfo(&buf, "error decoding record: %s",
1207  errormsg ? errormsg : "no error message");
1208  }
1209  else
1210  {
1211  appendStringInfoString(&buf, " - ");
1212  xlog_outdesc(&buf, debug_reader);
1213  }
1214  elog(LOG, "%s", buf.data);
1215 
1216  pfree(buf.data);
1217  pfree(recordBuf.data);
1218  MemoryContextSwitchTo(oldCxt);
1219  }
1220 #endif
1221 
1222  /*
1223  * Update our global variables
1224  */
1225  ProcLastRecPtr = StartPos;
1226  XactLastRecEnd = EndPos;
1227 
1228  return EndPos;
1229 }
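/*
 * Caller-side sketch, for illustration only (the rmgr id, info flag and
 * record struct below are placeholders): real callers build records with the
 * higher-level xloginsert.c API, which ends up in XLogInsertRecord():
 *
 *		XLogBeginInsert();
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_DO_STUFF);
 *		PageSetLSN(page, recptr);
 */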
1230 
1231 /*
1232  * Reserves the right amount of space for a record of given size from the WAL.
1233  * *StartPos is set to the beginning of the reserved section, *EndPos to
1234  * its end+1. *PrevPtr is set to the beginning of the previous record; it is
1235  * used to set the xl_prev of this record.
1236  *
1237  * This is the performance critical part of XLogInsert that must be serialized
1238  * across backends. The rest can happen mostly in parallel. Try to keep this
1239  * section as short as possible, insertpos_lck can be heavily contended on a
1240  * busy system.
1241  *
1242  * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
1243  * where we actually copy the record to the reserved space.
1244  */
1245 static void
1246 ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
1247  XLogRecPtr *PrevPtr)
1248 {
1249  XLogCtlInsert *Insert = &XLogCtl->Insert;
1250  uint64 startbytepos;
1251  uint64 endbytepos;
1252  uint64 prevbytepos;
1253 
1254  size = MAXALIGN(size);
1255 
1256  /* All (non xlog-switch) records should contain data. */
1257  Assert(size > SizeOfXLogRecord);
1258 
1259  /*
1260  * The duration the spinlock needs to be held is minimized by minimizing
1261  * the calculations that have to be done while holding the lock. The
1262  * current tip of reserved WAL is kept in CurrBytePos, as a byte position
1263  * that only counts "usable" bytes in WAL, that is, it excludes all WAL
1264  * page headers. The mapping between "usable" byte positions and physical
1265  * positions (XLogRecPtrs) can be done outside the locked region, and
1266  * because the usable byte position doesn't include any headers, reserving
1267  * X bytes from WAL is almost as simple as "CurrBytePos += X".
1268  */
1269  SpinLockAcquire(&Insert->insertpos_lck);
1270 
1271  startbytepos = Insert->CurrBytePos;
1272  endbytepos = startbytepos + size;
1273  prevbytepos = Insert->PrevBytePos;
1274  Insert->CurrBytePos = endbytepos;
1275  Insert->PrevBytePos = startbytepos;
1276 
1277  SpinLockRelease(&Insert->insertpos_lck);
1278 
1279  *StartPos = XLogBytePosToRecPtr(startbytepos);
1280  *EndPos = XLogBytePosToEndRecPtr(endbytepos);
1281  *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
1282 
1283  /*
1284  * Check that the conversions between "usable byte positions" and
1285  * XLogRecPtrs work consistently in both directions.
1286  */
1287  Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
1288  Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
1289  Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
1290 }
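/*
 * Worked example of the "usable byte position" arithmetic above (assuming
 * XLOG_BLCKSZ = 8192 and SizeOfXLogShortPHD = 24, i.e. 8168 usable bytes per
 * short-header page): reserving a 100-byte record at CurrBytePos = 8150
 * crosses a page boundary, yet the reservation is still just
 * CurrBytePos += 100; converting back to XLogRecPtrs re-inserts the
 * intervening page header, so the physical end pointer lands 100 + 24 bytes
 * past the physical start.
 */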
1291 
1292 /*
1293  * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
1294  *
1295  * A log-switch record is handled slightly differently. The rest of the
1296  * segment will be reserved for this insertion, as indicated by the returned
1297  * *EndPos value. However, if we are already at the beginning of the current
1298  * segment, *StartPos and *EndPos are set to the current location without
1299  * reserving any space, and the function returns false.
1300 */
1301 static bool
1302 ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
1303 {
1304  XLogCtlInsert *Insert = &XLogCtl->Insert;
1305  uint64 startbytepos;
1306  uint64 endbytepos;
1307  uint64 prevbytepos;
1308  uint32 size = MAXALIGN(SizeOfXLogRecord);
1309  XLogRecPtr ptr;
1310  uint32 segleft;
1311 
1312  /*
1313  * These calculations are a bit heavy-weight to be done while holding a
1314  * spinlock, but since we're holding all the WAL insertion locks, there
1315  * are no other inserters competing for it. GetXLogInsertRecPtr() does
1316  * compete for it, but that's not called very frequently.
1317  */
1318  SpinLockAcquire(&Insert->insertpos_lck);
1319 
1320  startbytepos = Insert->CurrBytePos;
1321 
1322  ptr = XLogBytePosToEndRecPtr(startbytepos);
1323  if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
1324  {
1325  SpinLockRelease(&Insert->insertpos_lck);
1326  *EndPos = *StartPos = ptr;
1327  return false;
1328  }
1329 
1330  endbytepos = startbytepos + size;
1331  prevbytepos = Insert->PrevBytePos;
1332 
1333  *StartPos = XLogBytePosToRecPtr(startbytepos);
1334  *EndPos = XLogBytePosToEndRecPtr(endbytepos);
1335 
1336  segleft = wal_segment_size - XLogSegmentOffset(*EndPos, wal_segment_size);
1337  if (segleft != wal_segment_size)
1338  {
1339  /* consume the rest of the segment */
1340  *EndPos += segleft;
1341  endbytepos = XLogRecPtrToBytePos(*EndPos);
1342  }
1343  Insert->CurrBytePos = endbytepos;
1344  Insert->PrevBytePos = startbytepos;
1345 
1346  SpinLockRelease(&Insert->insertpos_lck);
1347 
1348  *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
1349 
1350  Assert(XLogSegmentOffset(*EndPos, wal_segment_size) == 0);
1351  Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
1352  Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
1353  Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
1354 
1355  return true;
1356 }
1357 
1358 /*
1359  * Checks whether the current buffer page and backup page stored in the
1360  * WAL record are consistent or not. Before comparing the two pages,
1361  * masking can be applied to the pages to ignore certain areas like hint bits,
1362  * unused space between pd_lower and pd_upper among other things. This
1363  * function should be called once WAL replay has been completed for a
1364  * given record.
1365  */
1366 static void
1367 checkXLogConsistency(XLogReaderState *record)
1368 {
1369  RmgrId rmid = XLogRecGetRmid(record);
1370  RelFileNode rnode;
1371  ForkNumber forknum;
1372  BlockNumber blkno;
1373  int block_id;
1374 
1375  /* Records with no backup blocks have no need for consistency checks. */
1376  if (!XLogRecHasAnyBlockRefs(record))
1377  return;
1378 
1379  Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
1380 
1381  for (block_id = 0; block_id <= record->max_block_id; block_id++)
1382  {
1383  Buffer buf;
1384  Page page;
1385 
1386  if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
1387  {
1388  /*
1389  * WAL record doesn't contain a block reference with the given id.
1390  * Do nothing.
1391  */
1392  continue;
1393  }
1394 
1395  Assert(XLogRecHasBlockImage(record, block_id));
1396 
1397  if (XLogRecBlockImageApply(record, block_id))
1398  {
1399  /*
1400  * WAL record has already applied the page, so bypass the
1401  * consistency check as that would result in comparing the full
1402  * page stored in the record with itself.
1403  */
1404  continue;
1405  }
1406 
1407  /*
1408  * Read the contents from the current buffer and store it in a
1409  * temporary page.
1410  */
1411  buf = XLogReadBufferExtended(rnode, forknum, blkno,
1412  RBM_NORMAL_NO_LOG);
1413  if (!BufferIsValid(buf))
1414  continue;
1415 
1416  LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1417  page = BufferGetPage(buf);
1418 
1419  /*
1420  * Take a copy of the local page where WAL has been applied to have a
1421  * comparison base before masking it...
1422  */
1423  memcpy(replay_image_masked, page, BLCKSZ);
1424 
1425  /* No need for this page anymore now that a copy is in. */
1426  UnlockReleaseBuffer(buf);
1427 
1428  /*
1429  * If the block LSN is already ahead of this WAL record, we can't
1430  * expect contents to match. This can happen if recovery is
1431  * restarted.
1432  */
1433  if (PageGetLSN(replay_image_masked) > record->EndRecPtr)
1434  continue;
1435 
1436  /*
1437  * Read the contents from the backup copy, stored in WAL record and
1438  * store it in a temporary page. There is no need to allocate a new
1439  * page here, a local buffer is fine to hold its contents and a mask
1440  * can be directly applied on it.
1441  */
1442  if (!RestoreBlockImage(record, block_id, master_image_masked))
1443  elog(ERROR, "failed to restore block image");
1444 
1445  /*
1446  * If masking function is defined, mask both the master and replay
1447  * images
1448  */
1449  if (RmgrTable[rmid].rm_mask != NULL)
1450  {
1451  RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
1452  RmgrTable[rmid].rm_mask(master_image_masked, blkno);
1453  }
1454 
1455  /* Time to compare the master and replay images. */
1456  if (memcmp(replay_image_masked, master_image_masked, BLCKSZ) != 0)
1457  {
1458  elog(FATAL,
1459  "inconsistent page found, rel %u/%u/%u, forknum %u, blkno %u",
1460  rnode.spcNode, rnode.dbNode, rnode.relNode,
1461  forknum, blkno);
1462  }
1463  }
1464 }
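/*
 * This check only fires for records that carry full-page images flagged with
 * XLR_CHECK_CONSISTENCY, which the generating server attaches when the
 * record's rmgr is listed in the wal_consistency_checking GUC, e.g. (sketch):
 *
 *		wal_consistency_checking = 'heap,btree'
 */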
1465 
1466 /*
1467  * Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved
1468  * area in the WAL.
1469  */
1470 static void
1471 CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
1472  XLogRecPtr StartPos, XLogRecPtr EndPos)
1473 {
1474  char *currpos;
1475  int freespace;
1476  int written;
1477  XLogRecPtr CurrPos;
1478  XLogPageHeader pagehdr;
1479 
1480  /*
1481  * Get a pointer to the right place in the right WAL buffer to start
1482  * inserting to.
1483  */
1484  CurrPos = StartPos;
1485  currpos = GetXLogBuffer(CurrPos);
1486  freespace = INSERT_FREESPACE(CurrPos);
1487 
1488  /*
1489  * there should be enough space for at least the first field (xl_tot_len)
1490  * on this page.
1491  */
1492  Assert(freespace >= sizeof(uint32));
1493 
1494  /* Copy record data */
1495  written = 0;
1496  while (rdata != NULL)
1497  {
1498  char *rdata_data = rdata->data;
1499  int rdata_len = rdata->len;
1500 
1501  while (rdata_len > freespace)
1502  {
1503  /*
1504  * Write what fits on this page, and continue on the next page.
1505  */
1506  Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
1507  memcpy(currpos, rdata_data, freespace);
1508  rdata_data += freespace;
1509  rdata_len -= freespace;
1510  written += freespace;
1511  CurrPos += freespace;
1512 
1513  /*
1514  * Get pointer to beginning of next page, and set the xlp_rem_len
1515  * in the page header. Set XLP_FIRST_IS_CONTRECORD.
1516  *
1517  * It's safe to set the contrecord flag and xlp_rem_len without a
1518  * lock on the page. All the other flags were already set when the
1519  * page was initialized, in AdvanceXLInsertBuffer, and we're the
1520  * only backend that needs to set the contrecord flag.
1521  */
1522  currpos = GetXLogBuffer(CurrPos);
1523  pagehdr = (XLogPageHeader) currpos;
1524  pagehdr->xlp_rem_len = write_len - written;
1525  pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
1526 
1527  /* skip over the page header */
1528  if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
1529  {
1530  CurrPos += SizeOfXLogLongPHD;
1531  currpos += SizeOfXLogLongPHD;
1532  }
1533  else
1534  {
1535  CurrPos += SizeOfXLogShortPHD;
1536  currpos += SizeOfXLogShortPHD;
1537  }
1538  freespace = INSERT_FREESPACE(CurrPos);
1539  }
1540 
1541  Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
1542  memcpy(currpos, rdata_data, rdata_len);
1543  currpos += rdata_len;
1544  CurrPos += rdata_len;
1545  freespace -= rdata_len;
1546  written += rdata_len;
1547 
1548  rdata = rdata->next;
1549  }
1550  Assert(written == write_len);
1551 
1552  /*
1553  * If this was an xlog-switch, it's not enough to write the switch record,
1554  * we also have to consume all the remaining space in the WAL segment. We
1555  * have already reserved that space, but we need to actually fill it.
1556  */
1557  if (isLogSwitch && XLogSegmentOffset(CurrPos, wal_segment_size) != 0)
1558  {
1559  /* An xlog-switch record doesn't contain any data besides the header */
1560  Assert(write_len == SizeOfXLogRecord);
1561 
1562  /* Assert that we did reserve the right amount of space */
1563  Assert(XLogSegmentOffset(EndPos, wal_segment_size) == 0);
1564 
1565  /* Use up all the remaining space on the current page */
1566  CurrPos += freespace;
1567 
1568  /*
1569  * Cause all remaining pages in the segment to be flushed, leaving the
1570  * XLog position where it should be, at the start of the next segment.
1571  * We do this one page at a time, to make sure we don't deadlock
1572  * against ourselves if wal_buffers < wal_segment_size.
1573  */
1574  while (CurrPos < EndPos)
1575  {
1576  /*
1577  * The minimal action to flush the page would be to call
1578  * WALInsertLockUpdateInsertingAt(CurrPos) followed by
1579  * AdvanceXLInsertBuffer(...). The page would be left initialized
1580  * mostly to zeros, except for the page header (always the short
1581  * variant, as this is never a segment's first page).
1582  *
1583  * The large vistas of zeros are good for compressibility, but the
1584  * headers interrupting them every XLOG_BLCKSZ (with values that
1585  * differ from page to page) are not. The effect varies with
1586  * compression tool, but bzip2 for instance compresses about an
1587  * order of magnitude worse if those headers are left in place.
1588  *
1589  * Rather than complicating AdvanceXLInsertBuffer itself (which is
1590  * called in heavily-loaded circumstances as well as this lightly-
1591  * loaded one) with variant behavior, we just use GetXLogBuffer
1592  * (which itself calls the two methods we need) to get the pointer
1593  * and zero most of the page. Then we just zero the page header.
1594  */
1595  currpos = GetXLogBuffer(CurrPos);
1596  MemSet(currpos, 0, SizeOfXLogShortPHD);
1597 
1598  CurrPos += XLOG_BLCKSZ;
1599  }
1600  }
1601  else
1602  {
1603  /* Align the end position, so that the next record starts aligned */
1604  CurrPos = MAXALIGN64(CurrPos);
1605  }
1606 
1607  if (CurrPos != EndPos)
1608  elog(PANIC, "space reserved for WAL record does not match what was written");
1609 }
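/*
 * A worked sketch of the continuation logic above, with illustrative
 * numbers (assuming the default XLOG_BLCKSZ of 8192): suppose a record
 * with write_len = 10000 begins with freespace = 4000 left on the current
 * page. The first inner-loop iteration copies 4000 bytes and stamps the
 * next page's header:
 *
 *		pagehdr->xlp_rem_len = 10000 - 4000;		(i.e. 6000)
 *		pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
 *
 * The remaining 6000 bytes fit within the new page's freespace, so the
 * inner loop exits and the plain memcpy below it finishes the copy.
 */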
1610 
1611 /*
1612  * Acquire a WAL insertion lock, for inserting to WAL.
1613  */
1614 static void
1615 WALInsertLockAcquire(void)
1616 {
1617  bool immed;
1618 
1619  /*
1620  * It doesn't matter which of the WAL insertion locks we acquire, so try
1621  * the one we used last time. If the system isn't particularly busy, it's
1622  * a good bet that it's still available, and it's good to have some
1623  * affinity to a particular lock so that you don't unnecessarily bounce
1624  * cache lines between processes when there's no contention.
1625  *
1626  * If this is the first time through in this backend, pick a lock
1627  * (semi-)randomly. This allows the locks to be used evenly if you have a
1628  * lot of very short connections.
1629  */
1630  static int lockToTry = -1;
1631 
1632  if (lockToTry == -1)
1633  lockToTry = MyProc->pgprocno % NUM_XLOGINSERT_LOCKS;
1634  MyLockNo = lockToTry;
1635 
1636  /*
1637  * The insertingAt value is initially set to 0, as we don't know our
1638  * insert location yet.
1639  */
1640  immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
1641  if (!immed)
1642  {
1643  /*
1644  * If we couldn't get the lock immediately, try another lock next
1645  * time. On a system with more insertion locks than concurrent
1646  * inserters, this causes all the inserters to eventually migrate to a
1647  * lock that no-one else is using. On a system with more inserters
1648  * than locks, it still helps to distribute the inserters evenly
1649  * across the locks.
1650  */
1651  lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
1652  }
1653 }
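/*
 * A minimal usage sketch of the insertion-lock protocol (the actual
 * caller is XLogInsertRecord, earlier in this file): a backend copying a
 * record into the WAL buffers brackets the copy with these calls:
 *
 *		WALInsertLockAcquire();
 *		... reserve space, then CopyXLogRecordToWAL(...) ...
 *		WALInsertLockRelease();
 *
 * WALInsertLockUpdateInsertingAt() is only needed if the insertion has to
 * block (see GetXLogBuffer), to advertise how far it has progressed.
 */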
1654 
1655 /*
1656  * Acquire all WAL insertion locks, to prevent other backends from inserting
1657  * to WAL.
1658  */
1659 static void
1660 WALInsertLockAcquireExclusive(void)
1661 {
1662  int i;
1663 
1664  /*
1665  * When holding all the locks, all but the last lock's insertingAt
1666  * indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
1667  * XLogRecPtr value, to make sure that no-one blocks waiting on those.
1668  */
1669  for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)
1670  {
1671  LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
1672  LWLockUpdateVar(&WALInsertLocks[i].l.lock,
1673  &WALInsertLocks[i].l.insertingAt,
1674  PG_UINT64_MAX);
1675  }
1676  /* Variable value reset to 0 at release */
1677  LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
1678 
1679  holdingAllLocks = true;
1680 }
1681 
1682 /*
1683  * Release our insertion lock (or locks, if we're holding them all).
1684  *
1685  * NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
1686  * next time the lock is acquired.
1687  */
1688 static void
1689 WALInsertLockRelease(void)
1690 {
1691  if (holdingAllLocks)
1692  {
1693  int i;
1694 
1695  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
1696  LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
1697  &WALInsertLocks[i].l.insertingAt,
1698  0);
1699 
1700  holdingAllLocks = false;
1701  }
1702  else
1703  {
1704  LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
1705  &WALInsertLocks[MyLockNo].l.insertingAt,
1706  0);
1707  }
1708 }
1709 
1710 /*
1711  * Update our insertingAt value, to let others know that we've finished
1712  * inserting up to that point.
1713  */
1714 static void
1715 WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
1716 {
1717  if (holdingAllLocks)
1718  {
1719  /*
1720  * We use the last lock to mark our actual position, see comments in
1721  * WALInsertLockAcquireExclusive.
1722  */
1723  LWLockUpdateVar(&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.lock,
1724  &WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.insertingAt,
1725  insertingAt);
1726  }
1727  else
1728  LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
1729  &WALInsertLocks[MyLockNo].l.insertingAt,
1730  insertingAt);
1731 }
1732 
1733 /*
1734  * Wait for any WAL insertions < upto to finish.
1735  *
1736  * Returns the location of the oldest insertion that is still in-progress.
1737  * Any WAL prior to that point has been fully copied into WAL buffers, and
1738  * can be flushed out to disk. Because this waits for any insertions older
1739  * than 'upto' to finish, the return value is always >= 'upto'.
1740  *
1741  * Note: When you are about to write out WAL, you must call this function
1742  * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
1743  * need to wait for an insertion to finish (or at least advance to the next
1744  * uninitialized page), and the inserter might need to evict an old WAL buffer
1745  * to make room for a new one, which in turn requires WALWriteLock.
1746  */
1747 static XLogRecPtr
1748 WaitXLogInsertionsToFinish(XLogRecPtr upto)
1749 {
1750  uint64 bytepos;
1751  XLogRecPtr reservedUpto;
1752  XLogRecPtr finishedUpto;
1753  XLogCtlInsert *Insert = &XLogCtl->Insert;
1754  int i;
1755 
1756  if (MyProc == NULL)
1757  elog(PANIC, "cannot wait without a PGPROC structure");
1758 
1759  /* Read the current insert position */
1760  SpinLockAcquire(&Insert->insertpos_lck);
1761  bytepos = Insert->CurrBytePos;
1762  SpinLockRelease(&Insert->insertpos_lck);
1763  reservedUpto = XLogBytePosToEndRecPtr(bytepos);
1764 
1765  /*
1766  * No-one should request to flush a piece of WAL that hasn't even been
1767  * reserved yet. However, it can happen if there is a block with a bogus
1768  * LSN on disk, for example. XLogFlush checks for that situation and
1769  * complains, but only after the flush. Here we just take it to mean
1770  * that all WAL that has been reserved needs to be finished. In this
1771  * corner-case, the return value can be smaller than 'upto' argument.
1772  */
1773  if (upto > reservedUpto)
1774  {
1775  elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
1776  (uint32) (upto >> 32), (uint32) upto,
1777  (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
1778  upto = reservedUpto;
1779  }
1780 
1781  /*
1782  * Loop through all the locks, sleeping on any in-progress insert older
1783  * than 'upto'.
1784  *
1785  * finishedUpto is our return value, indicating the point upto which all
1786  * the WAL insertions have been finished. Initialize it to the head of
1787  * reserved WAL, and as we iterate through the insertion locks, back it
1788  * out for any insertion that's still in progress.
1789  */
1790  finishedUpto = reservedUpto;
1791  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
1792  {
1793  XLogRecPtr insertingat = InvalidXLogRecPtr;
1794 
1795  do
1796  {
1797  /*
1798  * See if this insertion is in progress. LWLockWaitForVar will
1799  * wait for the lock to be released, or for the 'value' to be set
1800  * by a LWLockUpdateVar call. When a lock is initially acquired,
1801  * its value is 0 (InvalidXLogRecPtr), which means that we don't
1802  * know where it's inserting yet. We will have to wait for it. If
1803  * it's a small insertion, the record will most likely fit on the
1804  * same page and the inserter will release the lock without ever
1805  * calling LWLockUpdateVar. But if it has to sleep, it will
1806  * advertise the insertion point with LWLockUpdateVar before
1807  * sleeping.
1808  */
1809  if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
1810  &WALInsertLocks[i].l.insertingAt,
1811  insertingat, &insertingat))
1812  {
1813  /* the lock was free, so no insertion in progress */
1814  insertingat = InvalidXLogRecPtr;
1815  break;
1816  }
1817 
1818  /*
1819  * This insertion is still in progress. Have to wait, unless the
1820  * inserter has proceeded past 'upto'.
1821  */
1822  } while (insertingat < upto);
1823 
1824  if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
1825  finishedUpto = insertingat;
1826  }
1827  return finishedUpto;
1828 }
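/*
 * A sketch of the call protocol described in the comment above; this is
 * the pattern XLogFlush() and XLogBackgroundFlush() below follow:
 *
 *		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
 *		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 *		... XLogWrite() up to insertpos ...
 *		LWLockRelease(WALWriteLock);
 *
 * Waiting *before* taking WALWriteLock is what breaks the potential
 * deadlock against an inserter that needs the lock to evict an old buffer.
 */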
1829 
1830 /*
1831  * Get a pointer to the right location in the WAL buffer containing the
1832  * given XLogRecPtr.
1833  *
1834  * If the page is not initialized yet, it is initialized. That might require
1835  * evicting an old dirty buffer from the buffer cache, which means I/O.
1836  *
1837  * The caller must ensure that the page containing the requested location
1838  * isn't evicted yet, and won't be evicted. The way to ensure that is to
1839  * hold onto a WAL insertion lock with the insertingAt position set to
1840  * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
1841  * to evict an old page from the buffer. (This means that once you call
1842  * GetXLogBuffer() with a given 'ptr', you must not access anything before
1843  * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
1844  * later, because older buffers might be recycled already)
1845  */
1846 static char *
1847 GetXLogBuffer(XLogRecPtr ptr)
1848 {
1849  int idx;
1850  XLogRecPtr endptr;
1851  static uint64 cachedPage = 0;
1852  static char *cachedPos = NULL;
1853  XLogRecPtr expectedEndPtr;
1854 
1855  /*
1856  * Fast path for the common case that we need to access the same page
1857  * as last time.
1858  */
1859  if (ptr / XLOG_BLCKSZ == cachedPage)
1860  {
1861  Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
1862  Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
1863  return cachedPos + ptr % XLOG_BLCKSZ;
1864  }
1865 
1866  /*
1867  * The XLog buffer cache is organized so that a page is always loaded to a
1868  * particular buffer. That way we can easily calculate the buffer a given
1869  * page must be loaded into, from the XLogRecPtr alone.
1870  */
1871  idx = XLogRecPtrToBufIdx(ptr);
1872 
1873  /*
1874  * See what page is loaded in the buffer at the moment. It could be the
1875  * page we're looking for, or something older. It can't be anything newer
1876  * - that would imply the page we're looking for has already been written
1877  * out to disk and evicted, and the caller is responsible for making sure
1878  * that doesn't happen.
1879  *
1880  * However, we don't hold a lock while we read the value. If someone has
1881  * just initialized the page, it's possible that we get a "torn read" of
1882  * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
1883  * that case we will see a bogus value. That's ok, we'll grab the mapping
1884  * lock (in AdvanceXLInsertBuffer) and retry if we see anything other than
1885  * the page we're looking for. But it means that when we do this unlocked
1886  * read, we might see a value that appears to be ahead of the page we're
1887  * looking for. Don't PANIC on that, until we've verified the value while
1888  * holding the lock.
1889  */
1890  expectedEndPtr = ptr;
1891  expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
1892 
1893  endptr = XLogCtl->xlblocks[idx];
1894  if (expectedEndPtr != endptr)
1895  {
1896  XLogRecPtr initializedUpto;
1897 
1898  /*
1899  * Before calling AdvanceXLInsertBuffer(), which can block, let others
1900  * know how far we're finished with inserting the record.
1901  *
1902  * NB: If 'ptr' points to just after the page header, advertise a
1903  * position at the beginning of the page rather than 'ptr' itself. If
1904  * there are no other insertions running, someone might try to flush
1905  * up to our advertised location. If we advertised a position after
1906  * the page header, someone might try to flush the page header, even
1907  * though the page might not actually be initialized yet. As the first
1908  * inserter on the page, we are effectively responsible for making
1909  * sure that it's initialized, before we let insertingAt move past
1910  * the page header.
1911  */
1912  if (ptr % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
1913  XLogSegmentOffset(ptr, wal_segment_size) > XLOG_BLCKSZ)
1914  initializedUpto = ptr - SizeOfXLogShortPHD;
1915  else if (ptr % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
1916  XLogSegmentOffset(ptr, wal_segment_size) < XLOG_BLCKSZ)
1917  initializedUpto = ptr - SizeOfXLogLongPHD;
1918  else
1919  initializedUpto = ptr;
1920 
1921  WALInsertLockUpdateInsertingAt(initializedUpto);
1922 
1923  AdvanceXLInsertBuffer(ptr, false);
1924  endptr = XLogCtl->xlblocks[idx];
1925 
1926  if (expectedEndPtr != endptr)
1927  elog(PANIC, "could not find WAL buffer for %X/%X",
1928  (uint32) (ptr >> 32), (uint32) ptr);
1929  }
1930  else
1931  {
1932  /*
1933  * Make sure the initialization of the page is visible to us, and
1934  * won't arrive later to overwrite the WAL data we write on the page.
1935  */
1936  pg_memory_barrier();
1937  }
1938 
1939  /*
1940  * Found the buffer holding this page. Return a pointer to the right
1941  * offset within the page.
1942  */
1943  cachedPage = ptr / XLOG_BLCKSZ;
1944  cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
1945 
1946  Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
1947  Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
1948 
1949  return cachedPos + ptr % XLOG_BLCKSZ;
1950 }
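/*
 * Illustrative consequence of the contract above (hypothetical positions):
 * an inserter may only move forward through the buffers, e.g.
 *
 *		p1 = GetXLogBuffer(ptr);					OK
 *		p2 = GetXLogBuffer(ptr + XLOG_BLCKSZ);		OK, a later page
 *		p3 = GetXLogBuffer(ptr);					wrong: the page at 'ptr'
 *													may be recycled by now
 */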
1951 
1952 /*
1953  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
1954  * is the position starting from the beginning of WAL, excluding all WAL
1955  * page headers.
1956  */
1957 static XLogRecPtr
1958 XLogBytePosToRecPtr(uint64 bytepos)
1959 {
1960  uint64 fullsegs;
1961  uint64 fullpages;
1962  uint64 bytesleft;
1963  uint32 seg_offset;
1964  XLogRecPtr result;
1965 
1966  fullsegs = bytepos / UsableBytesInSegment;
1967  bytesleft = bytepos % UsableBytesInSegment;
1968 
1969  if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
1970  {
1971  /* fits on first page of segment */
1972  seg_offset = bytesleft + SizeOfXLogLongPHD;
1973  }
1974  else
1975  {
1976  /* account for the first page on segment with long header */
1977  seg_offset = XLOG_BLCKSZ;
1978  bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
1979 
1980  fullpages = bytesleft / UsableBytesInPage;
1981  bytesleft = bytesleft % UsableBytesInPage;
1982 
1983  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
1984  }
1985 
1986  XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result);
1987 
1988  return result;
1989 }
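/*
 * A worked example of the conversion above, assuming common build values
 * (XLOG_BLCKSZ = 8192, SizeOfXLogShortPHD = 24, SizeOfXLogLongPHD = 40 on
 * 64-bit alignment) and 16MB segments: bytepos = 10000 lies in the first
 * segment and does not fit on its first page (8192 - 40 = 8152 usable
 * bytes), so
 *
 *		seg_offset = 8192 + 0 * XLOG_BLCKSZ + (10000 - 8152) + 24 = 10064
 *
 * i.e. 1848 usable bytes into the second page, just past its short header.
 */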
1990 
1991 /*
1992  * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
1993  * returns a pointer to the beginning of the page (ie. before page header),
1994  * not to where the first xlog record on that page would go to. This is used
1995  * when converting a pointer to the end of a record.
1996  */
1997 static XLogRecPtr
1998 XLogBytePosToEndRecPtr(uint64 bytepos)
1999 {
2000  uint64 fullsegs;
2001  uint64 fullpages;
2002  uint64 bytesleft;
2003  uint32 seg_offset;
2004  XLogRecPtr result;
2005 
2006  fullsegs = bytepos / UsableBytesInSegment;
2007  bytesleft = bytepos % UsableBytesInSegment;
2008 
2009  if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
2010  {
2011  /* fits on first page of segment */
2012  if (bytesleft == 0)
2013  seg_offset = 0;
2014  else
2015  seg_offset = bytesleft + SizeOfXLogLongPHD;
2016  }
2017  else
2018  {
2019  /* account for the first page on segment with long header */
2020  seg_offset = XLOG_BLCKSZ;
2021  bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
2022 
2023  fullpages = bytesleft / UsableBytesInPage;
2024  bytesleft = bytesleft % UsableBytesInPage;
2025 
2026  if (bytesleft == 0)
2027  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
2028  else
2029  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
2030  }
2031 
2032  XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result);
2033 
2034  return result;
2035 }
2036 
2037 /*
2038  * Convert an XLogRecPtr to a "usable byte position".
2039  */
2040 static uint64
2041 XLogRecPtrToBytePos(XLogRecPtr ptr)
2042 {
2043  uint64 fullsegs;
2044  uint32 fullpages;
2045  uint32 offset;
2046  uint64 result;
2047 
2048  XLByteToSeg(ptr, fullsegs, wal_segment_size);
2049 
2050  fullpages = (XLogSegmentOffset(ptr, wal_segment_size)) / XLOG_BLCKSZ;
2051  offset = ptr % XLOG_BLCKSZ;
2052 
2053  if (fullpages == 0)
2054  {
2055  result = fullsegs * UsableBytesInSegment;
2056  if (offset > 0)
2057  {
2058  Assert(offset >= SizeOfXLogLongPHD);
2059  result += offset - SizeOfXLogLongPHD;
2060  }
2061  }
2062  else
2063  {
2064  result = fullsegs * UsableBytesInSegment +
2065  (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
2066  (fullpages - 1) * UsableBytesInPage; /* full pages */
2067  if (offset > 0)
2068  {
2069  Assert(offset >= SizeOfXLogShortPHD);
2070  result += offset - SizeOfXLogShortPHD;
2071  }
2072  }
2073 
2074  return result;
2075 }
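/*
 * Sanity check sketch: this function is the inverse of XLogBytePosToRecPtr
 * above, so the round trip is exact for any usable byte position, e.g.
 * (continuing the worked example above, same assumed header sizes):
 *
 *		XLogRecPtrToBytePos(XLogBytePosToRecPtr(10000)) == 10000
 *
 * XLogBytePosToEndRecPtr differs only at page boundaries, where it returns
 * the position before the next page's header rather than after it.
 */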
2076 
2077 /*
2078  * Initialize XLOG buffers, writing out old buffers if they still contain
2079  * unwritten data, up to the page containing 'upto'. Or if 'opportunistic' is
2080  * true, initialize as many pages as we can without having to write out
2081  * unwritten data. Any new pages are initialized to zeros, with page headers
2082  * initialized properly.
2083  */
2084 static void
2085 AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
2086 {
2087  XLogCtlInsert *Insert = &XLogCtl->Insert;
2088  int nextidx;
2089  XLogRecPtr OldPageRqstPtr;
2090  XLogwrtRqst WriteRqst;
2091  XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
2092  XLogRecPtr NewPageBeginPtr;
2093  XLogPageHeader NewPage;
2094  int npages = 0;
2095 
2096  LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
2097 
2098  /*
2099  * Now that we have the lock, check if someone initialized the page
2100  * already.
2101  */
2102  while (upto >= XLogCtl->InitializedUpTo || opportunistic)
2103  {
2104  nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
2105 
2106  /*
2107  * Get ending-offset of the buffer page we need to replace (this may
2108  * be zero if the buffer hasn't been used yet). Fall through if it's
2109  * already written out.
2110  */
2111  OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
2112  if (LogwrtResult.Write < OldPageRqstPtr)
2113  {
2114  /*
2115  * Nope, got work to do. If we just want to pre-initialize as much
2116  * as we can without flushing, give up now.
2117  */
2118  if (opportunistic)
2119  break;
2120 
2121  /* Before waiting, get info_lck and update LogwrtResult */
2122  SpinLockAcquire(&XLogCtl->info_lck);
2123  if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
2124  XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
2125  LogwrtResult = XLogCtl->LogwrtResult;
2126  SpinLockRelease(&XLogCtl->info_lck);
2127 
2128  /*
2129  * Now that we have an up-to-date LogwrtResult value, see if we
2130  * still need to write it or if someone else already did.
2131  */
2132  if (LogwrtResult.Write < OldPageRqstPtr)
2133  {
2134  /*
2135  * Must acquire write lock. Release WALBufMappingLock first,
2136  * to make sure that all insertions that we need to wait for
2137  * can finish (up to this same position). Otherwise we risk
2138  * deadlock.
2139  */
2140  LWLockRelease(WALBufMappingLock);
2141 
2142  WaitXLogInsertionsToFinish(OldPageRqstPtr);
2143 
2144  LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
2145 
2146  LogwrtResult = XLogCtl->LogwrtResult;
2147  if (LogwrtResult.Write >= OldPageRqstPtr)
2148  {
2149  /* OK, someone wrote it already */
2150  LWLockRelease(WALWriteLock);
2151  }
2152  else
2153  {
2154  /* Have to write it ourselves */
2155  TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
2156  WriteRqst.Write = OldPageRqstPtr;
2157  WriteRqst.Flush = 0;
2158  XLogWrite(WriteRqst, false);
2159  LWLockRelease(WALWriteLock);
2160  TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
2161  }
2162  /* Re-acquire WALBufMappingLock and retry */
2163  LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
2164  continue;
2165  }
2166  }
2167 
2168  /*
2169  * Now the next buffer slot is free and we can set it up to be the
2170  * next output page.
2171  */
2172  NewPageBeginPtr = XLogCtl->InitializedUpTo;
2173  NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
2174 
2175  Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
2176 
2177  NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
2178 
2179  /*
2180  * Be sure to re-zero the buffer so that bytes beyond what we've
2181  * written will look like zeroes and not valid XLOG records...
2182  */
2183  MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
2184 
2185  /*
2186  * Fill the new page's header
2187  */
2188  NewPage->xlp_magic = XLOG_PAGE_MAGIC;
2189 
2190  /* NewPage->xlp_info = 0; */ /* done by memset */
2191  NewPage->xlp_tli = ThisTimeLineID;
2192  NewPage->xlp_pageaddr = NewPageBeginPtr;
2193 
2194  /* NewPage->xlp_rem_len = 0; */ /* done by memset */
2195 
2196  /*
2197  * If online backup is not in progress, mark the header to indicate
2198  * that WAL records beginning in this page have removable backup
2199  * blocks. This allows the WAL archiver to know whether it is safe to
2200  * compress archived WAL data by transforming full-block records into
2201  * the non-full-block format. It is sufficient to record this at the
2202  * page level because we force a page switch (in fact a segment
2203  * switch) when starting a backup, so the flag will be off before any
2204  * records can be written during the backup. At the end of a backup,
2205  * the last page will be marked as all unsafe when perhaps only part
2206  * is unsafe, but at worst the archiver would miss the opportunity to
2207  * compress a few records.
2208  */
2209  if (!Insert->forcePageWrites)
2210  NewPage->xlp_info |= XLP_BKP_REMOVABLE;
2211 
2212  /*
2213  * If first page of an XLOG segment file, make it a long header.
2214  */
2215  if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
2216  {
2217  XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
2218 
2219  NewLongPage->xlp_sysid = ControlFile->system_identifier;
2220  NewLongPage->xlp_seg_size = wal_segment_size;
2221  NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
2222  NewPage->xlp_info |= XLP_LONG_HEADER;
2223  }
2224 
2225  /*
2226  * Make sure the initialization of the page becomes visible to others
2227  * before the xlblocks update. GetXLogBuffer() reads xlblocks without
2228  * holding a lock.
2229  */
2230  pg_write_barrier();
2231 
2232  *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
2233 
2234  XLogCtl->InitializedUpTo = NewPageEndPtr;
2235 
2236  npages++;
2237  }
2238  LWLockRelease(WALBufMappingLock);
2239 
2240 #ifdef WAL_DEBUG
2241  if (XLOG_DEBUG && npages > 0)
2242  {
2243  elog(DEBUG1, "initialized %d pages, up to %X/%X",
2244  npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
2245  }
2246 #endif
2247 }
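/*
 * The two ways this function is driven, both visible elsewhere in this
 * file: GetXLogBuffer() advances to a concrete target page when an
 * inserter needs it,
 *
 *		AdvanceXLInsertBuffer(ptr, false);
 *
 * while XLogBackgroundFlush() runs it opportunistically, initializing only
 * pages whose old contents are already written out:
 *
 *		AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
 */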
2248 
2249 /*
2250  * Calculate CheckPointSegments based on max_wal_size_mb and
2251  * checkpoint_completion_target.
2252  */
2253 static void
2254 CalculateCheckpointSegments(void)
2255 {
2256  double target;
2257 
2258  /*-------
2259  * Calculate the distance at which to trigger a checkpoint, to avoid
2260  * exceeding max_wal_size_mb. This is based on two assumptions:
2261  *
2262  * a) we keep WAL for only one checkpoint cycle (prior to PG11 we kept
2263  * WAL for two checkpoint cycles to allow us to recover from the
2264  * secondary checkpoint if the first checkpoint failed, though we
2265  * only did this on the master anyway, not on standby. Keeping just
2266  * one checkpoint simplifies processing and reduces disk space in
2267  * many smaller databases.)
2268  * b) during a checkpoint, we consume checkpoint_completion_target
2269  * times the number of segments consumed between checkpoints.
2270  *-------
2271  */
2272  target = (double) ConvertToXSegs(max_wal_size_mb, wal_segment_size) /
2273  (1.0 + CheckPointCompletionTarget);
2274 
2275  /* round down */
2276  CheckPointSegments = (int) target;
2277 
2278  if (CheckPointSegments < 1)
2279  CheckPointSegments = 1;
2280 }
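/*
 * A worked example of the formula above (illustrative settings): with
 * max_wal_size_mb = 1024, 16MB segments and
 * checkpoint_completion_target = 0.5, ConvertToXSegs yields 64 segments and
 *
 *		target = 64 / (1.0 + 0.5) = 42.67
 *
 * so CheckPointSegments = 42: checkpoints trigger roughly every 42
 * segments, leaving headroom for WAL written while a checkpoint completes.
 */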
2281 
2282 void
2283 assign_max_wal_size(int newval, void *extra)
2284 {
2285  max_wal_size_mb = newval;
2286  CalculateCheckpointSegments();
2287 }
2288 
2289 void
2290 assign_checkpoint_completion_target(double newval, void *extra)
2291 {
2292  CheckPointCompletionTarget = newval;
2293  CalculateCheckpointSegments();
2294 }
2295 
2296 /*
2297  * At a checkpoint, how many WAL segments to recycle as preallocated future
2298  * XLOG segments? Returns the highest segment that should be preallocated.
2299  */
2300 static XLogSegNo
2301 XLOGfileslop(XLogRecPtr lastredoptr)
2302 {
2303  XLogSegNo minSegNo;
2304  XLogSegNo maxSegNo;
2305  double distance;
2306  XLogSegNo recycleSegNo;
2307 
2308  /*
2309  * Calculate the segment numbers that min_wal_size_mb and max_wal_size_mb
2310  * correspond to. Always recycle enough segments to meet the minimum, and
2311  * remove enough segments to stay below the maximum.
2312  */
2313  minSegNo = lastredoptr / wal_segment_size +
2314  ConvertToXSegs(min_wal_size_mb, wal_segment_size) - 1;
2315  maxSegNo = lastredoptr / wal_segment_size +
2316  ConvertToXSegs(max_wal_size_mb, wal_segment_size) - 1;
2317 
2318  /*
2319  * Between those limits, recycle enough segments to get us through to the
2320  * estimated end of next checkpoint.
2321  *
2322  * To estimate where the next checkpoint will finish, assume that the
2323  * system runs steadily consuming CheckPointDistanceEstimate bytes between
2324  * every checkpoint.
2325  */
2326  distance = (1.0 + CheckPointCompletionTarget) * CheckPointDistanceEstimate;
2327  /* add 10% for good measure. */
2328  distance *= 1.10;
2329 
2330  recycleSegNo = (XLogSegNo) ceil(((double) lastredoptr + distance) /
2331  wal_segment_size);
2332 
2333  if (recycleSegNo < minSegNo)
2334  recycleSegNo = minSegNo;
2335  if (recycleSegNo > maxSegNo)
2336  recycleSegNo = maxSegNo;
2337 
2338  return recycleSegNo;
2339 }
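/*
 * Illustrative numbers for the estimate above: with 16MB segments,
 * min_wal_size_mb = 80 (5 segments), max_wal_size_mb = 1024 (64 segments),
 * and CheckPointDistanceEstimate = 256MB, the recycle target is
 *
 *		ceil((lastredoptr + 1.10 * (1.0 + CheckPointCompletionTarget)
 *			  * 256MB) / wal_segment_size)
 *
 * clamped into the [minSegNo, maxSegNo] range computed from those GUCs.
 */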
2340 
2341 /*
2342  * Check whether we've consumed enough xlog space that a checkpoint is needed.
2343  *
2344  * new_segno indicates a log file that has just been filled up (or read
2345  * during recovery). We measure the distance from RedoRecPtr to new_segno
2346  * and see if that exceeds CheckPointSegments.
2347  *
2348  * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
2349  */
2350 static bool
2351 XLogCheckpointNeeded(XLogSegNo new_segno)
2352 {
2353  XLogSegNo old_segno;
2354 
2355  XLByteToSeg(RedoRecPtr, old_segno, wal_segment_size);
2356 
2357  if (new_segno >= old_segno + (uint64) (CheckPointSegments - 1))
2358  return true;
2359  return false;
2360 }
2361 
2362 /*
2363  * Write and/or fsync the log at least as far as WriteRqst indicates.
2364  *
2365  * If flexible == true, we don't have to write as far as WriteRqst, but
2366  * may stop at any convenient boundary (such as a cache or logfile boundary).
2367  * This option allows us to avoid uselessly issuing multiple writes when a
2368  * single one would do.
2369  *
2370  * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
2371  * must be called before grabbing the lock, to make sure the data is ready to
2372  * write.
2373  */
2374 static void
2375 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2376 {
2377  bool ispartialpage;
2378  bool last_iteration;
2379  bool finishing_seg;
2380  bool use_existent;
2381  int curridx;
2382  int npages;
2383  int startidx;
2384  uint32 startoffset;
2385 
2386  /* We should always be inside a critical section here */
2387  Assert(CritSectionCount > 0);
2388 
2389  /*
2390  * Update local LogwrtResult (caller probably did this already, but...)
2391  */
2392  LogwrtResult = XLogCtl->LogwrtResult;
2393 
2394  /*
2395  * Since successive pages in the xlog cache are consecutively allocated,
2396  * we can usually gather multiple pages together and issue just one
2397  * write() call. npages is the number of pages we have determined can be
2398  * written together; startidx is the cache block index of the first one,
2399  * and startoffset is the file offset at which it should go. The latter
2400  * two variables are only valid when npages > 0, but we must initialize
2401  * all of them to keep the compiler quiet.
2402  */
2403  npages = 0;
2404  startidx = 0;
2405  startoffset = 0;
2406 
2407  /*
2408  * Within the loop, curridx is the cache block index of the page to
2409  * consider writing. Begin at the buffer containing the next unwritten
2410  * page, or last partially written page.
2411  */
2412  curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
2413 
2414  while (LogwrtResult.Write < WriteRqst.Write)
2415  {
2416  /*
2417  * Make sure we're not ahead of the insert process. This could happen
2418  * if we're passed a bogus WriteRqst.Write that is past the end of the
2419  * last page that's been initialized by AdvanceXLInsertBuffer.
2420  */
2421  XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
2422 
2423  if (LogwrtResult.Write >= EndPtr)
2424  elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
2425  (uint32) (LogwrtResult.Write >> 32),
2426  (uint32) LogwrtResult.Write,
2427  (uint32) (EndPtr >> 32), (uint32) EndPtr);
2428 
2429  /* Advance LogwrtResult.Write to end of current buffer page */
2430  LogwrtResult.Write = EndPtr;
2431  ispartialpage = WriteRqst.Write < LogwrtResult.Write;
2432 
2433  if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
2434  wal_segment_size))
2435  {
2436  /*
2437  * Switch to new logfile segment. We cannot have any pending
2438  * pages here (since we dump what we have at segment end).
2439  */
2440  Assert(npages == 0);
2441  if (openLogFile >= 0)
2442  XLogFileClose();
2443  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2444  wal_segment_size);
2445 
2446  /* create/use new log file */
2447  use_existent = true;
2448  openLogFile = XLogFileInit(openLogSegNo, &use_existent, true);
2449  }
2450 
2451  /* Make sure we have the current logfile open */
2452  if (openLogFile < 0)
2453  {
2454  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2455  wal_segment_size);
2456  openLogFile = XLogFileOpen(openLogSegNo);
2457  }
2458 
2459  /* Add current page to the set of pending pages-to-dump */
2460  if (npages == 0)
2461  {
2462  /* first of group */
2463  startidx = curridx;
2464  startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
2465  wal_segment_size);
2466  }
2467  npages++;
2468 
2469  /*
2470  * Dump the set if this will be the last loop iteration, or if we are
2471  * at the last page of the cache area (since the next page won't be
2472  * contiguous in memory), or if we are at the end of the logfile
2473  * segment.
2474  */
2475  last_iteration = WriteRqst.Write <= LogwrtResult.Write;
2476 
2477  finishing_seg = !ispartialpage &&
2478  (startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
2479 
2480  if (last_iteration ||
2481  curridx == XLogCtl->XLogCacheBlck ||
2482  finishing_seg)
2483  {
2484  char *from;
2485  Size nbytes;
2486  Size nleft;
2487  int written;
2488 
2489  /* OK to write the page(s) */
2490  from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
2491  nbytes = npages * (Size) XLOG_BLCKSZ;
2492  nleft = nbytes;
2493  do
2494  {
2495  errno = 0;
2496  pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
2497  written = pg_pwrite(openLogFile, from, nleft, startoffset);
2498  pgstat_report_wait_end();
2499  if (written <= 0)
2500  {
2501  char xlogfname[MAXFNAMELEN];
2502  int save_errno;
2503 
2504  if (errno == EINTR)
2505  continue;
2506 
2507  save_errno = errno;
2508  XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo,
2509  wal_segment_size);
2510  errno = save_errno;
2511  ereport(PANIC,
2512  (errcode_for_file_access(),
2513  errmsg("could not write to log file %s "
2514  "at offset %u, length %zu: %m",
2515  xlogfname, startoffset, nleft)));
2516  }
2517  nleft -= written;
2518  from += written;
2519  startoffset += written;
2520  } while (nleft > 0);
2521 
2522  npages = 0;
2523 
2524  /*
2525  * If we just wrote the whole last page of a logfile segment,
2526  * fsync the segment immediately. This avoids having to go back
2527  * and re-open prior segments when an fsync request comes along
2528  * later. Doing it here ensures that one and only one backend will
2529  * perform this fsync.
2530  *
2531  * This is also the right place to notify the Archiver that the
2532  * segment is ready to copy to archival storage, and to update the
2533  * timer for archive_timeout, and to signal for a checkpoint if
2534  * too many logfile segments have been used since the last
2535  * checkpoint.
2536  */
2537  if (finishing_seg)
2538  {
2539  issue_xlog_fsync(openLogFile, openLogSegNo);
2540 
2541  /* signal that we need to wakeup walsenders later */
2542  WalSndWakeupRequest();
2543 
2544  LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
2545 
2546  if (XLogArchivingActive())
2547  XLogArchiveNotifySeg(openLogSegNo);
2548 
2549  XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
2550  XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
2551 
2552  /*
2553  * Request a checkpoint if we've consumed too much xlog since
2554  * the last one. For speed, we first check using the local
2555  * copy of RedoRecPtr, which might be out of date; if it looks
2556  * like a checkpoint is needed, forcibly update RedoRecPtr and
2557  * recheck.
2558  */
2559  if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo))
2560  {
2561  (void) GetRedoRecPtr();
2562  if (XLogCheckpointNeeded(openLogSegNo))
2563  RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
2564  }
2565  }
2566  }
2567 
2568  if (ispartialpage)
2569  {
2570  /* Only asked to write a partial page */
2571  LogwrtResult.Write = WriteRqst.Write;
2572  break;
2573  }
2574  curridx = NextBufIdx(curridx);
2575 
2576  /* If flexible, break out of loop as soon as we wrote something */
2577  if (flexible && npages == 0)
2578  break;
2579  }
2580 
2581  Assert(npages == 0);
2582 
2583  /*
2584  * If asked to flush, do so
2585  */
2586  if (LogwrtResult.Flush < WriteRqst.Flush &&
2587  LogwrtResult.Flush < LogwrtResult.Write)
2588 
2589  {
2590  /*
2591  * Could get here without iterating above loop, in which case we might
2592  * have no open file or the wrong one. However, we do not need to
2593  * fsync more than one file.
2594  */
2595  if (sync_method != SYNC_METHOD_OPEN &&
2596  sync_method != SYNC_METHOD_OPEN_DSYNC)
2597  {
2598  if (openLogFile >= 0 &&
2599  !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
2600  wal_segment_size))
2601  XLogFileClose();
2602  if (openLogFile < 0)
2603  {
2604  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2605  wal_segment_size);
2606  openLogFile = XLogFileOpen(openLogSegNo);
2607  }
2608 
2609  issue_xlog_fsync(openLogFile, openLogSegNo);
2610  }
2611 
2612  /* signal that we need to wakeup walsenders later */
2613  WalSndWakeupRequest();
2614 
2615  LogwrtResult.Flush = LogwrtResult.Write;
2616  }
2617 
2618  /*
2619  * Update shared-memory status
2620  *
2621  * We make sure that the shared 'request' values do not fall behind the
2622  * 'result' values. This is not absolutely essential, but it saves some
2623  * code in a couple of places.
2624  */
2625  {
2626  SpinLockAcquire(&XLogCtl->info_lck);
2627  XLogCtl->LogwrtResult = LogwrtResult;
2628  if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
2629  XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
2630  if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
2631  XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
2632  SpinLockRelease(&XLogCtl->info_lck);
2633  }
2634 }
2635 
2636 /*
2637  * Record the LSN for an asynchronous transaction commit/abort
2638  * and nudge the WALWriter if there is work for it to do.
2639  * (This should not be called for synchronous commits.)
2640  */
2641 void
2642 XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
2643 {
2644  XLogRecPtr WriteRqstPtr = asyncXactLSN;
2645  bool sleeping;
2646 
2647  SpinLockAcquire(&XLogCtl->info_lck);
2648  LogwrtResult = XLogCtl->LogwrtResult;
2649  sleeping = XLogCtl->WalWriterSleeping;
2650  if (XLogCtl->asyncXactLSN < asyncXactLSN)
2651  XLogCtl->asyncXactLSN = asyncXactLSN;
2652  SpinLockRelease(&XLogCtl->info_lck);
2653 
2654  /*
2655  * If the WALWriter is sleeping, we should kick it to make it come out of
2656  * low-power mode. Otherwise, determine whether there's a full page of
2657  * WAL available to write.
2658  */
2659  if (!sleeping)
2660  {
2661  /* back off to last completed page boundary */
2662  WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
2663 
2664  /* if we have already flushed that far, we're done */
2665  if (WriteRqstPtr <= LogwrtResult.Flush)
2666  return;
2667  }
2668 
2669  /*
2670  * Nudge the WALWriter: it has a full page of WAL to write, or we want it
2671  * to come out of low-power mode so that this async commit will reach disk
2672  * within the expected amount of time.
2673  */
2674  if (ProcGlobal->walwriterLatch)
2675  SetLatch(ProcGlobal->walwriterLatch);
2676 }
2677 
2678 /*
2679  * Record the LSN up to which we can remove WAL because it's not required by
2680  * any replication slot.
2681  */
2682 void
2683 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
2684 {
2685  SpinLockAcquire(&XLogCtl->info_lck);
2686  XLogCtl->replicationSlotMinLSN = lsn;
2687  SpinLockRelease(&XLogCtl->info_lck);
2688 }
2689 
2690 
2691 /*
2692  * Return the oldest LSN we must retain to satisfy the needs of some
2693  * replication slot.
2694  */
2695 static XLogRecPtr
2696 XLogGetReplicationSlotMinimumLSN(void)
2697 {
2698  XLogRecPtr retval;
2699 
2700  SpinLockAcquire(&XLogCtl->info_lck);
2701  retval = XLogCtl->replicationSlotMinLSN;
2702  SpinLockRelease(&XLogCtl->info_lck);
2703 
2704  return retval;
2705 }
2706 
2707 /*
2708  * Advance minRecoveryPoint in control file.
2709  *
2710  * If we crash during recovery, we must reach this point again before the
2711  * database is consistent.
2712  *
2713  * If 'force' is true, 'lsn' argument is ignored. Otherwise, minRecoveryPoint
2714  * is only updated if it's not already greater than or equal to 'lsn'.
2715  */
2716 static void
2717 UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
2718 {
2719  /* Quick check using our local copy of the variable */
2720  if (!updateMinRecoveryPoint || (!force && lsn <= minRecoveryPoint))
2721  return;
2722 
2723  /*
2724  * An invalid minRecoveryPoint means that we need to recover all the WAL,
2725  * i.e., we're doing crash recovery. We never modify the control file's
2726  * value in that case, so we can short-circuit future checks here too. The
2727  * local values of minRecoveryPoint and minRecoveryPointTLI should not be
2728  * updated until crash recovery finishes. We only do this for the startup
2729  * process as it should not update its own reference of minRecoveryPoint
2730  * until it has finished crash recovery to make sure that all WAL
2731  * available is replayed in this case. This also avoids extra locks
2732  * being taken on the control file by the startup process.
2733  */
2734  if (XLogRecPtrIsInvalid(minRecoveryPoint) && InRecovery)
2735  {
2736  updateMinRecoveryPoint = false;
2737  return;
2738  }
2739 
2740  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2741 
2742  /* update local copy */
2743  minRecoveryPoint = ControlFile->minRecoveryPoint;
2744  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
2745 
2746  if (XLogRecPtrIsInvalid(minRecoveryPoint))
2747  updateMinRecoveryPoint = false;
2748  else if (force || minRecoveryPoint < lsn)
2749  {
2750  XLogRecPtr newMinRecoveryPoint;
2751  TimeLineID newMinRecoveryPointTLI;
2752 
2753  /*
2754  * To avoid having to update the control file too often, we update it
2755  * all the way to the last record being replayed, even though 'lsn'
2756  * would suffice for correctness. This also allows the 'force' case
2757  * to not need a valid 'lsn' value.
2758  *
2759  * Another important reason for doing it this way is that the passed
2760  * 'lsn' value could be bogus, i.e., past the end of available WAL, if
2761  * the caller got it from a corrupted heap page. Accepting such a
2762  * value as the min recovery point would prevent us from coming up at
2763  * all. Instead, we just log a warning and continue with recovery.
2764  * (See also the comments about corrupt LSNs in XLogFlush.)
2765  */
2766  SpinLockAcquire(&XLogCtl->info_lck);
2767  newMinRecoveryPoint = XLogCtl->replayEndRecPtr;
2768  newMinRecoveryPointTLI = XLogCtl->replayEndTLI;
2769  SpinLockRelease(&XLogCtl->info_lck);
2770 
2771  if (!force && newMinRecoveryPoint < lsn)
2772  elog(WARNING,
2773  "xlog min recovery request %X/%X is past current point %X/%X",
2774  (uint32) (lsn >> 32), (uint32) lsn,
2775  (uint32) (newMinRecoveryPoint >> 32),
2776  (uint32) newMinRecoveryPoint);
2777 
2778  /* update control file */
2779  if (ControlFile->minRecoveryPoint < newMinRecoveryPoint)
2780  {
2781  ControlFile->minRecoveryPoint = newMinRecoveryPoint;
2782  ControlFile->minRecoveryPointTLI = newMinRecoveryPointTLI;
2783  UpdateControlFile();
2784  minRecoveryPoint = newMinRecoveryPoint;
2785  minRecoveryPointTLI = newMinRecoveryPointTLI;
2786 
2787  ereport(DEBUG2,
2788  (errmsg("updated min recovery point to %X/%X on timeline %u",
2789  (uint32) (minRecoveryPoint >> 32),
2790  (uint32) minRecoveryPoint,
2791  newMinRecoveryPointTLI)));
2792  }
2793  }
2794  LWLockRelease(ControlFileLock);
2795 }
2796 
2797 /*
2798  * Ensure that all XLOG data through the given position is flushed to disk.
2799  *
2800  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
2801  * already held, and we try to avoid acquiring it if possible.
2802  */
2803 void
2804 XLogFlush(XLogRecPtr record)
2805 {
2806  XLogRecPtr WriteRqstPtr;
2807  XLogwrtRqst WriteRqst;
2808 
2809  /*
2810  * During REDO, we are reading not writing WAL. Therefore, instead of
2811  * trying to flush the WAL, we should update minRecoveryPoint instead. We
2812  * test XLogInsertAllowed(), not InRecovery, because we need checkpointer
2813  * to act this way too, and because when it tries to write the
2814  * end-of-recovery checkpoint, it should indeed flush.
2815  */
2816  if (!XLogInsertAllowed())
2817  {
2818  UpdateMinRecoveryPoint(record, false);
2819  return;
2820  }
2821 
2822  /* Quick exit if already known flushed */
2823  if (record <= LogwrtResult.Flush)
2824  return;
2825 
2826 #ifdef WAL_DEBUG
2827  if (XLOG_DEBUG)
2828  elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
2829  (uint32) (record >> 32), (uint32) record,
2830  (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
2831  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
2832 #endif
2833 
2834  START_CRIT_SECTION();
2835 
2836  /*
2837  * Since fsync is usually a horribly expensive operation, we try to
2838  * piggyback as much data as we can on each fsync: if we see any more data
2839  * entered into the xlog buffer, we'll write and fsync that too, so that
2840  * the final value of LogwrtResult.Flush is as large as possible. This
2841  * gives us some chance of avoiding another fsync immediately after.
2842  */
2843 
2844  /* initialize to given target; may increase below */
2845  WriteRqstPtr = record;
2846 
2847  /*
2848  * Now wait until we get the write lock, or someone else does the flush
2849  * for us.
2850  */
2851  for (;;)
2852  {
2853  XLogRecPtr insertpos;
2854 
2855  /* read LogwrtResult and update local state */
2856  SpinLockAcquire(&XLogCtl->info_lck);
2857  if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
2858  WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
2859  LogwrtResult = XLogCtl->LogwrtResult;
2860  SpinLockRelease(&XLogCtl->info_lck);
2861 
2862  /* done already? */
2863  if (record <= LogwrtResult.Flush)
2864  break;
2865 
2866  /*
2867  * Before actually performing the write, wait for all in-flight
2868  * insertions to the pages we're about to write to finish.
2869  */
2870  insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
2871 
2872  /*
2873  * Try to get the write lock. If we can't get it immediately, wait
2874  * until it's released, and recheck if we still need to do the flush
2875  * or if the backend that held the lock did it for us already. This
2876  * helps to maintain a good rate of group committing when the system
2877  * is bottlenecked by the speed of fsyncing.
2878  */
2879  if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
2880  {
2881  /*
2882  * The lock is now free, but we didn't acquire it yet. Before we
2883  * do, loop back to check if someone else flushed the record for
2884  * us already.
2885  */
2886  continue;
2887  }
2888 
2889  /* Got the lock; recheck whether request is satisfied */
2890  LogwrtResult = XLogCtl->LogwrtResult;
2891  if (record <= LogwrtResult.Flush)
2892  {
2893  LWLockRelease(WALWriteLock);
2894  break;
2895  }
2896 
2897  /*
2898  * Sleep before flush! By adding a delay here, we may give further
2899  * backends the opportunity to join the backlog of group commit
2900  * followers; this can significantly improve transaction throughput,
2901  * at the risk of increasing transaction latency.
2902  *
2903  * We do not sleep if enableFsync is not turned on, nor if there are
2904  * fewer than CommitSiblings other backends with active transactions.
2905  */
2906  if (CommitDelay > 0 && enableFsync &&
2907  MinimumActiveBackends(CommitSiblings))
2908  {
2909  pg_usleep(CommitDelay);
2910 
2911  /*
2912  * Re-check how far we can now flush the WAL. It's generally not
2913  * safe to call WaitXLogInsertionsToFinish while holding
2914  * WALWriteLock, because an in-progress insertion might need to
2915  * also grab WALWriteLock to make progress. But we know that all
2916  * the insertions up to insertpos have already finished, because
2917  * that's what the earlier WaitXLogInsertionsToFinish() returned.
2918  * We're only calling it again to allow insertpos to be moved
2919  * further forward, not to actually wait for anyone.
2920  */
2921  insertpos = WaitXLogInsertionsToFinish(insertpos);
2922  }
2923 
2924  /* try to write/flush later additions to XLOG as well */
2925  WriteRqst.Write = insertpos;
2926  WriteRqst.Flush = insertpos;
2927 
2928  XLogWrite(WriteRqst, false);
2929 
2930  LWLockRelease(WALWriteLock);
2931  /* done */
2932  break;
2933  }
2934 
2935  END_CRIT_SECTION();
2936 
2937  /* wake up walsenders now that we've released heavily contended locks */
2938  WalSndWakeupProcessRequests();
2939 
2940  /*
2941  * If we still haven't flushed to the request point then we have a
2942  * problem; most likely, the requested flush point is past end of XLOG.
2943  * This has been seen to occur when a disk page has a corrupted LSN.
2944  *
2945  * Formerly we treated this as a PANIC condition, but that hurts the
2946  * system's robustness rather than helping it: we do not want to take down
2947  * the whole system due to corruption on one data page. In particular, if
2948  * the bad page is encountered again during recovery then we would be
2949  * unable to restart the database at all! (This scenario actually
2950  * happened in the field several times with 7.1 releases.) As of 8.4, bad
2951  * LSNs encountered during recovery are UpdateMinRecoveryPoint's problem;
2952  * the only time we can reach here during recovery is while flushing the
2953  * end-of-recovery checkpoint record, and we don't expect that to have a
2954  * bad LSN.
2955  *
2956  * Note that for calls from xact.c, the ERROR will be promoted to PANIC
2957  * since xact.c calls this routine inside a critical section. However,
2958  * calls from bufmgr.c are not within critical sections and so we will not
2959  * force a restart for a bad LSN on a data page.
2960  */
2961  if (LogwrtResult.Flush < record)
2962  elog(ERROR,
2963  "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
2964  (uint32) (record >> 32), (uint32) record,
2965  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
2966 }
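/*
 * Typical caller pattern for the function above, as used by synchronous
 * commit (sketch; the XLogBeginInsert/XLogRegisterData steps that build
 * the record are elided):
 *
 *		recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
 *		XLogFlush(recptr);
 *
 * Asynchronous commits skip the flush and instead call
 * XLogSetAsyncXactLSN(), leaving the write to XLogBackgroundFlush() below.
 */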
2967 
2968 /*
2969  * Write & flush xlog, but without specifying exactly where to.
2970  *
2971  * We normally write only completed blocks; but if there is nothing to do on
2972  * that basis, we check for unwritten async commits in the current incomplete
2973  * block, and write through the latest one of those. Thus, if async commits
2974  * are not being used, we will write complete blocks only.
2975  *
2976  * If, based on the above, there's anything to write we do so immediately. But
2977  * to avoid calling fsync, fdatasync et al. at a rate that'd impact
2978  * concurrent IO, we only flush WAL every wal_writer_delay ms, or if there's
2979  * more than wal_writer_flush_after unflushed blocks.
2980  *
2981  * We can guarantee that async commits reach disk after at most three
2982  * wal_writer_delay cycles. (When flushing complete blocks, we allow XLogWrite
2983  * to write "flexibly", meaning it can stop at the end of the buffer ring;
2984  * this makes a difference only with very high load or long wal_writer_delay,
2985  * but imposes one extra cycle for the worst case for async commits.)
2986  *
2987  * This routine is invoked periodically by the background walwriter process.
2988  *
2989  * Returns true if there was any work to do, even if we skipped flushing due
2990  * to wal_writer_delay/wal_writer_flush_after.
2991  */
2992 bool
2993 XLogBackgroundFlush(void)
2994 {
2995  XLogwrtRqst WriteRqst;
2996  bool flexible = true;
2997  static TimestampTz lastflush;
2998  TimestampTz now;
2999  int flushbytes;
3000 
3001  /* XLOG doesn't need flushing during recovery */
3002  if (RecoveryInProgress())
3003  return false;
3004 
3005  /* read LogwrtResult and update local state */
3006  SpinLockAcquire(&XLogCtl->info_lck);
3007  LogwrtResult = XLogCtl->LogwrtResult;
3008  WriteRqst = XLogCtl->LogwrtRqst;
3009  SpinLockRelease(&XLogCtl->info_lck);
3010 
3011  /* back off to last completed page boundary */
3012  WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
3013 
3014  /* if we have already flushed that far, consider async commit records */
3015  if (WriteRqst.Write <= LogwrtResult.Flush)
3016  {
3017  SpinLockAcquire(&XLogCtl->info_lck);
3018  WriteRqst.Write = XLogCtl->asyncXactLSN;
3019  SpinLockRelease(&XLogCtl->info_lck);
3020  flexible = false; /* ensure it all gets written */
3021  }
3022 
3023  /*
3024  * If already known flushed, we're done. Just need to check if we are
3025  * holding an open file handle to a logfile that's no longer in use,
3026  * preventing the file from being deleted.
3027  */
3028  if (WriteRqst.Write <= LogwrtResult.Flush)
3029  {
3030  if (openLogFile >= 0)
3031  {
3032  if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
3034  {
3035  XLogFileClose();
3036  }
3037  }
3038  return false;
3039  }
3040 
3041  /*
3042  * Determine how far to flush WAL, based on the wal_writer_delay and
3043  * wal_writer_flush_after GUCs.
3044  */
3045  now = GetCurrentTimestamp();
3046  flushbytes =
3047  WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
3048 
3049  if (WalWriterFlushAfter == 0 || lastflush == 0)
3050  {
3051  /* first call, or block based limits disabled */
3052  WriteRqst.Flush = WriteRqst.Write;
3053  lastflush = now;
3054  }
3055  else if (TimestampDifferenceExceeds(lastflush, now, WalWriterDelay))
3056  {
3057  /*
3058  * Flush the writes at least every WalWriterDelay ms. This is
3059  * important to bound the amount of time it takes for an asynchronous
3060  * commit to hit disk.
3061  */
3062  WriteRqst.Flush = WriteRqst.Write;
3063  lastflush = now;
3064  }
3065  else if (flushbytes >= WalWriterFlushAfter)
3066  {
3067  /* exceeded wal_writer_flush_after blocks, flush */
3068  WriteRqst.Flush = WriteRqst.Write;
3069  lastflush = now;
3070  }
3071  else
3072  {
3073  /* no flushing, this time round */
3074  WriteRqst.Flush = 0;
3075  }
3076 
3077 #ifdef WAL_DEBUG
3078  if (XLOG_DEBUG)
3079  elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X",
3080  (uint32) (WriteRqst.Write >> 32), (uint32) WriteRqst.Write,
3081  (uint32) (WriteRqst.Flush >> 32), (uint32) WriteRqst.Flush,
3082  (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
3083  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
3084 #endif
3085 
3086  START_CRIT_SECTION();
3087 
3088  /* now wait for any in-progress insertions to finish and get write lock */
3089  WaitXLogInsertionsToFinish(WriteRqst.Write);
3090  LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
3091  LogwrtResult = XLogCtl->LogwrtResult;
3092  if (WriteRqst.Write > LogwrtResult.Write ||
3093  WriteRqst.Flush > LogwrtResult.Flush)
3094  {
3095  XLogWrite(WriteRqst, flexible);
3096  }
3097  LWLockRelease(WALWriteLock);
3098 
3099  END_CRIT_SECTION();
3100 
3101  /* wake up walsenders now that we've released heavily contended locks */
3102  WalSndWakeupProcessRequests();
3103 
3104  /*
3105  * Great, done. To take some work off the critical path, try to initialize
3106  * as many of the no-longer-needed WAL buffers for future use as we can.
3107  */
3108  AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
3109 
3110  /*
3111  * If we determined that we need to write data, but somebody else
3112  * wrote/flushed it already, we still count as active, to avoid
3113  * hibernating too early.
3114  */
3115  return true;
3116 }
3117 
3118 /*
3119  * Test whether XLOG data has been flushed up to (at least) the given position.
3120  *
3121  * Returns true if a flush is still needed. (It may be that someone else
3122  * is already in process of flushing that far, however.)
3123  */
3124 bool
3125 XLogNeedsFlush(XLogRecPtr record)
3126 {
3127  /*
3128  * During recovery, we don't flush WAL but update minRecoveryPoint
3129  * instead. So "needs flush" is taken to mean whether minRecoveryPoint
3130  * would need to be updated.
3131  */
3132  if (RecoveryInProgress())
3133  {
3134  /*
3135  * An invalid minRecoveryPoint means that we need to recover all the
3136  * WAL, i.e., we're doing crash recovery. We never modify the control
3137  * file's value in that case, so we can short-circuit future checks
3138  * here too. This triggers a quick exit path for the startup process,
3139  * which cannot update its local copy of minRecoveryPoint as long as
3140  * it has not replayed all WAL available when doing crash recovery.
3141  */
3142  if (XLogRecPtrIsInvalid(minRecoveryPoint) && InRecovery)
3143  updateMinRecoveryPoint = false;
3144 
3145  /* Quick exit if already known to be updated or cannot be updated */
3146  if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
3147  return false;
3148 
3149  /*
3150  * Update local copy of minRecoveryPoint. But if the lock is busy,
3151  * just return a conservative guess.
3152  */
3153  if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
3154  return true;
3155  minRecoveryPoint = ControlFile->minRecoveryPoint;
3156  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
3157  LWLockRelease(ControlFileLock);
3158 
3159  /*
3160  * Check minRecoveryPoint for any other process than the startup
3161  * process doing crash recovery, which should not update the control
3162  * file value if crash recovery is still running.
3163  */
3164  if (XLogRecPtrIsInvalid(minRecoveryPoint))
3165  updateMinRecoveryPoint = false;
3166 
3167  /* check again */
3168  if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
3169  return false;
3170  else
3171  return true;
3172  }
3173 
3174  /* Quick exit if already known flushed */
3175  if (record <= LogwrtResult.Flush)
3176  return false;
3177 
3178  /* read LogwrtResult and update local state */
3179  SpinLockAcquire(&XLogCtl->info_lck);
3180  LogwrtResult = XLogCtl->LogwrtResult;
3181  SpinLockRelease(&XLogCtl->info_lck);
3182 
3183  /* check again */
3184  if (record <= LogwrtResult.Flush)
3185  return false;
3186 
3187  return true;
3188 }
3189 
3190 /*
3191  * Create a new XLOG file segment, or open a pre-existing one.
3192  *
3193  * logsegno: identify segment to be created/opened.
3194  *
3195  * *use_existent: if true, OK to use a pre-existing file (else, any
3196  * pre-existing file will be deleted). On return, true if a pre-existing
3197  * file was used.
3198  *
3199  * use_lock: if true, acquire ControlFileLock while moving file into
3200  * place. This should be true except during bootstrap log creation. The
3201  * caller must *not* hold the lock at call.
3202  *
3203  * Returns FD of opened file.
3204  *
3205  * Note: errors here are ERROR not PANIC because we might or might not be
3206  * inside a critical section (eg, during checkpoint there is no reason to
3207  * take down the system on failure). They will promote to PANIC if we are
3208  * in a critical section.
3209  */
3210 int
3211 XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
3212 {
3213  char path[MAXPGPATH];
3214  char tmppath[MAXPGPATH];
3215  PGAlignedXLogBlock zbuffer;
3216  XLogSegNo installed_segno;
3217  XLogSegNo max_segno;
3218  int fd;
3219  int nbytes;
3220  int save_errno;
3221 
3222  XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
3223 
3224  /*
3225  * Try to use existent file (checkpoint maker may have created it already)
3226  */
3227  if (*use_existent)
3228  {
3229  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3230  if (fd < 0)
3231  {
3232  if (errno != ENOENT)
3233  ereport(ERROR,
3234  (errcode_for_file_access(),
3235  errmsg("could not open file \"%s\": %m", path)));
3236  }
3237  else
3238  return fd;
3239  }
3240 
3241  /*
3242  * Initialize an empty (all zeroes) segment. NOTE: it is possible that
3243  * another process is doing the same thing. If so, we will end up
3244  * pre-creating an extra log segment. That seems OK, and better than
3245  * holding the lock throughout this lengthy process.
3246  */
3247  elog(DEBUG2, "creating and filling new WAL file");
3248 
3249  snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3250 
3251  unlink(tmppath);
3252 
3253  /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3254  fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
3255  if (fd < 0)
3256  ereport(ERROR,
3257  (errcode_for_file_access(),
3258  errmsg("could not create file \"%s\": %m", tmppath)));
3259 
3260  memset(zbuffer.data, 0, XLOG_BLCKSZ);
3261 
3261 
3262  pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE);
3263  save_errno = 0;
3264  if (wal_init_zero)
3265  {
3266  /*
3267  * Zero-fill the file. With this setting, we do this the hard way to
3268  * ensure that all the file space has really been allocated. On
3269  * platforms that allow "holes" in files, just seeking to the end
3270  * doesn't allocate intermediate space. This way, we know that we
3271  * have all the space and (after the fsync below) that all the
3272  * indirect blocks are down on disk. Therefore, fdatasync(2) or
3273  * O_DSYNC will be sufficient to sync future writes to the log file.
3274  */
3275  for (nbytes = 0; nbytes < wal_segment_size; nbytes += XLOG_BLCKSZ)
3276  {
3277  errno = 0;
3278  if (write(fd, zbuffer.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3279  {
3280  /* if write didn't set errno, assume no disk space */
3281  save_errno = errno ? errno : ENOSPC;
3282  break;
3283  }
3284  }
3285  }
3286  else
3287  {
3288  /*
3289  * Otherwise, seeking to the end and writing a solitary byte is
3290  * enough.
3291  */
3292  errno = 0;
3293  if (pg_pwrite(fd, zbuffer.data, 1, wal_segment_size - 1) != 1)
3294  {
3295  /* if write didn't set errno, assume no disk space */
3296  save_errno = errno ? errno : ENOSPC;
3297  }
3298  }
3299  pgstat_report_wait_end();
3300 
3301  if (save_errno)
3302  {
3303  /*
3304  * If we fail to make the file, delete it to release disk space
3305  */
3306  unlink(tmppath);
3307 
3308  close(fd);
3309 
3310  errno = save_errno;
3311 
3312  ereport(ERROR,
3313  (errcode_for_file_access(),
3314  errmsg("could not write to file \"%s\": %m", tmppath)));
3315  }
3316 
3317  pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_SYNC);
3318  if (pg_fsync(fd) != 0)
3319  {
3320  int save_errno = errno;
3321 
3322  close(fd);
3323  errno = save_errno;
3324  ereport(ERROR,
3325  (errcode_for_file_access(),
3326  errmsg("could not fsync file \"%s\": %m", tmppath)));
3327  }
3328  pgstat_report_wait_end();
3329 
3330  if (close(fd) != 0)
3331  ereport(ERROR,
3332  (errcode_for_file_access(),
3333  errmsg("could not close file \"%s\": %m", tmppath)));
3334 
3335  /*
3336  * Now move the segment into place with its final name.
3337  *
3338  * If caller didn't want to use a pre-existing file, get rid of any
3339  * pre-existing file. Otherwise, cope with possibility that someone else
3340  * has created the file while we were filling ours: if so, use ours to
3341  * pre-create a future log segment.
3342  */
3343  installed_segno = logsegno;
3344 
3345  /*
3346  * XXX: What should we use as max_segno? We used to use XLOGfileslop when
3347  * that was a constant, but that was always a bit dubious: normally, at a
3348  * checkpoint, XLOGfileslop was the offset from the checkpoint record, but
3349  * here, it was the offset from the insert location. We can't do the
3350  * normal XLOGfileslop calculation here because we don't have access to
3351  * the prior checkpoint's redo location. So somewhat arbitrarily, just use
3352  * CheckPointSegments.
3353  */
3354  max_segno = logsegno + CheckPointSegments;
3355  if (!InstallXLogFileSegment(&installed_segno, tmppath,
3356  *use_existent, max_segno,
3357  use_lock))
3358  {
3359  /*
3360  * No need for any more future segments, or InstallXLogFileSegment()
3361  * failed to rename the file into place. If the rename failed, opening
3362  * the file below will fail.
3363  */
3364  unlink(tmppath);
3365  }
3366 
3367  /* Set flag to tell caller there was no existent file */
3368  *use_existent = false;
3369 
3370  /* Now open original target segment (might not be file I just made) */
3371  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3372  if (fd < 0)
3373  ereport(ERROR,
3374  (errcode_for_file_access(),
3375  errmsg("could not open file \"%s\": %m", path)));
3376 
3377  elog(DEBUG2, "done creating and filling new WAL file");
3378 
3379  return fd;
3380 }
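/*
 * Editor's note -- a minimal sketch of the XLogFileInit() calling
 * convention, mirroring the call in PreallocXlogFiles() later in this
 * file. Note that use_existent is both an input (may we reuse a
 * pre-created segment?) and an output (false on return means a brand-new
 * file had to be created):
 *
 *     bool use_existent = true;
 *     int  lf = XLogFileInit(_logSegNo, &use_existent, true);
 *     close(lf);
 *     if (!use_existent)
 *         CheckpointStats.ckpt_segs_added++;
 */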
3381 
3382 /*
3383  * Create a new XLOG file segment by copying a pre-existing one.
3384  *
3385  * destsegno: identify segment to be created.
3386  *
3387  * srcTLI, srcsegno: identify segment to be copied (could be from
3388  * a different timeline)
3389  *
3390  * upto: how much of the source file to copy (the rest is filled with
3391  * zeros)
3392  *
3393  * Currently this is only used during recovery, and so there are no locking
3394  * considerations. But we should be just as careful as XLogFileInit to avoid
3395  * emplacing a bogus file.
3396  */
3397 static void
3398 XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
3399  int upto)
3400 {
3401  char path[MAXPGPATH];
3402  char tmppath[MAXPGPATH];
3403  PGAlignedXLogBlock buffer;
3404  int srcfd;
3405  int fd;
3406  int nbytes;
3407 
3408  /*
3409  * Open the source file
3410  */
3411  XLogFilePath(path, srcTLI, srcsegno, wal_segment_size);
3412  srcfd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3413  if (srcfd < 0)
3414  ereport(ERROR,
3415  (errcode_for_file_access(),
3416  errmsg("could not open file \"%s\": %m", path)));
3417 
3418  /*
3419  * Copy into a temp file name.
3420  */
3421  snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3422 
3423  unlink(tmppath);
3424 
3425  /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3426  fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
3427  if (fd < 0)
3428  ereport(ERROR,
3429  (errcode_for_file_access(),
3430  errmsg("could not create file \"%s\": %m", tmppath)));
3431 
3432  /*
3433  * Do the data copying.
3434  */
3435  for (nbytes = 0; nbytes < wal_segment_size; nbytes += sizeof(buffer))
3436  {
3437  int nread;
3438 
3439  nread = upto - nbytes;
3440 
3441  /*
3442  * The part that is not read from the source file is filled with
3443  * zeros.
3444  */
3445  if (nread < sizeof(buffer))
3446  memset(buffer.data, 0, sizeof(buffer));
3447 
3448  if (nread > 0)
3449  {
3450  int r;
3451 
3452  if (nread > sizeof(buffer))
3453  nread = sizeof(buffer);
3454  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_READ);
3455  r = read(srcfd, buffer.data, nread);
3456  if (r != nread)
3457  {
3458  if (r < 0)
3459  ereport(ERROR,
3460  (errcode_for_file_access(),
3461  errmsg("could not read file \"%s\": %m",
3462  path)));
3463  else
3464  ereport(ERROR,
3465  (errcode(ERRCODE_DATA_CORRUPTED),
3466  errmsg("could not read file \"%s\": read %d of %zu",
3467  path, r, (Size) nread)));
3468  }
3469  pgstat_report_wait_end();
3470  }
3471  errno = 0;
3472  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_WRITE);
3473  if ((int) write(fd, buffer.data, sizeof(buffer)) != (int) sizeof(buffer))
3474  {
3475  int save_errno = errno;
3476 
3477  /*
3478  * If we fail to make the file, delete it to release disk space
3479  */
3480  unlink(tmppath);
3481  /* if write didn't set errno, assume problem is no disk space */
3482  errno = save_errno ? save_errno : ENOSPC;
3483 
3484  ereport(ERROR,
3485  (errcode_for_file_access(),
3486  errmsg("could not write to file \"%s\": %m", tmppath)));
3487  }
3488  pgstat_report_wait_end();
3489  }
3490 
3491  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_SYNC);
3492  if (pg_fsync(fd) != 0)
3493  ereport(ERROR,
3494  (errcode_for_file_access(),
3495  errmsg("could not fsync file \"%s\": %m", tmppath)));
3496  pgstat_report_wait_end();
3497 
3498  if (CloseTransientFile(fd) != 0)
3499  ereport(ERROR,
3500  (errcode_for_file_access(),
3501  errmsg("could not close file \"%s\": %m", tmppath)));
3502 
3503  if (CloseTransientFile(srcfd) != 0)
3504  ereport(ERROR,
3505  (errcode_for_file_access(),
3506  errmsg("could not close file \"%s\": %m", path)));
3507 
3508  /*
3509  * Now move the segment into place with its final name.
3510  */
3511  if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, false))
3512  elog(ERROR, "InstallXLogFileSegment should not have failed");
3513 }
3514 
3515 /*
3516  * Install a new XLOG segment file as a current or future log segment.
3517  *
3518  * This is used both to install a newly-created segment (which has a temp
3519  * filename while it's being created) and to recycle an old segment.
3520  *
3521  * *segno: identify segment to install as (or first possible target).
3522  * When find_free is true, this is modified on return to indicate the
3523  * actual installation location or last segment searched.
3524  *
3525  * tmppath: initial name of file to install. It will be renamed into place.
3526  *
3527  * find_free: if true, install the new segment at the first unused segment
3528  * number at or after the passed number. If false, install the new segment
3529  * exactly where specified, deleting any existing segment file there.
3530  *
3531  * max_segno: maximum segment number to install the new file as. Fail if no
3532  * free slot is found between *segno and max_segno. (Ignored when find_free
3533  * is false.)
3534  *
3535  * use_lock: if true, acquire ControlFileLock while moving file into
3536  * place. This should be true except during bootstrap log creation. The
3537  * caller must *not* hold the lock at call.
3538  *
3539  * Returns true if the file was installed successfully; false indicates that
3540  * the max_segno limit was exceeded, or that an error occurred while renaming
3541  * the file into place.
3542  */
3543 static bool
3544 InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
3545  bool find_free, XLogSegNo max_segno,
3546  bool use_lock)
3547 {
3548  char path[MAXPGPATH];
3549  struct stat stat_buf;
3550 
3551  XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
3552 
3553  /*
3554  * We want to be sure that only one process does this at a time.
3555  */
3556  if (use_lock)
3557  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
3558 
3559  if (!find_free)
3560  {
3561  /* Force installation: get rid of any pre-existing segment file */
3562  durable_unlink(path, DEBUG1);
3563  }
3564  else
3565  {
3566  /* Find a free slot to put it in */
3567  while (stat(path, &stat_buf) == 0)
3568  {
3569  if ((*segno) >= max_segno)
3570  {
3571  /* Failed to find a free slot within specified range */
3572  if (use_lock)
3573  LWLockRelease(ControlFileLock);
3574  return false;
3575  }
3576  (*segno)++;
3577  XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
3578  }
3579  }
3580 
3581  /*
3582  * Perform the rename using link if available, paranoidly trying to avoid
3583  * overwriting an existing file (there shouldn't be one).
3584  */
3585  if (durable_link_or_rename(tmppath, path, LOG) != 0)
3586  {
3587  if (use_lock)
3588  LWLockRelease(ControlFileLock);
3589  /* durable_link_or_rename already emitted log message */
3590  return false;
3591  }
3592 
3593  if (use_lock)
3594  LWLockRelease(ControlFileLock);
3595 
3596  return true;
3597 }
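/*
 * Editor's note -- the two call modes of InstallXLogFileSegment(), as
 * exercised elsewhere in this file:
 *
 *     // force installation at exactly destsegno (see XLogFileCopy above)
 *     InstallXLogFileSegment(&destsegno, tmppath, false, 0, false);
 *
 *     // probe forward for a free slot up to recycleSegNo, taking
 *     // ControlFileLock (see RemoveXlogFile below)
 *     InstallXLogFileSegment(&endlogSegNo, path, true, recycleSegNo, true);
 */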
3598 
3599 /*
3600  * Open a pre-existing logfile segment for writing.
3601  */
3602 int
3603 XLogFileOpen(XLogSegNo segno)
3604 {
3605  char path[MAXPGPATH];
3606  int fd;
3607 
3608  XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
3609 
3610  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3611  if (fd < 0)
3612  ereport(PANIC,
3613  (errcode_for_file_access(),
3614  errmsg("could not open file \"%s\": %m", path)));
3615 
3616  return fd;
3617 }
3618 
3619 /*
3620  * Open a logfile segment for reading (during recovery).
3621  *
3622  * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
3623  * Otherwise, it's assumed to be already available in pg_wal.
3624  */
3625 static int
3626 XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
3627  int source, bool notfoundOk)
3628 {
3629  char xlogfname[MAXFNAMELEN];
3630  char activitymsg[MAXFNAMELEN + 16];
3631  char path[MAXPGPATH];
3632  int fd;
3633 
3634  XLogFileName(xlogfname, tli, segno, wal_segment_size);
3635 
3636  switch (source)
3637  {
3638  case XLOG_FROM_ARCHIVE:
3639  /* Report recovery progress in PS display */
3640  snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
3641  xlogfname);
3642  set_ps_display(activitymsg, false);
3643 
3644  restoredFromArchive = RestoreArchivedFile(path, xlogfname,
3645  "RECOVERYXLOG",
3646  wal_segment_size,
3647  InRedo);
3648  if (!restoredFromArchive)
3649  return -1;
3650  break;
3651 
3652  case XLOG_FROM_PG_WAL:
3653  case XLOG_FROM_STREAM:
3654  XLogFilePath(path, tli, segno, wal_segment_size);
3655  restoredFromArchive = false;
3656  break;
3657 
3658  default:
3659  elog(ERROR, "invalid XLogFileRead source %d", source);
3660  }
3661 
3662  /*
3663  * If the segment was fetched from archival storage, replace the existing
3664  * xlog segment (if any) with the archival version.
3665  */
3666  if (source == XLOG_FROM_ARCHIVE)
3667  {
3668  KeepFileRestoredFromArchive(path, xlogfname);
3669 
3670  /*
3671  * Set path to point at the new file in pg_wal.
3672  */
3673  snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
3674  }
3675 
3676  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3677  if (fd >= 0)
3678  {
3679  /* Success! */
3680  curFileTLI = tli;
3681 
3682  /* Report recovery progress in PS display */
3683  snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
3684  xlogfname);
3685  set_ps_display(activitymsg, false);
3686 
3687  /* Track source of data in assorted state variables */
3688  readSource = source;
3689  XLogReceiptSource = source;
3690  /* In FROM_STREAM case, caller tracks receipt time, not me */
3691  if (source != XLOG_FROM_STREAM)
3692  XLogReceiptTime = GetCurrentTimestamp();
3693 
3694  return fd;
3695  }
3696  if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
3697  ereport(PANIC,
3698  (errcode_for_file_access(),
3699  errmsg("could not open file \"%s\": %m", path)));
3700  return -1;
3701 }
3702 
3703 /*
3704  * Open a logfile segment for reading (during recovery).
3705  *
3706  * This version searches for the segment with any TLI listed in expectedTLEs.
3707  */
3708 static int
3709 XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
3710 {
3711  char path[MAXPGPATH];
3712  ListCell *cell;
3713  int fd;
3714  List *tles;
3715 
3716  /*
3717  * Loop looking for a suitable timeline ID: we might need to read any of
3718  * the timelines listed in expectedTLEs.
3719  *
3720  * We expect curFileTLI on entry to be the TLI of the preceding file in
3721  * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
3722  * to go backwards; this prevents us from picking up the wrong file when a
3723  * parent timeline extends to higher segment numbers than the child we
3724  * want to read.
3725  *
3726  * If we haven't read the timeline history file yet, read it now, so that
3727  * we know which TLIs to scan. We don't save the list in expectedTLEs,
3728  * however, unless we actually find a valid segment. That way if there is
3729  * neither a timeline history file nor a WAL segment in the archive, and
3730  * streaming replication is set up, we'll read the timeline history file
3731  * streamed from the master when we start streaming, instead of recovering
3732  * with a dummy history generated here.
3733  */
3734  if (expectedTLEs)
3735  tles = expectedTLEs;
3736  else
3737  tles = readTimeLineHistory(recoveryTargetTLI);
3738 
3739  foreach(cell, tles)
3740  {
3741  TimeLineID tli = ((TimeLineHistoryEntry *) lfirst(cell))->tli;
3742 
3743  if (tli < curFileTLI)
3744  break; /* don't bother looking at too-old TLIs */
3745 
3746  if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
3747  {
3748  fd = XLogFileRead(segno, emode, tli,
3749  XLOG_FROM_ARCHIVE, true);
3750  if (fd != -1)
3751  {
3752  elog(DEBUG1, "got WAL segment from archive");
3753  if (!expectedTLEs)
3754  expectedTLEs = tles;
3755  return fd;
3756  }
3757  }
3758 
3759  if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_WAL)
3760  {
3761  fd = XLogFileRead(segno, emode, tli,
3762  XLOG_FROM_PG_WAL, true);
3763  if (fd != -1)
3764  {
3765  if (!expectedTLEs)
3766  expectedTLEs = tles;
3767  return fd;
3768  }
3769  }
3770  }
3771 
3772  /* Couldn't find it. For simplicity, complain about front timeline */
3773  XLogFilePath(path, recoveryTargetTLI, segno, wal_segment_size);
3774  errno = ENOENT;
3775  ereport(emode,
3776  (errcode_for_file_access(),
3777  errmsg("could not open file \"%s\": %m", path)));
3778  return -1;
3779 }
3780 
3781 /*
3782  * Close the current logfile segment for writing.
3783  */
3784 static void
3785 XLogFileClose(void)
3786 {
3787  Assert(openLogFile >= 0);
3788 
3789  /*
3790  * WAL segment files will not be re-read in normal operation, so we advise
3791  * the OS to release any cached pages. But do not do so if WAL archiving
3792  * or streaming is active, because archiver and walsender process could
3793  * use the cache to read the WAL segment.
3794  */
3795 #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
3796  if (!XLogIsNeeded())
3797  (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
3798 #endif
3799 
3800  if (close(openLogFile) != 0)
3801  {
3802  char xlogfname[MAXFNAMELEN];
3803  int save_errno = errno;
3804 
3805  XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo, wal_segment_size);
3806  errno = save_errno;
3807  ereport(PANIC,
3808  (errcode_for_file_access(),
3809  errmsg("could not close file \"%s\": %m", xlogfname)));
3810  }
3811 
3812  openLogFile = -1;
3813 }
3814 
3815 /*
3816  * Preallocate log files beyond the specified log endpoint.
3817  *
3818  * XXX this is currently extremely conservative, since it forces only one
3819  * future log segment to exist, and even that only if we are 75% done with
3820  * the current one. This is only appropriate for very low-WAL-volume systems.
3821  * High-volume systems will be OK once they've built up a sufficient set of
3822  * recycled log segments, but the startup transient is likely to include
3823  * a lot of segment creations by foreground processes, which is not so good.
3824  */
3825 static void
3826 PreallocXlogFiles(XLogRecPtr endptr)
3827 {
3828  XLogSegNo _logSegNo;
3829  int lf;
3830  bool use_existent;
3831  uint64 offset;
3832 
3833  XLByteToPrevSeg(endptr, _logSegNo, wal_segment_size);
3834  offset = XLogSegmentOffset(endptr - 1, wal_segment_size);
3835  if (offset >= (uint32) (0.75 * wal_segment_size))
3836  {
3837  _logSegNo++;
3838  use_existent = true;
3839  lf = XLogFileInit(_logSegNo, &use_existent, true);
3840  close(lf);
3841  if (!use_existent)
3842  CheckpointStats.ckpt_segs_added++;
3843  }
3844 }
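/*
 * Editor's note -- worked numbers for the 75% trigger above, assuming the
 * default 16 MB wal_segment_size: 0.75 * 16777216 = 12582912, so a future
 * segment is pre-created (or found already existing) only once the end of
 * WAL is within the last 4 MB of the current segment.
 */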
3845 
3846 /*
3847  * Throws an error if the given log segment has already been removed or
3848  * recycled. The caller should only pass a segment that it knows to have
3849  * existed while the server has been running, as this function always
3850  * succeeds if no WAL segments have been removed since startup.
3851  * 'tli' is only used in the error message.
3852  *
3853  * Note: this function guarantees to keep errno unchanged on return.
3854  * This supports callers that use this to possibly deliver a better
3855  * error message about a missing file, while still being able to throw
3856  * a normal file-access error afterwards, if this does return.
3857  */
3858 void
3859 CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
3860 {
3861  int save_errno = errno;
3862  XLogSegNo lastRemovedSegNo;
3863 
3864  SpinLockAcquire(&XLogCtl->info_lck);
3865  lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
3866  SpinLockRelease(&XLogCtl->info_lck);
3867 
3868  if (segno <= lastRemovedSegNo)
3869  {
3870  char filename[MAXFNAMELEN];
3871 
3872  XLogFileName(filename, tli, segno, wal_segment_size);
3873  errno = save_errno;
3874  ereport(ERROR,
3875  (errcode_for_file_access(),
3876  errmsg("requested WAL segment %s has already been removed",
3877  filename)));
3878  }
3879  errno = save_errno;
3880 }
3881 
3882 /*
3883  * Return the last WAL segment removed, or 0 if no segment has been removed
3884  * since startup.
3885  *
3886  * NB: the result can become out of date arbitrarily quickly; the caller has
3887  * to deal with that.
3888  */
3889 XLogSegNo
3890 XLogGetLastRemovedSegno(void)
3891 {
3892  XLogSegNo lastRemovedSegNo;
3893 
3894  SpinLockAcquire(&XLogCtl->info_lck);
3895  lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
3896  SpinLockRelease(&XLogCtl->info_lck);
3897 
3898  return lastRemovedSegNo;
3899 }
3900 
3901 /*
3902  * Update the last removed segno pointer in shared memory, to reflect
3903  * that the given XLOG file has been removed.
3904  */
3905 static void
3906 UpdateLastRemovedPtr(char *filename)
3907 {
3908  uint32 tli;
3909  XLogSegNo segno;
3910 
3911  XLogFromFileName(filename, &tli, &segno, wal_segment_size);
3912 
3913  SpinLockAcquire(&XLogCtl->info_lck);
3914  if (segno > XLogCtl->lastRemovedSegNo)
3915  XLogCtl->lastRemovedSegNo = segno;
3916  SpinLockRelease(&XLogCtl->info_lck);
3917 }
3918 
3919 /*
3920  * Remove all temporary log files in pg_wal
3921  *
3922  * This is called at the beginning of recovery after a previous crash,
3923  * at a point where no other processes write fresh WAL data.
3924  */
3925 static void
3926 RemoveTempXlogFiles(void)
3927 {
3928  DIR *xldir;
3929  struct dirent *xlde;
3930 
3931  elog(DEBUG2, "removing all temporary WAL segments");
3932 
3933  xldir = AllocateDir(XLOGDIR);
3934  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3935  {
3936  char path[MAXPGPATH];
3937 
3938  if (strncmp(xlde->d_name, "xlogtemp.", 9) != 0)
3939  continue;
3940 
3941  snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
3942  unlink(path);
3943  elog(DEBUG2, "removed temporary WAL segment \"%s\"", path);
3944  }
3945  FreeDir(xldir);
3946 }
3947 
3948 /*
3949  * Recycle or remove all log files older or equal to passed segno.
3950  *
3951  * endptr is current (or recent) end of xlog, and lastredoptr is the
3952  * redo pointer of the last checkpoint. These are used to determine
3953  * whether we want to recycle rather than delete no-longer-wanted log files.
3954  */
3955 static void
3956 RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr)
3957 {
3958  DIR *xldir;
3959  struct dirent *xlde;
3960  char lastoff[MAXFNAMELEN];
3961 
3962  /*
3963  * Construct a filename of the last segment to be kept. The timeline ID
3964  * doesn't matter, we ignore that in the comparison. (During recovery,
3965  * ThisTimeLineID isn't set, so we can't use that.)
3966  */
3967  XLogFileName(lastoff, 0, segno, wal_segment_size);
3968 
3969  elog(DEBUG2, "attempting to remove WAL segments older than log file %s",
3970  lastoff);
3971 
3972  xldir = AllocateDir(XLOGDIR);
3973 
3974  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3975  {
3976  /* Ignore files that are not XLOG segments */
3977  if (!IsXLogFileName(xlde->d_name) &&
3978  !IsPartialXLogFileName(xlde->d_name))
3979  continue;
3980 
3981  /*
3982  * We ignore the timeline part of the XLOG segment identifiers in
3983  * deciding whether a segment is still needed. This ensures that we
3984  * won't prematurely remove a segment from a parent timeline. We could
3985  * probably be a little more proactive about removing segments of
3986  * non-parent timelines, but that would be a whole lot more
3987  * complicated.
3988  *
3989  * We use the alphanumeric sorting property of the filenames to decide
3990  * which ones are earlier than the lastoff segment.
3991  */
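/*
 * Editor's note -- WAL segment file names are 24 hex digits: the first 8
 * are the timeline ID and the remaining 16 encode the segment number, so
 * comparing from d_name + 8 skips the timeline part. For example (made-up
 * names), "000000020000000A00000003" and "000000010000000A00000003"
 * compare equal here and denote the same segment on different timelines.
 */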
3992  if (strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
3993  {
3994  if (XLogArchiveCheckDone(xlde->d_name))
3995  {
3996  /* Update the last removed location in shared memory first */
3997  UpdateLastRemovedPtr(xlde->d_name);
3998 
3999  RemoveXlogFile(xlde->d_name, lastredoptr, endptr);
4000  }
4001  }
4002  }
4003 
4004  FreeDir(xldir);
4005 }
4006 
4007 /*
4008  * Remove WAL files that are not part of the given timeline's history.
4009  *
4010  * This is called during recovery, whenever we switch to follow a new
4011  * timeline, and at the end of recovery when we create a new timeline. We
4012  * wouldn't otherwise care about extra WAL files lying in pg_wal, but they
4013  * might be leftover pre-allocated or recycled WAL segments on the old timeline
4014  * that we haven't used yet, and contain garbage. If we just leave them in
4015  * pg_wal, they will eventually be archived, and we can't let that happen.
4016  * Files that belong to our timeline history are valid, because we have
4017  * successfully replayed them, but from others we can't be sure.
4018  *
4019  * 'switchpoint' is the current point in WAL where we switch to new timeline,
4020  * and 'newTLI' is the new timeline we switch to.
4021  */
4022 static void
4023 RemoveNonParentXlogFiles(XLogRecPtr switchpoint, TimeLineID newTLI)
4024 {
4025  DIR *xldir;
4026  struct dirent *xlde;
4027  char switchseg[MAXFNAMELEN];
4028  XLogSegNo endLogSegNo;
4029 
4030  XLByteToPrevSeg(switchpoint, endLogSegNo, wal_segment_size);
4031 
4032  /*
4033  * Construct a filename of the last segment to be kept.
4034  */
4035  XLogFileName(switchseg, newTLI, endLogSegNo, wal_segment_size);
4036 
4037  elog(DEBUG2, "attempting to remove WAL segments newer than log file %s",
4038  switchseg);
4039 
4040  xldir = AllocateDir(XLOGDIR);
4041 
4042  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
4043  {
4044  /* Ignore files that are not XLOG segments */
4045  if (!IsXLogFileName(xlde->d_name))
4046  continue;
4047 
4048  /*
4049  * Remove files that are on a timeline older than the new one we're
4050  * switching to, but with a segment number >= the first segment on the
4051  * new timeline.
4052  */
4053  if (strncmp(xlde->d_name, switchseg, 8) < 0 &&
4054  strcmp(xlde->d_name + 8, switchseg + 8) > 0)
4055  {
4056  /*
4057  * If the file has already been marked as .ready, however, don't
4058  * remove it yet. It should be OK to remove it - files that are
4059  * not part of our timeline history are not required for recovery
4060  * - but it seems safer to let them be archived and removed later.
4061  */
4062  if (!XLogArchiveIsReady(xlde->d_name))
4063  RemoveXlogFile(xlde->d_name, InvalidXLogRecPtr, switchpoint);
4064  }
4065  }
4066 
4067  FreeDir(xldir);
4068 }
4069 
4070 /*
4071  * Recycle or remove a log file that's no longer needed.
4072  *
4073  * endptr is current (or recent) end of xlog, and lastredoptr is the
4074  * redo pointer of the last checkpoint. These are used to determine
4075  * whether we want to recycle rather than delete no-longer-wanted log files.
4076  * If lastredoptr is not known, pass invalid, and the function will recycle,
4077  * somewhat arbitrarily, 10 future segments.
4078  */
4079 static void
4080 RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr)
4081 {
4082  char path[MAXPGPATH];
4083 #ifdef WIN32
4084  char newpath[MAXPGPATH];
4085 #endif
4086  struct stat statbuf;
4087  XLogSegNo endlogSegNo;
4088  XLogSegNo recycleSegNo;
4089 
4090  if (wal_recycle)
4091  {
4092  /*
4093  * Initialize info about where to try to recycle to.
4094  */
4095  XLByteToSeg(endptr, endlogSegNo, wal_segment_size);
4096  if (lastredoptr == InvalidXLogRecPtr)
4097  recycleSegNo = endlogSegNo + 10;
4098  else
4099  recycleSegNo = XLOGfileslop(lastredoptr);
4100  }
4101  else
4102  recycleSegNo = 0; /* keep compiler quiet */
4103 
4104  snprintf(path, MAXPGPATH, XLOGDIR "/%s", segname);
4105 
4106  /*
4107  * Before deleting the file, see if it can be recycled as a future log
4108  * segment. Only recycle normal files, pg_standby for example can create
4109  * symbolic links pointing to a separate archive directory.
4110  */
4111  if (wal_recycle &&
4112  endlogSegNo <= recycleSegNo &&
4113  lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
4114  InstallXLogFileSegment(&endlogSegNo, path,
4115  true, recycleSegNo, true))
4116  {
4117  ereport(DEBUG2,
4118  (errmsg("recycled write-ahead log file \"%s\"",
4119  segname)));
4120  CheckpointStats.ckpt_segs_recycled++;
4121  /* Needn't recheck that slot on future iterations */
4122  endlogSegNo++;
4123  }
4124  else
4125  {
4126  /* No need for any more future segments... */
4127  int rc;
4128 
4129  ereport(DEBUG2,
4130  (errmsg("removing write-ahead log file \"%s\"",
4131  segname)));
4132 
4133 #ifdef WIN32
4134 
4135  /*
4136  * On Windows, if another process (e.g another backend) holds the file
4137  * open in FILE_SHARE_DELETE mode, unlink will succeed, but the file
4138  * will still show up in directory listing until the last handle is
4139  * closed. To avoid confusing the lingering deleted file for a live
4140  * WAL file that needs to be archived, rename it before deleting it.
4141  *
4142  * If another process holds the file open without FILE_SHARE_DELETE
4143  * flag, rename will fail. We'll try again at the next checkpoint.
4144  */
4145  snprintf(newpath, MAXPGPATH, "%s.deleted", path);
4146  if (rename(path, newpath) != 0)
4147  {
4148  ereport(LOG,
4149  (errcode_for_file_access(),
4150  errmsg("could not rename file \"%s\": %m",
4151  path)));
4152  return;
4153  }
4154  rc = durable_unlink(newpath, LOG);
4155 #else
4156  rc = durable_unlink(path, LOG);
4157 #endif
4158  if (rc != 0)
4159  {
4160  /* Message already logged by durable_unlink() */
4161  return;
4162  }
4163  CheckpointStats.ckpt_segs_removed++;
4164  }
4165 
4166  XLogArchiveCleanup(segname);
4167 }
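/*
 * Editor's note -- a concrete reading of the recycle path above, assuming
 * wal_recycle is on and the checkpoint redo pointer is unknown
 * (lastredoptr == InvalidXLogRecPtr): if endptr falls in segment N, the
 * old file may be renamed (via InstallXLogFileSegment) to serve as any
 * future segment up to N + 10; only when no free slot exists in that
 * window, or the file is not a regular file (e.g. a symlink left by
 * pg_standby), is it durably unlinked instead.
 */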
4168 
4169 /*
4170  * Verify whether pg_wal and pg_wal/archive_status exist.
4171  * If the latter does not exist, recreate it.
4172  *
4173  * It is not the goal of this function to verify the contents of these
4174  * directories, but to help in cases where someone has performed a cluster
4175  * copy for PITR purposes but omitted pg_wal from the copy.
4176  *
4177  * We could also recreate pg_wal if it doesn't exist, but a deliberate
4178  * policy decision was made not to. It is fairly common for pg_wal to be
4179  * a symlink, and if that was the DBA's intent then automatically making a
4180  * plain directory would result in degraded performance with no notice.
4181  */
4182 static void
4183 ValidateXLOGDirectoryStructure(void)
4184 {
4185  char path[MAXPGPATH];
4186  struct stat stat_buf;
4187 
4188  /* Check for pg_wal; if it doesn't exist, error out */
4189  if (stat(XLOGDIR, &stat_buf) != 0 ||
4190  !S_ISDIR(stat_buf.st_mode))
4191  ereport(FATAL,
4192  (errmsg("required WAL directory \"%s\" does not exist",
4193  XLOGDIR)));
4194 
4195  /* Check for archive_status */
4196  snprintf(path, MAXPGPATH, XLOGDIR "/archive_status");
4197  if (stat(path, &stat_buf) == 0)
4198  {
4199  /* Check for weird cases where it exists but isn't a directory */
4200  if (!S_ISDIR(stat_buf.st_mode))
4201  ereport(FATAL,
4202  (errmsg("required WAL directory \"%s\" does not exist",
4203  path)));
4204  }
4205  else
4206  {
4207  ereport(LOG,
4208  (errmsg("creating missing WAL directory \"%s\"", path)));
4209  if (MakePGDirectory(path) < 0)
4210  ereport(FATAL,
4211  (errmsg("could not create missing directory \"%s\": %m",
4212  path)));
4213  }
4214 }
4215 
4216 /*
4217  * Remove previous backup history files. This also retries creation of
4218  * .ready files for any backup history files for which XLogArchiveNotify
4219  * failed earlier.
4220  */
4221 static void
4222 CleanupBackupHistory(void)
4223 {
4224  DIR *xldir;
4225  struct dirent *xlde;
4226  char path[MAXPGPATH + sizeof(XLOGDIR)];
4227 
4228  xldir = AllocateDir(XLOGDIR);
4229 
4230  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
4231  {
4232  if (IsBackupHistoryFileName(xlde->d_name))
4233  {
4234  if (XLogArchiveCheckDone(xlde->d_name))
4235  {
4236  elog(DEBUG2, "removing WAL backup history file \"%s\"",
4237  xlde->d_name);
4238  snprintf(path, sizeof(path), XLOGDIR "/%s", xlde->d_name);
4239  unlink(path);
4240  XLogArchiveCleanup(xlde->d_name);
4241  }
4242  }
4243  }
4244 
4245  FreeDir(xldir);
4246 }
4247 
4248 /*
4249  * Attempt to read an XLOG record.
4250  *
4251  * If RecPtr is valid, try to read a record at that position. Otherwise
4252  * try to read a record just after the last one previously read.
4253  *
4254  * If no valid record is available, returns NULL, or fails if emode is PANIC.
4255  * (emode must be either PANIC or LOG.) In standby mode, retries until a valid
4256  * record is available.
4257  */
4258 static XLogRecord *
4259 ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
4260  bool fetching_ckpt)
4261 {
4262  XLogRecord *record;
4263  XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
4264 
4265  /* Pass through parameters to XLogPageRead */
4266  private->fetching_ckpt = fetching_ckpt;
4267  private->emode = emode;
4268  private->randAccess = (RecPtr != InvalidXLogRecPtr);
4269 
4270  /* This is the first attempt to read this page. */
4271  lastSourceFailed = false;
4272 
4273  for (;;)
4274  {
4275  char *errormsg;
4276 
4277  record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4278  ReadRecPtr = xlogreader->ReadRecPtr;
4279  EndRecPtr = xlogreader->EndRecPtr;
4280  if (record == NULL)
4281  {
4282  if (readFile >= 0)
4283  {
4284  close(readFile);
4285  readFile = -1;
4286  }
4287 
4288  /*
4289  * We only end up here without a message when XLogPageRead()
4290  * failed - in that case we already logged something. In
4291  * StandbyMode that only happens if we have been triggered, so we
4292  * shouldn't loop anymore in that case.
4293  */
4294  if (errormsg)
4295  ereport(emode_for_corrupt_record(emode,
4296  RecPtr ? RecPtr : EndRecPtr),
4297  (errmsg_internal("%s", errormsg) /* already translated */ ));
4298  }
4299 
4300  /*
4301  * Check page TLI is one of the expected values.
4302  */
4303  else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
4304  {
4305  char fname[MAXFNAMELEN];
4306  XLogSegNo segno;
4307  int32 offset;
4308 
4309  XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
4310  offset = XLogSegmentOffset(xlogreader->latestPagePtr,
4311  wal_segment_size);
4312  XLogFileName(fname, xlogreader->seg.ws_tli, segno,
4313  wal_segment_size);
4314  ereport(emode_for_corrupt_record(emode,
4315  RecPtr ? RecPtr : EndRecPtr),
4316  (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
4317  xlogreader->latestPageTLI,
4318  fname,
4319  offset)));
4320  record = NULL;
4321  }
4322 
4323  if (record)
4324  {
4325  /* Great, got a record */
4326  return record;
4327  }
4328  else
4329  {
4330  /* No valid record available from this source */
4331  lastSourceFailed = true;
4332 
4333  /*
4334  * If archive recovery was requested, but we were still doing
4335  * crash recovery, switch to archive recovery and retry using the
4336  * offline archive. We have now replayed all the valid WAL in
4337  * pg_wal, so we are presumably now consistent.
4338  *
4339  * We require that there's at least some valid WAL present in
4340  * pg_wal, however (!fetching_ckpt). We could recover using the
4341  * WAL from the archive, even if pg_wal is completely empty, but
4342  * we'd have no idea how far we'd have to replay to reach
4343  * consistency. So err on the safe side and give up.
4344  */
4345  if (!InArchiveRecovery && ArchiveRecoveryRequested &&
4346  !fetching_ckpt)
4347  {
4348  ereport(DEBUG1,
4349  (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
4350  InArchiveRecovery = true;
4351  if (StandbyModeRequested)
4352  StandbyMode = true;
4353 
4354  /* initialize minRecoveryPoint to this record */
4355  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
4356  ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
4357  if (ControlFile->minRecoveryPoint < EndRecPtr)
4358  {
4359  ControlFile->minRecoveryPoint = EndRecPtr;
4360  ControlFile->minRecoveryPointTLI = ThisTimeLineID;
4361  }
4362  /* update local copy */
4363  minRecoveryPoint = ControlFile->minRecoveryPoint;
4364  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
4365 
4366  /*
4367  * The startup process can update its local copy of
4368  * minRecoveryPoint from this point.
4369  */
4370  updateMinRecoveryPoint = true;
4371 
4371 
4372  UpdateControlFile();
4373  LWLockRelease(ControlFileLock);
4374 
4375  CheckRecoveryConsistency();
4376 
4377  /*
4378  * Before we retry, reset lastSourceFailed and currentSource
4379  * so that we will check the archive next.
4380  */
4381  lastSourceFailed = false;
4382  currentSource = 0;
4383 
4384  continue;
4385  }
4386 
4387  /* In standby mode, loop back to retry. Otherwise, give up. */
4388  if (StandbyMode)
4389  continue;
4390  else
4391  return NULL;
4392  }
4393  }
4394 }
4395 
4396 /*
4397  * Scan for new timelines that might have appeared in the archive since we
4398  * started recovery.
4399  *
4400  * If there are any, the function changes recovery target TLI to the latest
4401  * one and returns 'true'.
4402  */
4403 static bool
4404 rescanLatestTimeLine(void)
4405 {
4406  List *newExpectedTLEs;
4407  bool found;
4408  ListCell *cell;
4409  TimeLineID newtarget;
4410  TimeLineID oldtarget = recoveryTargetTLI;
4411  TimeLineHistoryEntry *currentTle = NULL;
4412 
4413  newtarget = findNewestTimeLine(recoveryTargetTLI);
4414  if (newtarget == recoveryTargetTLI)
4415  {
4416  /* No new timelines found */
4417  return false;
4418  }
4419 
4420  /*
4421  * Determine the list of expected TLIs for the new TLI
4422  */
4423 
4424  newExpectedTLEs = readTimeLineHistory(newtarget);
4425 
4426  /*
4427  * If the current timeline is not part of the history of the new timeline,
4428  * we cannot proceed to it.
4429  */
4430  found = false;
4431  foreach(cell, newExpectedTLEs)
4432  {
4433  currentTle = (TimeLineHistoryEntry *) lfirst(cell);
4434 
4435  if (currentTle->tli == recoveryTargetTLI)
4436  {
4437  found = true;
4438  break;
4439  }
4440  }
4441  if (!found)
4442  {
4443  ereport(LOG,
4444  (errmsg("new timeline %u is not a child of database system timeline %u",
4445  newtarget,
4446  ThisTimeLineID)));
4447  return false;
4448  }
4449 
4450  /*
4451  * The current timeline was found in the history file, but check that the
4452  * next timeline was forked off from it *after* the current recovery
4453  * location.
4454  */
4455  if (currentTle->end < EndRecPtr)
4456  {
4457  ereport(LOG,
4458  (errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
4459  newtarget,
4460  ThisTimeLineID,
4461  (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr)));
4462  return false;
4463  }
4464 
4465  /* The new timeline history seems valid. Switch target */
4466  recoveryTargetTLI = newtarget;
4467  list_free_deep(expectedTLEs);
4468  expectedTLEs = newExpectedTLEs;
4469 
4470  /*
4471  * As in StartupXLOG(), try to ensure we have all the history files
4472  * between the old target and new target in pg_wal.
4473  */
4474  restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
4475 
4476  ereport(LOG,
4477  (errmsg("new target timeline is %u",
4478  recoveryTargetTLI)));
4479 
4480  return true;
4481 }
4482 
4483 /*
4484  * I/O routines for pg_control
4485  *
4486  * *ControlFile is a buffer in shared memory that holds an image of the
4487  * contents of pg_control. WriteControlFile() initializes pg_control
4488  * given a preloaded buffer, ReadControlFile() loads the buffer from
4489  * the pg_control file (during postmaster or standalone-backend startup),
4490  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
4491  *
4492  * For simplicity, WriteControlFile() initializes the fields of pg_control
4493  * that are related to checking backend/database compatibility, and
4494  * ReadControlFile() verifies they are correct. We could split out the
4495  * I/O and compatibility-check functions, but there seems no need currently.
4496  */
4497 static void
4498 WriteControlFile(void)
4499 {
4500  int fd;
4501  char buffer[PG_CONTROL_FILE_SIZE]; /* need not be aligned */
4502 
4503  /*
4504  * Ensure that the size of the pg_control data structure is sane. See the
4505  * comments for these symbols in pg_control.h.
4506  */
4507  StaticAssertStmt(sizeof(ControlFileData) <= PG_CONTROL_MAX_SAFE_SIZE,
4508  "pg_control is too large for atomic disk writes");
4509  StaticAssertStmt(sizeof(ControlFileData) <= PG_CONTROL_FILE_SIZE,
4510  "sizeof(ControlFileData) exceeds PG_CONTROL_FILE_SIZE");
4511 
4512  /*
4513  * Initialize version and compatibility-check fields
4514  */
4515  ControlFile->pg_control_version = PG_CONTROL_VERSION;
4516  ControlFile->catalog_version_no = CATALOG_VERSION_NO;
4517 
4518  ControlFile->maxAlign = MAXIMUM_ALIGNOF;
4519  ControlFile->floatFormat = FLOATFORMAT_VALUE;
4520 
4521  ControlFile->blcksz = BLCKSZ;
4522  ControlFile->relseg_size = RELSEG_SIZE;
4523  ControlFile->xlog_blcksz = XLOG_BLCKSZ;
4524  ControlFile->xlog_seg_size = wal_segment_size;
4525 
4526  ControlFile->nameDataLen = NAMEDATALEN;
4527  ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
4528 
4529  ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
4530  ControlFile->loblksize = LOBLKSIZE;
4531 
4532  ControlFile->float8ByVal = FLOAT8PASSBYVAL;
4533 
4534  /* Contents are protected with a CRC */
4535  INIT_CRC32C(ControlFile->crc);
4536  COMP_CRC32C(ControlFile->crc,
4537  (char *) ControlFile,
4538  offsetof(ControlFileData, crc));
4539  FIN_CRC32C(ControlFile->crc);
4540 
4541  /*
4542  * We write out PG_CONTROL_FILE_SIZE bytes into pg_control, zero-padding
4543  * the excess over sizeof(ControlFileData). This reduces the odds of
4544  * premature-EOF errors when reading pg_control. We'll still fail when we
4545  * check the contents of the file, but hopefully with a more specific
4546  * error than "couldn't read pg_control".
4547  */
4548  memset(buffer, 0, PG_CONTROL_FILE_SIZE);
4549  memcpy(buffer, ControlFile, sizeof(ControlFileData));
4550 
4551  fd = BasicOpenFile(XLOG_CONTROL_FILE,
4552  O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
4553  if (fd < 0)
4554  ereport(PANIC,
4555  (errcode_for_file_access(),
4556  errmsg("could not create file \"%s\": %m",
4557  XLOG_CONTROL_FILE)));
4558 
4559  errno = 0;
4560  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_WRITE);
4561  if (write(fd, buffer, PG_CONTROL_FILE_SIZE) != PG_CONTROL_FILE_SIZE)
4562  {
4563  /* if write didn't set errno, assume problem is no disk space */
4564  if (errno == 0)
4565  errno = ENOSPC;
4566  ereport(PANIC,
4567  (errcode_for_file_access(),
4568  errmsg("could not write to file \"%s\": %m",
4569  XLOG_CONTROL_FILE)));
4570  }
4571  pgstat_report_wait_end();
4572 
4573  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_SYNC);
4574  if (pg_fsync(fd) != 0)
4575  ereport(PANIC,
4576  (errcode_for_file_access(),
4577  errmsg("could not fsync file \"%s\": %m",
4578  XLOG_CONTROL_FILE)));
4579  pgstat_report_wait_end();
4580 
4581  if (close(fd) != 0)
4582  ereport(PANIC,
4584  errmsg("could not close file \"%s\": %m",
4585  XLOG_CONTROL_FILE)));
4586 }
4587 
4588 static void
4589 ReadControlFile(void)
4590 {
4591  pg_crc32c crc;
4592  int fd;
4593  static char wal_segsz_str[20];
4594  int r;
4595 
4596  /*
4597  * Read data...
4598  */
4599  fd = BasicOpenFile(XLOG_CONTROL_FILE,
4600  O_RDWR | PG_BINARY);
4601  if (fd < 0)
4602  ereport(PANIC,
4603  (errcode_for_file_access(),
4604  errmsg("could not open file \"%s\": %m",
4605  XLOG_CONTROL_FILE)));
4606 
4607  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_READ);
4608  r = read(fd, ControlFile, sizeof(ControlFileData));
4609  if (r != sizeof(ControlFileData))
4610  {
4611  if (r < 0)
4612  ereport(PANIC,
4613  (errcode_for_file_access(),
4614  errmsg("could not read file \"%s\": %m",
4615  XLOG_CONTROL_FILE)));
4616  else
4617  ereport(PANIC,
4618  (errcode(ERRCODE_DATA_CORRUPTED),
4619  errmsg("could not read file \"%s\": read %d of %zu",
4620  XLOG_CONTROL_FILE, r, sizeof(ControlFileData))));
4621  }
4622  pgstat_report_wait_end();
4623 
4624  close(fd);
4625 
4626  /*
4627  * Check for expected pg_control format version. If this is wrong, the
4628  * CRC check will likely fail because we'll be checking the wrong number
4629  * of bytes. Complaining about wrong version will probably be more
4630  * enlightening than complaining about wrong CRC.
4631  */
4632 
4633  if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
4634  ereport(FATAL,
4635  (errmsg("database files are incompatible with server"),
4636  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
4637  " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
4638  ControlFile->pg_control_version, ControlFile->pg_control_version,
4639  PG_CONTROL_VERSION, PG_CONTROL_VERSION),
4640  errhint("This could be a problem of mismatched byte ordering. It looks like you need to initdb.")));
4641 
4642  if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
4643  ereport(FATAL,
4644  (errmsg("database files are incompatible with server"),
4645  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
4646  " but the server was compiled with PG_CONTROL_VERSION %d.",
4647  ControlFile->pg_control_version, PG_CONTROL_VERSION),
4648  errhint("It looks like you need to initdb.")));
4649 
4650  /* Now check the CRC. */
4651  INIT_CRC32C(crc);
4652  COMP_CRC32C(crc,
4653  (char *) ControlFile,
4654  offsetof(ControlFileData, crc));
4655  FIN_CRC32C(crc);
4656 
4657  if (!EQ_CRC32C(crc, ControlFile->crc))
4658  ereport(FATAL,
4659  (errmsg("incorrect checksum in control file")));
4660 
4661  /*
4662  * Do compatibility checking immediately. If the database isn't
4663  * compatible with the backend executable, we want to abort before we can
4664  * possibly do any damage.
4665  */
4666  if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
4667  ereport(FATAL,
4668  (errmsg("database files are incompatible with server"),
4669  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
4670  " but the server was compiled with CATALOG_VERSION_NO %d.",
4671  ControlFile->catalog_version_no, CATALOG_VERSION_NO),
4672  errhint("It looks like you need to initdb.")));
4673  if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
4674  ereport(FATAL,
4675  (errmsg("database files are incompatible with server"),
4676  errdetail("The database cluster was initialized with MAXALIGN %d,"
4677  " but the server was compiled with MAXALIGN %d.",
4678  ControlFile->maxAlign, MAXIMUM_ALIGNOF),
4679  errhint("It looks like you need to initdb.")));
4680  if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
4681  ereport(FATAL,
4682  (errmsg("database files are incompatible with server"),
4683  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
4684  errhint("It looks like you need to initdb.")));
4685  if (ControlFile->blcksz != BLCKSZ)
4686  ereport(FATAL,
4687  (errmsg("database files are incompatible with server"),
4688  errdetail("The database cluster was initialized with BLCKSZ %d,"
4689  " but the server was compiled with BLCKSZ %d.",
4690  ControlFile->blcksz, BLCKSZ),
4691  errhint("It looks like you need to recompile or initdb.")));
4692  if (ControlFile->relseg_size != RELSEG_SIZE)
4693  ereport(FATAL,
4694  (errmsg("database files are incompatible with server"),
4695  errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
4696  " but the server was compiled with RELSEG_SIZE %d.",
4697  ControlFile->relseg_size, RELSEG_SIZE),
4698  errhint("It looks like you need to recompile or initdb.")));
4699  if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
4700  ereport(FATAL,
4701  (errmsg("database files are incompatible with server"),
4702  errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
4703  " but the server was compiled with XLOG_BLCKSZ %d.",
4704  ControlFile->xlog_blcksz, XLOG_BLCKSZ),
4705  errhint("It looks like you need to recompile or initdb.")));
4706  if (ControlFile->nameDataLen != NAMEDATALEN)
4707  ereport(FATAL,
4708  (errmsg("database files are incompatible with server"),
4709  errdetail("The database cluster was initialized with NAMEDATALEN %d,"
4710  " but the server was compiled with NAMEDATALEN %d.",
4711  ControlFile->nameDataLen, NAMEDATALEN),
4712  errhint("It looks like you need to recompile or initdb.")));
4713  if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
4714  ereport(FATAL,
4715  (errmsg("database files are incompatible with server"),
4716  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
4717  " but the server was compiled with INDEX_MAX_KEYS %d.",
4718  ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
4719  errhint("It looks like you need to recompile or initdb.")));
4720  if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
4721  ereport(FATAL,
4722  (errmsg("database files are incompatible with server"),
4723  errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
4724  " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
4725  ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
4726  errhint("It looks like you need to recompile or initdb.")));
4727  if (ControlFile->loblksize != LOBLKSIZE)
4728  ereport(FATAL,
4729  (errmsg("database files are incompatible with server"),
4730  errdetail("The database cluster was initialized with LOBLKSIZE %d,"
4731  " but the server was compiled with LOBLKSIZE %d.",
4732  ControlFile->loblksize, (int) LOBLKSIZE),
4733  errhint("It looks like you need to recompile or initdb.")));
4734 
4735 #ifdef USE_FLOAT8_BYVAL
4736  if (ControlFile->float8ByVal != true)
4737  ereport(FATAL,
4738  (errmsg("database files are incompatible with server"),
4739  errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
4740  " but the server was compiled with USE_FLOAT8_BYVAL."),
4741  errhint("It looks like you need to recompile or initdb.")));
4742 #else
4743  if (ControlFile->float8ByVal != false)
4744  ereport(FATAL,
4745  (errmsg("database files are incompatible with server"),
4746  errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
4747  " but the server was compiled without USE_FLOAT8_BYVAL."),
4748  errhint("It looks like you need to recompile or initdb.")));
4749 #endif
4750 
4751  wal_segment_size = ControlFile->xlog_seg_size;
4752 
4753  if (!IsValidWalSegSize(wal_segment_size))
4754  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4755  errmsg_plural("WAL segment size must be a power of two between 1 MB and 1 GB, but the control file specifies %d byte",
4756  "WAL segment size must be a power of two between 1 MB and 1 GB, but the control file specifies %d bytes",
4757  wal_segment_size,
4758  wal_segment_size)));
4759 
4760  snprintf(wal_segsz_str, sizeof(wal_segsz_str), "%d", wal_segment_size);
4761  SetConfigOption("wal_segment_size", wal_segsz_str, PGC_INTERNAL,
4762  PGC_S_OVERRIDE);
4763 
4764  /* check and update variables dependent on wal_segment_size */
4765  if (ConvertToXSegs(min_wal_size_mb, wal_segment_size) < 2)
4766  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4767  errmsg("\"min_wal_size\" must be at least twice \"wal_segment_size\"")));
4768 
4769  if (ConvertToXSegs(max_wal_size_mb, wal_segment_size) < 2)
4770  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4771  errmsg("\"max_wal_size\" must be at least twice \"wal_segment_size\"")));
4772 
4773  UsableBytesInSegment =
4774  (wal_segment_size / XLOG_BLCKSZ * UsableBytesInPage) -
4775  (SizeOfXLogLongPHD - SizeOfXLogShortPHD);
4776 
4777  CalculateCheckpointSegments();
4778 
4779  /* Make the initdb settings visible as GUC variables, too */
4780  SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
4781  PGC_INTERNAL, PGC_S_OVERRIDE);
4782 }
4783 
4784 /*
4785  * Utility wrapper to update the control file. Note that the control
4786  * file gets flushed.
4787  */
4788 void
4789 UpdateControlFile(void)
4790 {
4791  update_controlfile(DataDir, ControlFile, true);
4792 }
4793 
4794 /*
4795  * Returns the unique system identifier from control file.
4796  */
4797 uint64
4798 GetSystemIdentifier(void)
4799 {
4800  Assert(ControlFile != NULL);
4801  return ControlFile->system_identifier;
4802 }
4803 
4804 /*
4805  * Returns the random nonce from control file.
4806  */
4807 char *
4808 GetMockAuthenticationNonce(void)
4809 {
4810  Assert(ControlFile != NULL);
4811  return ControlFile->mock_authentication_nonce;
4812 }
4813 
4814 /*
4815  * Are checksums enabled for data pages?
4816  */
4817 bool
4818 DataChecksumsEnabled(void)
4819 {
4820  Assert(ControlFile != NULL);
4821  return (ControlFile->data_checksum_version > 0);
4822 }
4823 
4824 /*
4825  * Returns a fake LSN for unlogged relations.
4826  *
4827  * Each call generates an LSN that is greater than any previous value
4828  * returned. The current counter value is saved and restored across clean
4829  * shutdowns, but like unlogged relations, does not survive a crash. This can
4830  * be used in lieu of real LSN values returned by XLogInsert, if you need an
4831  * LSN-like increasing sequence of numbers without writing any WAL.
4832  */
4833 XLogRecPtr
4834 GetFakeLSNForUnloggedRel(void)
4835 {
4836  XLogRecPtr nextUnloggedLSN;
4837 
4838  /* increment the unloggedLSN counter, need SpinLock */
4839  SpinLockAcquire(&XLogCtl->ulsn_lck);
4840  nextUnloggedLSN = XLogCtl->unloggedLSN++;
4841  SpinLockRelease(&XLogCtl->ulsn_lck);
4842 
4843  return nextUnloggedLSN;
4844 }
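/*
 * Editor's note -- a sketch of the intended usage (GiST, for instance,
 * stamps pages of unlogged indexes this way so that LSN-based ordering
 * checks keep working without emitting any WAL); PageSetLSN is the
 * standard page-header setter:
 *
 *     XLogRecPtr lsn = GetFakeLSNForUnloggedRel();
 *     PageSetLSN(page, lsn);
 */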
4845 
4846 /*
4847  * Auto-tune the number of XLOG buffers.
4848  *
4849  * The preferred setting for wal_buffers is about 3% of shared_buffers, with
4850  * a maximum of one XLOG segment (there is little reason to think that more
4851  * is helpful, at least so long as we force an fsync when switching log files)
4852  * and a minimum of 8 blocks (which was the default value prior to PostgreSQL
4853  * 9.1, when auto-tuning was added).
4854  *
4855  * This should not be called until NBuffers has received its final value.
4856  */
4857 static int
4858 XLOGChooseNumBuffers(void)
4859 {
4860  int xbuffers;
4861 
4862  xbuffers = NBuffers / 32;
4863  if (xbuffers > (wal_segment_size / XLOG_BLCKSZ))
4864  xbuffers = (wal_segment_size / XLOG_BLCKSZ);
4865  if (xbuffers < 8)
4866  xbuffers = 8;
4867  return xbuffers;
4868 }
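/*
 * Editor's note -- a worked example, assuming the default 8 kB XLOG_BLCKSZ
 * and 16 MB wal_segment_size: with shared_buffers = 16384 buffers (128 MB),
 * NBuffers / 32 = 512 buffers (4 MB). The segment-size cap is
 * 16 MB / 8 kB = 2048 buffers and the floor is 8, so the auto-tuned result
 * is 512.
 */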
4869 
4870 /*
4871  * GUC check_hook for wal_buffers
4872  */
4873 bool
4874 check_wal_buffers(int *newval, void **extra, GucSource source)
4875 {
4876  /*
4877  * -1 indicates a request for auto-tune.
4878  */
4879  if (*newval == -1)
4880  {
4881  /*
4882  * If we haven't yet changed the boot_val default of -1, just let it
4883  * be. We'll fix it when XLOGShmemSize is called.
4884  */
4885  if (XLOGbuffers == -1)
4886  return true;
4887 
4888  /* Otherwise, substitute the auto-tune value */
4889  *newval = XLOGChooseNumBuffers();
4890  }
4891 
4892  /*
4893  * We clamp manually-set values to at least 4 blocks. Prior to PostgreSQL
4894  * 9.1, a minimum of 4 was enforced by guc.c, but since that is no longer
4895  * the case, we just silently treat such values as a request for the
4896  * minimum. (We could throw an error instead, but that doesn't seem very
4897  * helpful.)
4898  */
4899  if (*newval < 4)
4900  *newval = 4;
4901 
4902  return true;
4903 }
4904 
4905 /*
4906  * Read the control file, set respective GUCs.
4907  *
4908  * This is to be called during startup, including a crash recovery cycle,
4909  * unless in bootstrap mode, where no control file yet exists. As there's no
4910  * usable shared memory yet (its sizing can depend on the contents of the
4911  * control file!), first store the contents in local memory. XLOGShmemInit()
4912  * will then copy it to shared memory later.
4913  *
4914  * reset just controls whether previous contents are to be expected (in the
4915  * reset case, there's a dangling pointer into old shared memory), or not.
4916  */
4917 void
4918 LocalProcessControlFile(bool reset)
4919 {
4920  Assert(reset || ControlFile == NULL);
4921  ControlFile = palloc(sizeof(ControlFileData));
4922  ReadControlFile();
4923 }
4924 
4925 /*
4926  * Initialization of shared memory for XLOG
4927  */
4928 Size
4929 XLOGShmemSize(void)
4930 {
4931  Size size;
4932 
4933  /*
4934  * If the value of wal_buffers is -1, use the preferred auto-tune value.
4935  * This isn't an amazingly clean place to do this, but we must wait till
4936  * NBuffers has received its final value, and must do it before using the
4937  * value of XLOGbuffers to do anything important.
4938  */
4939  if (XLOGbuffers == -1)
4940  {
4941  char buf[32];
4942 
4943  snprintf(buf, sizeof(buf), "%d", XLOGChooseNumBuffers());
4944  SetConfigOption("wal_buffers", buf, PGC_POSTMASTER, PGC_S_OVERRIDE);
4945  }
4946  Assert(XLOGbuffers > 0);
4947 
4948  /* XLogCtl */
4949  size = sizeof(XLogCtlData);
4950 
4951  /* WAL insertion locks, plus alignment */
4952  size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
4953  /* xlblocks array */
4954  size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4955  /* extra alignment padding for XLOG I/O buffers */
4956  size = add_size(size, XLOG_BLCKSZ);
4957  /* and the buffers themselves */
4958  size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4959 
4960  /*
4961  * Note: we don't count ControlFileData, it comes out of the "slop factor"
4962  * added by CreateSharedMemoryAndSemaphores. This lets us use this
4963  * routine again below to compute the actual allocation size.
4964  */
4965 
4966  return size;
4967 }
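/*
 * Editor's note -- rough sizing under the example above (XLOGbuffers = 512,
 * XLOG_BLCKSZ = 8192): the dominant term is the page buffers,
 * 512 * 8192 = 4 MB, plus one extra block of alignment padding; the
 * XLogCtlData struct, the nine padded insertion-lock slots
 * (NUM_XLOGINSERT_LOCKS + 1) and the 512-entry xlblocks array add only a
 * few kilobytes on top.
 */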
4968 
4969 void
4970 XLOGShmemInit(void)
4971 {
4972  bool foundCFile,
4973  foundXLog;
4974  char *allocptr;
4975  int i;
4976  ControlFileData *localControlFile;
4977 
4978 #ifdef WAL_DEBUG
4979 
4980  /*
4981  * Create a memory context for WAL debugging that's exempt from the normal
4982  * "no pallocs in critical section" rule. Yes, that can lead to a PANIC if
4983  * an allocation fails, but wal_debug is not for production use anyway.
4984  */
4985  if (walDebugCxt == NULL)
4986  {
4987  walDebugCxt = AllocSetContextCreate(TopMemoryContext,
4988  "WAL Debug",
4989  ALLOCSET_DEFAULT_SIZES);
4990  MemoryContextAllowInCriticalSection(walDebugCxt, true);
4991  }
4992 #endif
4993 
4994 
4995  XLogCtl = (XLogCtlData *)
4996  ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4997 
4998  localControlFile = ControlFile;
4999  ControlFile = (ControlFileData *)
5000  ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
5001 
5002  if (foundCFile || foundXLog)
5003  {
5004  /* both should be present or neither */
5005  Assert(foundCFile && foundXLog);
5006 
5007  /* Initialize local copy of WALInsertLocks and register the tranche */
5008  WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
5009  LWLockRegisterTranche(LWTRANCHE_WAL_INSERT,
5010  "wal_insert");
5011 
5012  if (localControlFile)
5013  pfree(localControlFile);
5014  return;
5015  }
5016  memset(XLogCtl, 0, sizeof(XLogCtlData));
5017 
5018  /*
5019  * Already have read control file locally, unless in bootstrap mode. Move
5020  * contents into shared memory.
5021  */
5022  if (localControlFile)
5023  {
5024  memcpy(ControlFile, localControlFile, sizeof(ControlFileData));
5025  pfree(localControlFile);
5026  }
5027 
5028  /*
5029  * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
5030  * multiple of the alignment for same, so no extra alignment padding is
5031  * needed here.
5032  */
5033  allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
5034  XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
5035  memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
5036  allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
5037 
5038 
5039  /* WAL insertion locks. Ensure they're aligned to the full padded size */
5040  allocptr += sizeof(WALInsertLockPadded) -
5041  ((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
5042  WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
5043  (WALInsertLockPadded *) allocptr;
5044  allocptr += sizeof(WALInsertLockPadded) * NUM_XLOGINSERT_LOCKS;
5045 
5045 
5046  LWLockRegisterTranche(LWTRANCHE_WAL_INSERT, "wal_insert");
5047  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
5048  {
5049  LWLockInitialize(&WALInsertLocks[i].l.lock, LWTRANCHE_WAL_INSERT);
5050  WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
5051  WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
5052  }
5053 
5054  /*
5055  * Align the start of the page buffers to a full xlog block size boundary.
5056  * This simplifies some calculations in XLOG insertion. It is also
5057  * required for O_DIRECT.
5058  */
5059  allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
5060  XLogCtl->pages = allocptr;
5061  memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
5062 
5063  /*
5064  * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
5065  * in additional info.)
5066  */
5067  XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
5068  XLogCtl->SharedRecoveryInProgress = true;
5069  XLogCtl->SharedHotStandbyActive = false;
5070  XLogCtl->WalWriterSleeping = false;
5071 
5072  SpinLockInit(&XLogCtl->Insert.insertpos_lck);
5073  SpinLockInit(&XLogCtl->info_lck);
5074  SpinLockInit(&XLogCtl->ulsn_lck);
5075  InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
5076 }
5077 
5078 /*
5079  * This function must be called ONCE at system install time. It creates
5080  * pg_control and the initial XLOG segment.
5081  */
5082 void
5083 BootStrapXLOG(void)
5084 {
5085  CheckPoint checkPoint;
5086  char *buffer;
5087  XLogPageHeader page;
5088  XLogLongPageHeader longpage;
5089  XLogRecord *record;
5090  char *recptr;
5091  bool use_existent;
5092  uint64 sysidentifier;
5093  char mock_auth_nonce[MOCK_AUTH_NONCE_LEN];
5094  struct timeval tv;
5095  pg_crc32c crc;
5096 
5097  /*
5098  * Select a hopefully-unique system identifier code for this installation.
5099  * We use the result of gettimeofday(), including the fractional seconds
5100  * field, as being about as unique as we can easily get. (Think not to
5101  * use random(), since it hasn't been seeded and there's no portable way
5102  * to seed it other than the system clock value...) The upper half of the
5103  * uint64 value is just the tv_sec part, while the lower half contains the
5104  * tv_usec part (which must fit in 20 bits), plus 12 bits from our current
5105  * PID for a little extra uniqueness. A person knowing this encoding can
5106  * determine the initialization time of the installation, which could
5107  * perhaps be useful sometimes.
5108  */
5109  gettimeofday(&tv, NULL);
5110  sysidentifier = ((uint64) tv.tv_sec) << 32;
5111  sysidentifier |= ((uint64) tv.tv_usec) << 12;
5112  sysidentifier |= getpid() & 0xFFF;
5113 
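/*
 * Standalone illustration (editor's sketch, not part of xlog.c): how the
 * 64-bit system identifier built above packs and unpacks. The sample values
 * stand in for the gettimeofday() and getpid() results used by the real
 * code; anyone knowing the layout can recover the initialization time.
 */
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint64_t	tv_sec = 1590000000;	/* sample seconds-since-epoch */
	uint64_t	tv_usec = 123456;	/* < 2^20, so it fits in 20 bits */
	uint64_t	pid_bits = 0x1A2B & 0xFFF;	/* low 12 bits of a sample PID */
	uint64_t	sysid;

	/* pack exactly as the code above does */
	sysid = (tv_sec << 32) | (tv_usec << 12) | pid_bits;

	/* unpack: sec in the high word, usec in bits 12..31, PID bits at bottom */
	printf("sec=%llu usec=%llu pid-bits=0x%llX\n",
		   (unsigned long long) (sysid >> 32),
		   (unsigned long long) ((sysid >> 12) & 0xFFFFF),
		   (unsigned long long) (sysid & 0xFFF));
	return 0;
}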
5114  /*
5115  * Generate a random nonce. This is used for authentication requests that
5116  * will fail because the user does not exist. The nonce is used to create
5117  * a genuine-looking password challenge for the non-existent user, in lieu
5118  * of an actual stored password.
5119  */
5120  if (!pg_strong_random(mock_auth_nonce, MOCK_AUTH_NONCE_LEN))
5121  ereport(PANIC,
5122  (errcode(ERRCODE_INTERNAL_ERROR),
5123  errmsg("could not generate secret authorization token")));
5124 
5125  /* First timeline ID is always 1 */
5126  ThisTimeLineID = 1;
5127 
5128  /* page buffer must be aligned suitably for O_DIRECT */
5129  buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
5130  page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
5131  memset(page, 0, XLOG_BLCKSZ);
5132 
5133  /*
5134  * Set up information for the initial checkpoint record
5135  *
5136  * The initial checkpoint record is written to the beginning of the WAL
5137  * segment with logid=0 logseg=1. The very first WAL segment, 0/0, is not
5138  * used, so that we can use 0/0 to mean "before any valid WAL segment".
5139  */
5140  checkPoint.redo = wal_segment_size + SizeOfXLogLongPHD;
5141  checkPoint.ThisTimeLineID = ThisTimeLineID;
5142  checkPoint.PrevTimeLineID = ThisTimeLineID;
5143  checkPoint.fullPageWrites = fullPageWrites;
5144  checkPoint.nextFullXid =
5145  FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
5146  checkPoint.nextOid = FirstBootstrapObjectId;
5147  checkPoint.nextMulti = FirstMultiXactId;
5148  checkPoint.nextMultiOffset = 0;
5149  checkPoint.oldestXid = FirstNormalTransactionId;
5150  checkPoint.oldestXidDB = TemplateDbOid;
5151  checkPoint.oldestMulti = FirstMultiXactId;
5152  checkPoint.oldestMultiDB = TemplateDbOid;
5153  checkPoint.oldestCommitTsXid = InvalidTransactionId;
5154  checkPoint.newestCommitTsXid = InvalidTransactionId;
5155  checkPoint.time = (pg_time_t) time(NULL);
5156  checkPoint.oldestActiveXid = InvalidTransactionId;
5157 
5158  ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
5159  ShmemVariableCache->nextOid = checkPoint.nextOid;
5160  ShmemVariableCache->oidCount = 0;
5161  MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
5162  AdvanceOldestClogXid(checkPoint.oldestXid);
5163  SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
5164  SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
5165  SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
5166 
5167  /* Set up the XLOG page header */
5168  page->xlp_magic = XLOG_PAGE_MAGIC;
5169  page->xlp_info = XLP_LONG_HEADER;
5170  page->xlp_tli = ThisTimeLineID;
5171  page->xlp_pageaddr = wal_segment_size;
5172  longpage = (XLogLongPageHeader) page;
5173  longpage->xlp_sysid = sysidentifier;
5174  longpage->xlp_seg_size = wal_segment_size;
5175  longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
5176 
5177  /* Insert the initial checkpoint record */
5178  recptr = ((char *) page + SizeOfXLogLongPHD);
5179  record = (XLogRecord *) recptr;
5180  record->xl_prev = 0;
5181  record->xl_xid = InvalidTransactionId;
5182  record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint);
5183  record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
5184  record->xl_rmid = RM_XLOG_ID;
5185  recptr += SizeOfXLogRecord;
5186  /* fill the XLogRecordDataHeaderShort struct */
5187  *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT;
5188  *(recptr++) = sizeof(checkPoint);
5189  memcpy(recptr, &checkPoint, sizeof(checkPoint));
5190  recptr += sizeof(checkPoint);
5191  Assert(recptr - (char *) record == record->xl_tot_len);
5192 
5193  INIT_CRC32C(crc);
5194  COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
5195  COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
5196  FIN_CRC32C(crc);
5197  record->xl_crc = crc;
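/*
 * Standalone illustration (editor's sketch, not part of xlog.c): the CRC
 * protocol used above, shown with a portable bitwise CRC-32C (Castagnoli)
 * stand-in for the INIT/COMP/FIN macros. Like xl_crc, the checksum covers
 * the record payload first and then the header prefix up to, but excluding,
 * the CRC field itself, so the field's own bytes never feed the computation.
 */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* bitwise CRC-32C: start at ~0 (INIT), fold in bytes (COMP), invert (FIN) */
static uint32_t
crc32c_comp(uint32_t crc, const void *data, size_t len)
{
	const unsigned char *p = data;

	while (len--)
	{
		crc ^= *p++;
		for (int i = 0; i < 8; i++)
			crc = (crc >> 1) ^ (0x82F63B78 & (0U - (crc & 1)));
	}
	return crc;
}

struct demo_record
{
	uint32_t	tot_len;
	uint32_t	crc;		/* covered last, like xl_crc */
	char		payload[8];
};

int
main(void)
{
	struct demo_record rec;
	uint32_t	crc = 0xFFFFFFFF;	/* INIT */

	memset(&rec, 0, sizeof(rec));
	rec.tot_len = sizeof(rec);
	memcpy(rec.payload, "checkpt", 8);

	/* COMP over the data following the header, then over the header
	 * fields up to (but excluding) the crc field itself */
	crc = crc32c_comp(crc, rec.payload, sizeof(rec.payload));
	crc = crc32c_comp(crc, &rec, offsetof(struct demo_record, crc));
	rec.crc = ~crc;				/* FIN */

	printf("crc = 0x%08X\n", rec.crc);
	return 0;
}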
5198 
5199  /* Create first XLOG segment file */
5200  use_existent = false;
5201  openLogFile = XLogFileInit(1, &use_existent, false);
5202 
5203  /* Write the first page with the initial record */
5204  errno = 0;
5205  pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
5206  if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
5207  {
5208  /* if write didn't set errno, assume problem is no disk space */
5209  if (errno == 0)
5210  errno = ENOSPC;
5211  ereport(PANIC,
5212  (errcode_for_file_access(),
5213  errmsg("could not write bootstrap write-ahead log file: %m")));
5214  }
5215  pgstat_report_wait_end();
5216 
5217  pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
5218  if (pg_fsync(openLogFile) != 0)
5219  ereport(PANIC,
5220  (errcode_for_file_access(),
5221  errmsg("could not fsync bootstrap write-ahead log file: %m")));
5222  pgstat_report_wait_end();
5223 
5224  if (close(openLogFile) != 0)
5225  ereport(PANIC,
5226  (errcode_for_file_access(),
5227  errmsg("could not close bootstrap write-ahead log file: %m")));
5228 
5229  openLogFile = -1;
5230 
5231  /* Now create pg_control */
5232 
5233  memset(ControlFile, 0, sizeof(ControlFileData));
5234  /* Initialize pg_control status fields */
5235  ControlFile->system_identifier = sysidentifier;
5236  memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN);
5237  ControlFile->state = DB_SHUTDOWNED;
5238  ControlFile->time = checkPoint.time;
5239  ControlFile->checkPoint = checkPoint.redo;
5240  ControlFile->checkPointCopy = checkPoint;
5241  ControlFile->unloggedLSN = FirstNormalUnloggedLSN;
5242 
5243  /* Set important parameter values for use when replaying WAL */
5244  ControlFile->MaxConnections = MaxConnections;
5245  ControlFile->max_worker_processes = max_worker_processes;
5246  ControlFile->max_wal_senders = max_wal_senders;
5247  ControlFile->max_prepared_xacts = max_prepared_xacts;
5248  ControlFile->max_locks_per_xact = max_locks_per_xact;
5249  ControlFile->wal_level = wal_level;
5250  ControlFile->wal_log_hints = wal_log_hints;
5251  ControlFile->track_commit_timestamp = track_commit_timestamp;
5252  ControlFile->data_checksum_version = bootstrap_data_checksum_version;
5253 
5254  /* some additional ControlFile fields are set in WriteControlFile() */
5255 
5256  WriteControlFile();
5257 
5258  /* Bootstrap the commit log, too */
5259  BootStrapCLOG();
5260  BootStrapCommitTs();
5261  BootStrapSUBTRANS();
5262  BootStrapMultiXact();
5263 
5264  pfree(buffer);
5265 
5266  /*
5267  * Force control file to be read - in contrast to normal processing we'd
5268  * otherwise never run the checks and GUC related initializations therein.
5269  */
5270  ReadControlFile();
5271 }
5272 
5273 static char *
5274 str_time(pg_time_t tnow)
5275 {
5276  static char buf[128];
5277 
5278  pg_strftime(buf, sizeof(buf),
5279  "%Y-%m-%d %H:%M:%S %Z",
5280  pg_localtime(&tnow, log_timezone));
5281 
5282  return buf;
5283 }
5284 
5285 /*
5286  * See if there are any recovery signal files and if so, set state for
5287  * recovery.
5288  *
5289  * See if there is a recovery command file (recovery.conf), and if so
5290  * throw an ERROR since as of PG12 we no longer recognize that.
5291  */
5292 static void
5293 readRecoverySignalFile(void)
5294 {
5295  struct stat stat_buf;
5296 
5297  if (IsBootstrapProcessingMode())
5298  return;
5299 
5300  /*
5301  * Check for old recovery API file: recovery.conf
5302  */
5303  if (stat(RECOVERY_COMMAND_FILE, &stat_buf) == 0)
5304  ereport(FATAL,
5305  (errcode_for_file_access(),
5306  errmsg("using recovery command file \"%s\" is not supported",
5307  RECOVERY_COMMAND_FILE)));
5308 
5309  /*
5310  * Remove unused .done file, if present. Ignore if absent.
5311  */
5312  unlink(RECOVERY_COMMAND_DONE);
5313 
5314  /*
5315  * Check for recovery signal files and if found, fsync them since they
5316  * represent server state information. We don't sweat too much about the
5317  * possibility of fsync failure, however.
5318  *
5319  * If present, standby signal file takes precedence. If neither is present
5320  * then we won't enter archive recovery.
5321  */
5322  if (stat(STANDBY_SIGNAL_FILE, &stat_buf) == 0)
5323  {
5324  int fd;
5325 
5326  fd = BasicOpenFilePerm(STANDBY_SIGNAL_FILE, O_RDWR | PG_BINARY,
5327  S_IRUSR | S_IWUSR);
5328  if (fd >= 0)
5329  {
5330  (void) pg_fsync(fd);
5331  close(fd);
5332  }
5333  standby_signal_file_found = true;
5334  }
5335  else if (stat(RECOVERY_SIGNAL_FILE, &stat_buf) == 0)
5336  {
5337  int fd;
5338 
5339  fd = BasicOpenFilePerm(RECOVERY_SIGNAL_FILE, O_RDWR | PG_BINARY,
5340  S_IRUSR | S_IWUSR);
5341  if (fd >= 0)
5342  {
5343  (void) pg_fsync(fd);
5344  close(fd);
5345  }
5346  recovery_signal_file_found = true;
5347  }
5348 
5349  StandbyModeRequested = false;
5350  ArchiveRecoveryRequested = false;
5351  if (standby_signal_file_found)
5352  {
5353  StandbyModeRequested = true;
5354  ArchiveRecoveryRequested = true;
5355  }
5356  else if (recovery_signal_file_found)
5357  {
5358  StandbyModeRequested = false;
5359  ArchiveRecoveryRequested = true;
5360  }
5361  else
5362  return;
5363 
5364  /*
5365  * We don't support standby mode in standalone backends; that requires
5366  * other processes such as the WAL receiver to be alive.
5367  */
5368  if (StandbyModeRequested && !IsUnderPostmaster)
5369  ereport(FATAL,
5370  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5371  errmsg("standby mode is not supported by single-user servers")));
5372 }
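/*
 * Distilled decision table (editor's sketch, not part of xlog.c) for the
 * signal-file handling above: standby.signal takes precedence over
 * recovery.signal, and with neither file present no archive recovery is
 * requested at all.
 */
#include <stdbool.h>
#include <stdio.h>

static void
decide(bool standby_found, bool recovery_found,
	   bool *standby_mode, bool *archive_recovery)
{
	*standby_mode = standby_found;
	*archive_recovery = standby_found || recovery_found;
}

int
main(void)
{
	bool		sm,
				ar;

	decide(true, true, &sm, &ar);	/* both present: standby wins */
	printf("standby=%d archive_recovery=%d\n", sm, ar);
	decide(false, true, &sm, &ar);	/* recovery.signal only */
	printf("standby=%d archive_recovery=%d\n", sm, ar);
	return 0;
}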
5373 
5374 static void
5375 validateRecoveryParameters(void)
5376 {
5377  if (!ArchiveRecoveryRequested)
5378  return;
5379 
5380  /*
5381  * Check for compulsory parameters
5382  */
5383  if (StandbyModeRequested)
5384  {
5385  if ((PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) &&
5386  (recoveryRestoreCommand == NULL || strcmp(recoveryRestoreCommand, "") == 0))
5387  ereport(WARNING,
5388  (errmsg("specified neither primary_conninfo nor restore_command"),
5389  errhint("The database server will regularly poll the pg_wal subdirectory to check for files placed there.")));
5390  }
5391  else
5392  {
5393  if (recoveryRestoreCommand == NULL ||
5394  strcmp(recoveryRestoreCommand, "") == 0)
5395  ereport(FATAL,
5396  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5397  errmsg("must specify restore_command when standby mode is not enabled")));
5398  }
5399 
5400  /*
5401  * Override any inconsistent requests. Note that this is a change of
5402  * behaviour in 9.5; prior to this we simply ignored a request to pause if
5403  * hot_standby = off, which was surprising behaviour.
5404  */
5405  if (recoveryTargetAction == RECOVERY_TARGET_ACTION_PAUSE &&
5406  !EnableHotStandby)
5407  recoveryTargetAction = RECOVERY_TARGET_ACTION_SHUTDOWN;
5408 
5409  /*
5410  * Final parsing of recovery_target_time string; see also
5411  * check_recovery_target_time().
5412  */
5413  if (recoveryTarget == RECOVERY_TARGET_TIME)
5414  {
5415  recoveryTargetTime = DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
5416  CStringGetDatum(recovery_target_time_string),
5417  ObjectIdGetDatum(InvalidOid),
5418  Int32GetDatum(-1)));
5419  }
5420 
5421  /*
5422  * If user specified recovery_target_timeline, validate it or compute the
5423  * "latest" value. We can't do this until after we've gotten the restore
5424  * command and set InArchiveRecovery, because we need to fetch timeline
5425  * history files from the archive.
5426  */
5427  if (recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_NUMERIC)
5428  {
5429  TimeLineID rtli = recoveryTargetTLIRequested;
5430 
5431  /* Timeline 1 does not have a history file, all else should */
5432  if (rtli != 1 && !existsTimeLineHistory(rtli))
5433  ereport(FATAL,
5434  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5435  errmsg("recovery target timeline %u does not exist",
5436  rtli)));
5437  recoveryTargetTLI = rtli;
5438  }
5439  else if (recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_LATEST)
5440  {
5441  /* We start the "latest" search from pg_control's timeline */
5442  recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
5443  }
5444  else
5445  {
5446  /*
5447  * else we just use the recoveryTargetTLI as already read from
5448  * ControlFile
5449  */
5450  Assert(recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_CONTROLFILE);
5451  }
5452 }
5453 
5454 /*
5455  * Exit archive-recovery state
5456  */
5457 static void
5458 exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
5459 {
5460  char xlogfname[MAXFNAMELEN];
5461  XLogSegNo endLogSegNo;
5462  XLogSegNo startLogSegNo;
5463 
5464  /* we always switch to a new timeline after archive recovery */
5465  Assert(endTLI != ThisTimeLineID);
5466 
5467  /*
5468  * We are no longer in archive recovery state.
5469  */
5470  InArchiveRecovery = false;
5471 
5472  /*
5473  * Update min recovery point one last time.
5474  */
5475  UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
5476 
5477  /*
5478  * If the ending log segment is still open, close it (to avoid problems on
5479  * Windows with trying to rename or delete an open file).
5480  */
5481  if (readFile >= 0)
5482  {
5483  close(readFile);
5484  readFile = -1;
5485  }
5486 
5487  /*
5488  * Calculate the last segment on the old timeline, and the first segment
5489  * on the new timeline. If the switch happens in the middle of a segment,
5490  * they are the same, but if the switch happens exactly at a segment
5491  * boundary, startLogSegNo will be endLogSegNo + 1.
5492  */
5493  XLByteToPrevSeg(endOfLog, endLogSegNo, wal_segment_size);
5494  XLByteToSeg(endOfLog, startLogSegNo, wal_segment_size);
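/*
 * Worked example (editor's sketch, not part of xlog.c) of the arithmetic
 * behind XLByteToPrevSeg ((pos - 1) / segment size) and XLByteToSeg
 * (pos / segment size), covering both cases the comment above describes.
 * 16MB is assumed as the default wal_segment_size.
 */
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	const uint64_t seg = 16 * 1024 * 1024;	/* default wal_segment_size */
	uint64_t	mid = 2 * seg + 4096;	/* switch mid-segment */
	uint64_t	edge = 2 * seg; /* switch exactly on a boundary */

	/* mid-segment: both macros land on the same segment number */
	printf("mid:  endLogSegNo=%llu startLogSegNo=%llu\n",
		   (unsigned long long) ((mid - 1) / seg),
		   (unsigned long long) (mid / seg));
	/* boundary: startLogSegNo comes out as endLogSegNo + 1 */
	printf("edge: endLogSegNo=%llu startLogSegNo=%llu\n",
		   (unsigned long long) ((edge - 1) / seg),
		   (unsigned long long) (edge / seg));
	return 0;
}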
5495 
5496  /*
5497  * Initialize the starting WAL segment for the new timeline. If the switch
5498  * happens in the middle of a segment, copy data from the last WAL segment
5499  * of the old timeline up to the switch point, to the starting WAL segment
5500  * on the new timeline.
5501  */
5502  if (endLogSegNo == startLogSegNo)
5503  {
5504  /*
5505  * Make a copy of the file on the new timeline.
5506  *
5507  * Writing WAL isn't allowed yet, so there are no locking
5508  * considerations. But we should be just as careful as XLogFileInit to
5509  * avoid putting a bogus file into place.
5510  */
5511  XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
5512  XLogSegmentOffset(endOfLog, wal_segment_size));
5513  }
5514  else
5515  {
5516  /*
5517  * The switch happened at a segment boundary, so just create the next
5518  * segment on the new timeline.
5519  */
5520  bool use_existent = true;
5521  int fd;
5522 
5523  fd = XLogFileInit(startLogSegNo, &use_existent, true);
5524 
5525  if (close(fd) != 0)
5526  {
5527  char xlogfname[MAXFNAMELEN];
5528  int save_errno = errno;
5529 
5530  XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo,
5531  wal_segment_size);
5532  errno = save_errno;
5533  ereport(ERROR,
5534  (errcode_for_file_access(),
5535  errmsg("could not close file \"%s\": %m", xlogfname)));
5536  }
5537  }
5538 
5539  /*
5540  * Let's just make real sure there are not .ready or .done flags posted
5541  * for the new segment.
5542  */
5543  XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo, wal_segment_size);
5544  XLogArchiveCleanup(xlogfname);
5545 
5546  /*
5547  * Remove the signal files out of the way, so that we don't accidentally
5548  * re-enter archive recovery mode in a subsequent crash.
5549  */
5550  if (standby_signal_file_found)
5551  durable_unlink(STANDBY_SIGNAL_FILE, FATAL);
5552 
5553  if (recovery_signal_file_found)
5554  durable_unlink(RECOVERY_SIGNAL_FILE, FATAL);
5555 
5556  ereport(LOG,
5557  (errmsg("archive recovery complete")));
5558 }
5559 
5560 /*
5561  * Extract timestamp from WAL record.
5562  *
5563  * If the record contains a timestamp, returns true, and saves the timestamp
5564  * in *recordXtime. If the record type has no timestamp, returns false.
5565  * Currently, only transaction commit/abort records and restore points contain
5566  * timestamps.
5567  */
5568 static bool
5569 getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
5570 {
5571  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
5572  uint8 xact_info = info & XLOG_XACT_OPMASK;
5573  uint8 rmid = XLogRecGetRmid(record);
5574 
5575  if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
5576  {
5577  *recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
5578  return true;
5579  }
5580  if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
5581  xact_info == XLOG_XACT_COMMIT_PREPARED))
5582  {
5583  *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
5584  return true;
5585  }
5586  if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
5587  xact_info == XLOG_XACT_ABORT_PREPARED))
5588  {
5589  *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
5590  return true;
5591  }
5592  return false;
5593 }
5594 
5595 /*
5596  * For point-in-time recovery, this function decides whether we want to
5597  * stop applying the XLOG before the current record.
5598  *
5599  * Returns true if we are stopping, false otherwise. If stopping, some
5600  * information is saved in recoveryStopXid et al for use in annotating the
5601  * new timeline's history file.
5602  */
5603 static bool
5604 recoveryStopsBefore(XLogReaderState *record)
5605 {
5606  bool stopsHere = false;
5607  uint8 xact_info;
5608  bool isCommit;
5609  TimestampTz recordXtime = 0;
5610  TransactionId recordXid;
5611 
5612  /*
5613  * Ignore recovery target settings when not in archive recovery (meaning
5614  * we are in crash recovery).
5615  */
5616  if (!ArchiveRecoveryRequested)
5617  return false;
5618 
5619  /* Check if we should stop as soon as reaching consistency */
5620  if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
5621  {
5622  ereport(LOG,
5623  (errmsg("recovery stopping after reaching consistency")));
5624 
5625  recoveryStopAfter = false;
5626  recoveryStopXid = InvalidTransactionId;
5627  recoveryStopLSN = InvalidXLogRecPtr;
5628  recoveryStopTime = 0;
5629  recoveryStopName[0] = '\0';
5630  return true;
5631  }
5632 
5633  /* Check if target LSN has been reached */
5634  if (recoveryTarget == RECOVERY_TARGET_LSN &&
5635  !recoveryTargetInclusive &&
5636  record->ReadRecPtr >= recoveryTargetLSN)
5637  {
5638  recoveryStopAfter = false;
5639  recoveryStopXid = InvalidTransactionId;
5640  recoveryStopLSN = record->ReadRecPtr;
5641  recoveryStopTime = 0;
5642  recoveryStopName[0] = '\0';
5643  ereport(LOG,
5644  (errmsg("recovery stopping before WAL location (LSN) \"%X/%X\"",
5645  (uint32) (recoveryStopLSN >> 32),
5646  (uint32) recoveryStopLSN)));
5647  return true;
5648  }
5649 
5650  /* Otherwise we only consider stopping before COMMIT or ABORT records. */
5651  if (XLogRecGetRmid(record) != RM_XACT_ID)
5652  return false;
5653 
5654  xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
5655 
5656  if (xact_info == XLOG_XACT_COMMIT)
5657  {
5658  isCommit = true;
5659  recordXid = XLogRecGetXid(record);
5660  }
5661  else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
5662  {
5663  xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
5664  xl_xact_parsed_commit parsed;
5665 
5666  isCommit = true;
5667  ParseCommitRecord(XLogRecGetInfo(record),
5668  xlrec,
5669  &parsed);
5670  recordXid = parsed.twophase_xid;
5671  }
5672  else if (xact_info == XLOG_XACT_ABORT)
5673  {
5674  isCommit = false;
5675  recordXid = XLogRecGetXid(record);
5676  }
5677  else if (xact_info == XLOG_XACT_ABORT_PREPARED)
5678  {
5679  xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
5680  xl_xact_parsed_abort parsed;
5681 
5682  isCommit = false;
5683  ParseAbortRecord(XLogRecGetInfo(record),
5684  xlrec,
5685  &parsed);
5686  recordXid = parsed.twophase_xid;
5687  }
5688  else
5689  return false;
5690 
5691  if (recoveryTarget == RECOVERY_TARGET_XID)
5692  {
5693  /*
5694  * There can be only one transaction end record with this exact
5695  * transaction ID.
5696  *
5697  * When testing for an xid, we MUST test for equality only, since
5698  * transactions are numbered in the order they start, not the order
5699  * they complete. A higher-numbered xid will complete before the target
5700  * xid about 50% of the time...
5701  */
5702  stopsHere = (recordXid == recoveryTargetXid);
5703  }
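/*
 * Editor's sketch (not part of xlog.c) of why equality is required: xids
 * are assigned in transaction-start order, while WAL carries end records in
 * completion order, so a ">=" test could fire on a later-numbered xid that
 * happened to finish first and stop recovery too early.
 */
#include <stdio.h>

int
main(void)
{
	int			commit_order[] = {101, 99, 100};	/* xids as committed */
	int			target = 100;

	for (int i = 0; i < 3; i++)
	{
		if (commit_order[i] == target)	/* correct: equality only */
			printf("stop at commit of xid %d (position %d)\n",
				   commit_order[i], i);
		if (commit_order[i] >= target)	/* wrong: fires first at xid 101 */
			printf("'>= target' would already fire at xid %d\n",
				   commit_order[i]);
	}
	return 0;
}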
5704 
5705  if (recoveryTarget == RECOVERY_TARGET_TIME &&
5706  getRecordTimestamp(record, &recordXtime))
5707  {
5708  /*
5709  * There can be many transactions that share the same commit time, so
5710  * we stop after the last one, if we are inclusive, or stop at the
5711  * first one if we are exclusive
5712  */
5713  if (recoveryTargetInclusive)
5714  stopsHere = (recordXtime > recoveryTargetTime);
5715  else
5716  stopsHere = (recordXtime >= recoveryTargetTime);
5717  }
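/*
 * Editor's sketch (not part of xlog.c) of the inclusive/exclusive predicate
 * above: with commits at T-1 and two at the target time T, "inclusive"
 * keeps replaying through every record stamped T (it stops only once a
 * strictly later timestamp appears), while "exclusive" stops at the first
 * record stamped T.
 */
#include <stdbool.h>
#include <stdio.h>

static bool
stops_here(long long recordXtime, long long targetTime, bool inclusive)
{
	return inclusive ? (recordXtime > targetTime)
		: (recordXtime >= targetTime);
}

int
main(void)
{
	long long	times[] = {99, 100, 100};	/* commit times; target is 100 */

	for (int i = 0; i < 3; i++)
		printf("t=%lld inclusive=%d exclusive=%d\n", times[i],
			   (int) stops_here(times[i], 100, true),
			   (int) stops_here(times[i], 100, false));
	return 0;
}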
5718 
5719  if (stopsHere)
5720  {
5721  recoveryStopAfter = false;
5722  recoveryStopXid = recordXid;
5723  recoveryStopTime = recordXtime;
5724  recoveryStopLSN = InvalidXLogRecPtr;
5725  recoveryStopName[0] = '\0';
5726 
5727  if (isCommit)
5728  {
5729  ereport(LOG,
5730  (errmsg("recovery stopping before commit of transaction %u, time %s",
5733  }
5734  else
5735  {
5736  ereport(LOG,
5737  (errmsg("recovery stopping before abort of transaction %u, time %s",
5740  }
5741  }
5742 
5743  return stopsHere;
5744 }
5745 
5746 /*
5747  * Same as recoveryStopsBefore, but called after applying the record.
5748  *
5749  * We also track the timestamp of the latest applied COMMIT/ABORT
5750  * record in XLogCtl->recoveryLastXTime.
5751  */
5752 static bool
5753 recoveryStopsAfter(XLogReaderState *record)
5754 {
5755  uint8 info;
5756  uint8 xact_info;
5757  uint8 rmid;
5758  TimestampTz recordXtime;
5759 
5760  /*
5761  * Ignore recovery target settings when not in archive recovery (meaning
5762  * we are in crash recovery).
5763  */
5764  if (!ArchiveRecoveryRequested)
5765  return false;
5766 
5767  info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
5768  rmid = XLogRecGetRmid(record);
5769 
5770  /*
5771  * There can be many restore points that share the same name; we stop at
5772  * the first one.
5773  */
5774  if (recoveryTarget == RECOVERY_TARGET_NAME &&
5775  rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)