1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  * PostgreSQL write-ahead log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/backend/access/transam/xlog.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <math.h>
19 #include <time.h>
20 #include <fcntl.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <unistd.h>
24 
25 #include "access/clog.h"
26 #include "access/commit_ts.h"
27 #include "access/heaptoast.h"
28 #include "access/multixact.h"
29 #include "access/rewriteheap.h"
30 #include "access/subtrans.h"
31 #include "access/timeline.h"
32 #include "access/transam.h"
33 #include "access/twophase.h"
34 #include "access/xact.h"
35 #include "access/xlog_internal.h"
36 #include "access/xloginsert.h"
37 #include "access/xlogreader.h"
38 #include "access/xlogutils.h"
39 #include "catalog/catversion.h"
40 #include "catalog/pg_control.h"
41 #include "catalog/pg_database.h"
42 #include "commands/tablespace.h"
44 #include "miscadmin.h"
45 #include "pgstat.h"
46 #include "port/atomics.h"
47 #include "postmaster/bgwriter.h"
48 #include "postmaster/walwriter.h"
49 #include "postmaster/startup.h"
50 #include "replication/basebackup.h"
51 #include "replication/logical.h"
52 #include "replication/slot.h"
53 #include "replication/origin.h"
54 #include "replication/snapbuild.h"
55 #include "replication/walreceiver.h"
56 #include "replication/walsender.h"
57 #include "storage/bufmgr.h"
58 #include "storage/fd.h"
59 #include "storage/ipc.h"
60 #include "storage/large_object.h"
61 #include "storage/latch.h"
62 #include "storage/pmsignal.h"
63 #include "storage/predicate.h"
64 #include "storage/proc.h"
65 #include "storage/procarray.h"
66 #include "storage/reinit.h"
67 #include "storage/smgr.h"
68 #include "storage/spin.h"
69 #include "storage/sync.h"
70 #include "utils/builtins.h"
71 #include "utils/guc.h"
72 #include "utils/memutils.h"
73 #include "utils/ps_status.h"
74 #include "utils/relmapper.h"
75 #include "utils/snapmgr.h"
76 #include "utils/timestamp.h"
77 #include "pg_trace.h"
78 
79 extern uint32 bootstrap_data_checksum_version;
80 
81 /* Unsupported old recovery command file names (relative to $PGDATA) */
82 #define RECOVERY_COMMAND_FILE "recovery.conf"
83 #define RECOVERY_COMMAND_DONE "recovery.done"
84 
85 /* User-settable parameters */
86 int max_wal_size_mb = 1024; /* 1 GB */
87 int min_wal_size_mb = 80; /* 80 MB */
89 int XLOGbuffers = -1;
92 char *XLogArchiveCommand = NULL;
93 bool EnableHotStandby = false;
94 bool fullPageWrites = true;
95 bool wal_log_hints = false;
96 bool wal_compression = false;
99 bool wal_init_zero = true;
100 bool wal_recycle = true;
101 bool log_checkpoints = false;
104 int CommitDelay = 0; /* precommit delay in microseconds */
105 int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
107 
108 #ifdef WAL_DEBUG
109 bool XLOG_DEBUG = false;
110 #endif
111 
112 int wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
113 
114 /*
115  * Number of WAL insertion locks to use. A higher value allows more insertions
116  * to happen concurrently, but adds some CPU overhead to flushing the WAL,
117  * which needs to iterate all the locks.
118  */
119 #define NUM_XLOGINSERT_LOCKS 8
120 
121 /*
122  * Max distance from last checkpoint, before triggering a new xlog-based
123  * checkpoint.
124  */
125 int CheckPointSegments;
126 
127 /* Estimated distance between checkpoints, in bytes */
128 static double CheckPointDistanceEstimate = 0;
129 static double PrevCheckPointDistance = 0;
130 
131 /*
132  * GUC support
133  */
134 const struct config_enum_entry sync_method_options[] = {
135  {"fsync", SYNC_METHOD_FSYNC, false},
136 #ifdef HAVE_FSYNC_WRITETHROUGH
137  {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
138 #endif
139 #ifdef HAVE_FDATASYNC
140  {"fdatasync", SYNC_METHOD_FDATASYNC, false},
141 #endif
142 #ifdef OPEN_SYNC_FLAG
143  {"open_sync", SYNC_METHOD_OPEN, false},
144 #endif
145 #ifdef OPEN_DATASYNC_FLAG
146  {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
147 #endif
148  {NULL, 0, false}
149 };
150 
151 
152 /*
153  * Although only "on", "off", and "always" are documented,
154  * we accept all the likely variants of "on" and "off".
155  */
156 const struct config_enum_entry archive_mode_options[] = {
157  {"always", ARCHIVE_MODE_ALWAYS, false},
158  {"on", ARCHIVE_MODE_ON, false},
159  {"off", ARCHIVE_MODE_OFF, false},
160  {"true", ARCHIVE_MODE_ON, true},
161  {"false", ARCHIVE_MODE_OFF, true},
162  {"yes", ARCHIVE_MODE_ON, true},
163  {"no", ARCHIVE_MODE_OFF, true},
164  {"1", ARCHIVE_MODE_ON, true},
165  {"0", ARCHIVE_MODE_OFF, true},
166  {NULL, 0, false}
167 };
168 
169 const struct config_enum_entry recovery_target_action_options[] = {
170  {"pause", RECOVERY_TARGET_ACTION_PAUSE, false},
171  {"promote", RECOVERY_TARGET_ACTION_PROMOTE, false},
172  {"shutdown", RECOVERY_TARGET_ACTION_SHUTDOWN, false},
173  {NULL, 0, false}
174 };
175 
176 /*
177  * Statistics for current checkpoint are collected in this global struct.
178  * Because only the checkpointer or a stand-alone backend can perform
179  * checkpoints, this will be unused in normal backends.
180  */
181 CheckpointStatsData CheckpointStats;
182 
183 /*
184  * ThisTimeLineID will be the same in all backends --- it identifies the current
185  * WAL timeline for the database system.
186  */
187 TimeLineID ThisTimeLineID = 0;
188 
189 /*
190  * Are we doing recovery from XLOG?
191  *
192  * This is only ever true in the startup process; it should be read as meaning
193  * "this process is replaying WAL records", rather than "the system is in
194  * recovery mode". It should be examined primarily by functions that need
195  * to act differently when called from a WAL redo function (e.g., to skip WAL
196  * logging). To check whether the system is in recovery regardless of which
197  * process you're running in, use RecoveryInProgress() but only after shared
198  * memory startup and lock initialization.
199  */
200 bool InRecovery = false;
201 
202 /* Are we in Hot Standby mode? Only valid in startup process, see xlog.h */
203 HotStandbyState standbyState = STANDBY_DISABLED;
204 
206 
207 /* Local copy of WalRcv->receivedUpto */
208 static XLogRecPtr receivedUpto = 0;
209 static TimeLineID receivedTLI = 0;
210 
211 /*
212  * During recovery, lastFullPageWrites keeps track of full_page_writes that
213  * the replayed WAL records indicate. It's initialized with full_page_writes
214  * that the recovery starting checkpoint record indicates, and then updated
215  * each time XLOG_FPW_CHANGE record is replayed.
216  */
217 static bool lastFullPageWrites;
218 
219 /*
220  * Local copy of SharedRecoveryInProgress variable. True actually means "not
221  * known, need to check the shared state".
222  */
223 static bool LocalRecoveryInProgress = true;
224 
225 /*
226  * Local copy of SharedHotStandbyActive variable. False actually means "not
227  * known, need to check the shared state".
228  */
229 static bool LocalHotStandbyActive = false;
230 
231 /*
232  * Local state for XLogInsertAllowed():
233  * 1: unconditionally allowed to insert XLOG
234  * 0: unconditionally not allowed to insert XLOG
235  * -1: must check RecoveryInProgress(); disallow until it is false
236  * Most processes start with -1 and transition to 1 after seeing that recovery
237  * is not in progress. But we can also force the value for special cases.
238  * The coding in XLogInsertAllowed() depends on the first two of these states
239  * being numerically the same as bool true and false.
240  */
241 static int LocalXLogInsertAllowed = -1;
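/*
 * Editor's note: a minimal sketch (not part of this listing) of how
 * XLogInsertAllowed(), defined later in this file, consumes the tri-state
 * above; the real function may differ in detail.
 *
 *    if (LocalXLogInsertAllowed >= 0)
 *        return (bool) LocalXLogInsertAllowed;   // forced to 0 or 1
 *    if (RecoveryInProgress())
 *        return false;                           // still -1: keep checking
 *    LocalXLogInsertAllowed = 1;                 // recovery over: cache it
 *    return true;
 */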
242 
243 /*
244  * When ArchiveRecoveryRequested is set, archive recovery was requested,
245  * ie. signal files were present. When InArchiveRecovery is set, we are
246  * currently recovering using offline XLOG archives. These variables are only
247  * valid in the startup process.
248  *
249  * When ArchiveRecoveryRequested is true, but InArchiveRecovery is false, we're
250  * currently performing crash recovery using only XLOG files in pg_wal, but
251  * will switch to using offline XLOG archives as soon as we reach the end of
252  * WAL in pg_wal.
253 */
254 bool ArchiveRecoveryRequested = false;
255 bool InArchiveRecovery = false;
256 
257 static bool standby_signal_file_found = false;
258 static bool recovery_signal_file_found = false;
259 
260 /* Was the last xlog file restored from archive, or local? */
261 static bool restoredFromArchive = false;
262 
263 /* Buffers dedicated to consistency checks of size BLCKSZ */
264 static char *replay_image_masked = NULL;
265 static char *master_image_masked = NULL;
266 
267 /* options formerly taken from recovery.conf for archive recovery */
269 char *recoveryEndCommand = NULL;
277 const char *recoveryTargetName;
281 
282 /* options formerly taken from recovery.conf for XLOG streaming */
283 bool StandbyModeRequested = false;
284 char *PrimaryConnInfo = NULL;
285 char *PrimarySlotName = NULL;
286 char *PromoteTriggerFile = NULL;
287 
288 /* are we currently in standby mode? */
289 bool StandbyMode = false;
290 
291 /* whether request for fast promotion has been made yet */
292 static bool fast_promote = false;
293 
294 /*
295  * if recoveryStopsBefore/After returns true, it saves information of the stop
296  * point here
297  */
298 static TransactionId recoveryStopXid;
299 static TimestampTz recoveryStopTime;
300 static XLogRecPtr recoveryStopLSN;
301 static char recoveryStopName[MAXFNAMELEN];
302 static bool recoveryStopAfter;
303 
304 /*
305  * During normal operation, the only timeline we care about is ThisTimeLineID.
306  * During recovery, however, things are more complicated. To simplify life
307  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
308  * scan through the WAL history (that is, it is the line that was active when
309  * the currently-scanned WAL record was generated). We also need these
310  * timeline values:
311  *
312  * recoveryTargetTimeLineGoal: what the user requested, if any
313  *
314  * recoveryTargetTLIRequested: numeric value of requested timeline, if constant
315  *
316  * recoveryTargetTLI: the currently understood target timeline; changes
317  *
318  * expectedTLEs: a list of TimeLineHistoryEntries for recoveryTargetTLI and the timelines of
319  * its known parents, newest first (so recoveryTargetTLI is always the
320  * first list member). Only these TLIs are expected to be seen in the WAL
321  * segments we read, and indeed only these TLIs will be considered as
322  * candidate WAL files to open at all.
323  *
324  * curFileTLI: the TLI appearing in the name of the current input WAL file.
325  * (This is not necessarily the same as ThisTimeLineID, because we could
326  * be scanning data that was copied from an ancestor timeline when the current
327  * file was created.) During a sequential scan we do not allow this value
328  * to decrease.
329  */
335 
336 /*
337  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
338  * current backend. It is updated for all inserts. XactLastRecEnd points to
339  * end+1 of the last record, and is reset when we end a top-level transaction,
340  * or start a new one; so it can be used to tell if the current transaction has
341  * created any XLOG records.
342  *
343  * While in parallel mode, this may not be fully up to date. When committing,
344  * a transaction can assume this covers all xlog records written either by the
345  * user backend or by any parallel worker which was present at any point during
346  * the transaction. But when aborting, or when still in parallel mode, other
347  * parallel backends may have written WAL records at later LSNs than the value
348  * stored here. The parallel leader advances its own copy, when necessary,
349  * in WaitForParallelWorkersToFinish.
350  */
351 XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
352 XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
353 XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
354 
355 /*
356  * RedoRecPtr is this backend's local copy of the REDO record pointer
357  * (which is almost but not quite the same as a pointer to the most recent
358  * CHECKPOINT record). We update this from the shared-memory copy,
359  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
360  * hold an insertion lock). See XLogInsertRecord for details. We are also
361  * allowed to update from XLogCtl->RedoRecPtr if we hold the info_lck;
362  * see GetRedoRecPtr. A freshly spawned backend obtains the value during
363  * InitXLOGAccess.
364  */
365 static XLogRecPtr RedoRecPtr;
366 
367 /*
368  * doPageWrites is this backend's local copy of (forcePageWrites ||
369  * fullPageWrites). It is used together with RedoRecPtr to decide whether
370  * a full-page image of a page needs to be taken.
371  */
372 static bool doPageWrites;
373 
374 /* Has the recovery code requested a walreceiver wakeup? */
375 static bool doRequestWalReceiverReply;
376 
377 /*
378  * RedoStartLSN points to the checkpoint's REDO location which is specified
379  * in a backup label file, backup history file or control file. In standby
380  * mode, XLOG streaming usually starts from the position where an invalid
381  * record was found. But if we fail to read even the initial checkpoint
382  * record, we use the REDO location instead of the checkpoint location as
383  * the start position of XLOG streaming. Otherwise we would have to jump
384  * backwards to the REDO location after reading the checkpoint record,
385  * because the REDO record can precede the checkpoint record.
386  */
387 static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
388 
389 /*----------
390  * Shared-memory data structures for XLOG control
391  *
392  * LogwrtRqst indicates a byte position that we need to write and/or fsync
393  * the log up to (all records before that point must be written or fsynced).
394  * LogwrtResult indicates the byte positions we have already written/fsynced.
395  * These structs are identical but are declared separately to indicate their
396  * slightly different functions.
397  *
398  * To read XLogCtl->LogwrtResult, you must hold either info_lck or
399  * WALWriteLock. To update it, you need to hold both locks. The point of
400  * this arrangement is that the value can be examined by code that already
401  * holds WALWriteLock without needing to grab info_lck as well. In addition
402  * to the shared variable, each backend has a private copy of LogwrtResult,
403  * which is updated when convenient.
404  *
405  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
406  * (protected by info_lck), but we don't need to cache any copies of it.
407  *
408  * info_lck is only held long enough to read/update the protected variables,
409  * so it's a plain spinlock. The other locks are held longer (potentially
410  * over I/O operations), so we use LWLocks for them. These locks are:
411  *
412  * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
413  * It is only held while initializing and changing the mapping. If the
414  * contents of the buffer being replaced haven't been written yet, the mapping
415  * lock is released while the write is done, and reacquired afterwards.
416  *
417  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
418  * XLogFlush).
419  *
420  * ControlFileLock: must be held to read/update control file or create
421  * new log file.
422  *
423  * CheckpointLock: must be held to do a checkpoint or restartpoint (ensures
424  * only one checkpointer at a time; currently, with all checkpoints done by
425  * the checkpointer, this is just pro forma).
426  *
427  *----------
428  */
429 
430 typedef struct XLogwrtRqst
431 {
432  XLogRecPtr Write; /* last byte + 1 to write out */
433  XLogRecPtr Flush; /* last byte + 1 to flush */
434 } XLogwrtRqst;
435 
436 typedef struct XLogwrtResult
437 {
438  XLogRecPtr Write; /* last byte + 1 written out */
439  XLogRecPtr Flush; /* last byte + 1 flushed */
440 } XLogwrtResult;
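/*
 * Editor's note: a hedged sketch (not part of xlog.c) of the LogwrtResult
 * protocol described above. Reading the shared copy requires info_lck or
 * WALWriteLock; updating it requires holding both. XLogInsertRecord() below
 * uses exactly this reader pattern to refresh its private copy:
 *
 *    SpinLockAcquire(&XLogCtl->info_lck);
 *    LogwrtResult = XLogCtl->LogwrtResult;    // refresh private copy
 *    SpinLockRelease(&XLogCtl->info_lck);
 */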
441 
442 /*
443  * Inserting to WAL is protected by a small fixed number of WAL insertion
444  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
445  * matter which one. To lock out other concurrent insertions, you must hold all
446  * of them. Each WAL insertion lock consists of a lightweight lock, plus an
447  * indicator of how far the insertion has progressed (insertingAt).
448  *
449  * The insertingAt values are read when a process wants to flush WAL from
450  * the in-memory buffers to disk, to check that all the insertions to the
451  * region the process is about to write out have finished. You could simply
452  * wait for all currently in-progress insertions to finish, but the
453  * insertingAt indicator allows you to ignore insertions to later in the WAL,
454  * so that you only wait for the insertions that are modifying the buffers
455  * you're about to write out.
456  *
457  * This isn't just an optimization. If all the WAL buffers are dirty, an
458  * inserter that's holding a WAL insert lock might need to evict an old WAL
459  * buffer, which requires flushing the WAL. If it's possible for an inserter
460  * to block on another inserter unnecessarily, deadlock can arise when two
461  * inserters holding a WAL insert lock wait for each other to finish their
462  * insertion.
463  *
464  * Small WAL records that don't cross a page boundary never update the value;
465  * the WAL record is just copied to the page and the lock is released. But
466  * to avoid the deadlock-scenario explained above, the indicator is always
467  * updated before sleeping while holding an insertion lock.
468  *
469  * lastImportantAt contains the LSN of the last important WAL record inserted
470  * using a given lock. This value is used to detect if there has been
471  * important WAL activity since the last time some action, like a checkpoint,
472  * was performed, so the action can be skipped if nothing has happened. The LSN is
473  * updated for all insertions, unless the XLOG_MARK_UNIMPORTANT flag was
474  * set. lastImportantAt is never cleared, only overwritten by the LSN of newer
475  * records. Tracking the WAL activity directly in WALInsertLock has the
476  * advantage of not needing any additional locks to update the value.
477  */
478 typedef struct
479 {
480  LWLock lock;
481  XLogRecPtr insertingAt;
482  XLogRecPtr lastImportantAt;
483 } WALInsertLock;
484 
485 /*
486  * All the WAL insertion locks are allocated as an array in shared memory. We
487  * force the array stride to be a power of 2, which saves a few cycles in
488  * indexing, but more importantly also ensures that individual slots don't
489  * cross cache line boundaries. (Of course, we have to also ensure that the
490  * array start address is suitably aligned.)
491  */
492 typedef union WALInsertLockPadded
493 {
494  WALInsertLock l;
495  char pad[PG_CACHE_LINE_SIZE];
496 } WALInsertLockPadded;
497 
498 /*
499  * State of an exclusive backup, necessary to control concurrent activities
500  * across sessions when working on exclusive backups.
501  *
502  * EXCLUSIVE_BACKUP_NONE means that there is no exclusive backup actually
503  * running, to be more precise pg_start_backup() is not being executed for
504  * an exclusive backup and there is no exclusive backup in progress.
505  * EXCLUSIVE_BACKUP_STARTING means that pg_start_backup() is starting an
506  * exclusive backup.
507  * EXCLUSIVE_BACKUP_IN_PROGRESS means that pg_start_backup() has finished
508  * running and an exclusive backup is in progress. pg_stop_backup() is
509  * needed to finish it.
510  * EXCLUSIVE_BACKUP_STOPPING means that pg_stop_backup() is stopping an
511  * exclusive backup.
512  */
513 typedef enum ExclusiveBackupState
514 {
515  EXCLUSIVE_BACKUP_NONE = 0,
516  EXCLUSIVE_BACKUP_STARTING,
517  EXCLUSIVE_BACKUP_IN_PROGRESS,
518  EXCLUSIVE_BACKUP_STOPPING
519 } ExclusiveBackupState;
520 
521 /*
522  * Session status of running backup, used for sanity checks in SQL-callable
523  * functions to start and stop backups.
524  */
525 static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
526 
527 /*
528  * Shared state data for WAL insertion.
529  */
530 typedef struct XLogCtlInsert
531 {
532  slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */
533 
534  /*
535  * CurrBytePos is the end of reserved WAL. The next record will be
536  * inserted at that position. PrevBytePos is the start position of the
537  * previously inserted (or rather, reserved) record - it is copied to the
538  * prev-link of the next record. These are stored as "usable byte
539  * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
540  */
541  uint64 CurrBytePos;
542  uint64 PrevBytePos;
543 
544  /*
545  * Make sure the above heavily-contended spinlock and byte positions are
546  * on their own cache line. In particular, the RedoRecPtr and full page
547  * write variables below should be on a different cache line. They are
548  * read on every WAL insertion, but updated rarely, and we don't want
549  * those reads to steal the cache line containing Curr/PrevBytePos.
550  */
551  char pad[PG_CACHE_LINE_SIZE];
552 
553  /*
554  * fullPageWrites is the master copy used by all backends to determine
555  * whether to write full-page images to WAL, instead of using the process-local copy.
556  * This is required because, when full_page_writes is changed by SIGHUP,
557  * we must WAL-log it before it actually affects WAL-logging by backends.
558  * The checkpointer sets it at startup or after SIGHUP.
559  *
560  * To read these fields, you must hold an insertion lock. To modify them,
561  * you must hold ALL the locks.
562  */
563  XLogRecPtr RedoRecPtr; /* current redo point for insertions */
564  bool forcePageWrites; /* forcing full-page writes for PITR? */
565  bool fullPageWrites;
566 
567  /*
568  * exclusiveBackupState indicates the state of an exclusive backup (see
569  * comments of ExclusiveBackupState for more details). nonExclusiveBackups
570  * is a counter indicating the number of streaming base backups currently
571  * in progress. forcePageWrites is set to true when either of these is
572  * non-zero. lastBackupStart is the latest checkpoint redo location used
573  * as a starting point for an online backup.
574  */
575  ExclusiveBackupState exclusiveBackupState;
576  int nonExclusiveBackups;
577  XLogRecPtr lastBackupStart;
578 
579  /*
580  * WAL insertion locks.
581  */
582  WALInsertLockPadded *WALInsertLocks;
583 } XLogCtlInsert;
584 
585 /*
586  * Total shared-memory state for XLOG.
587  */
588 typedef struct XLogCtlData
589 {
590  XLogCtlInsert Insert;
591 
592  /* Protected by info_lck: */
593  XLogwrtRqst LogwrtRqst;
594  XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */
595  FullTransactionId ckptFullXid; /* nextFullXid of latest checkpoint */
596  XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */
597  XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */
598 
599  XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */
600 
601  /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
602  XLogRecPtr unloggedLSN;
603  slock_t ulsn_lck;
604 
605  /* Time and LSN of last xlog segment switch. Protected by WALWriteLock. */
606  pg_time_t lastSegSwitchTime;
607  XLogRecPtr lastSegSwitchLSN;
608 
609  /*
610  * Protected by info_lck and WALWriteLock (you must hold either lock to
611  * read it, but both to update)
612  */
613  XLogwrtResult LogwrtResult;
614 
615  /*
616  * Latest initialized page in the cache (last byte position + 1).
617  *
618  * To change the identity of a buffer (and InitializedUpTo), you need to
619  * hold WALBufMappingLock. To change the identity of a buffer that's
620  * still dirty, the old page needs to be written out first, and for that
621  * you need WALWriteLock, and you need to ensure that there are no
622  * in-progress insertions to the page by calling
623  * WaitXLogInsertionsToFinish().
624  */
625  XLogRecPtr InitializedUpTo;
626 
627  /*
628  * These values do not change after startup, although the pointed-to pages
629  * and xlblocks values certainly do. xlblocks values are protected by
630  * WALBufMappingLock.
631  */
632  char *pages; /* buffers for unwritten XLOG pages */
633  XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
634  int XLogCacheBlck; /* highest allocated xlog buffer index */
635 
636  /*
637  * Shared copy of ThisTimeLineID. Does not change after end-of-recovery.
638  * If we created a new timeline when the system was started up,
639  * PrevTimeLineID is the old timeline's ID that we forked off from.
640  * Otherwise it's equal to ThisTimeLineID.
641  */
642  TimeLineID ThisTimeLineID;
643  TimeLineID PrevTimeLineID;
644 
645  /*
646  * SharedRecoveryInProgress indicates if we're still in crash or archive
647  * recovery. Protected by info_lck.
648  */
649  bool SharedRecoveryInProgress;
650 
651  /*
652  * SharedHotStandbyActive indicates if we allow hot standby queries to be
653  * run. Protected by info_lck.
654  */
655  bool SharedHotStandbyActive;
656 
657  /*
658  * WalWriterSleeping indicates whether the WAL writer is currently in
659  * low-power mode (and hence should be nudged if an async commit occurs).
660  * Protected by info_lck.
661  */
662  bool WalWriterSleeping;
663 
664  /*
665  * recoveryWakeupLatch is used to wake up the startup process to continue
666  * WAL replay, if it is waiting for WAL to arrive or failover trigger file
667  * to appear.
668  */
669  Latch recoveryWakeupLatch;
670 
671  /*
672  * During recovery, we keep a copy of the latest checkpoint record here.
673  * lastCheckPointRecPtr points to start of checkpoint record and
674  * lastCheckPointEndPtr points to end+1 of checkpoint record. Used by the
675  * checkpointer when it wants to create a restartpoint.
676  *
677  * Protected by info_lck.
678  */
679  XLogRecPtr lastCheckPointRecPtr;
680  XLogRecPtr lastCheckPointEndPtr;
681  CheckPoint lastCheckPoint;
682 
683  /*
684  * lastReplayedEndRecPtr points to end+1 of the last record successfully
685  * replayed. When we're currently replaying a record, ie. in a redo
686  * function, replayEndRecPtr points to the end+1 of the record being
687  * replayed, otherwise it's equal to lastReplayedEndRecPtr.
688  */
689  XLogRecPtr lastReplayedEndRecPtr;
690  TimeLineID lastReplayedTLI;
691  XLogRecPtr replayEndRecPtr;
692  TimeLineID replayEndTLI;
693  /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
694  TimestampTz recoveryLastXTime;
695 
696  /*
697  * timestamp of when we started replaying the current chunk of WAL data,
698  * only relevant for replication or archive recovery
699  */
700  TimestampTz currentChunkStartTime;
701  /* Are we requested to pause recovery? */
702  bool recoveryPause;
703 
704  /*
705  * lastFpwDisableRecPtr points to the start of the last replayed
706  * XLOG_FPW_CHANGE record that instructs full_page_writes is disabled.
707  */
708  XLogRecPtr lastFpwDisableRecPtr;
709 
710  slock_t info_lck; /* locks shared variables shown above */
711 } XLogCtlData;
712 
713 static XLogCtlData *XLogCtl = NULL;
714 
715 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
716 static WALInsertLockPadded *WALInsertLocks = NULL;
717 
718 /*
719  * We maintain an image of pg_control in shared memory.
720  */
721 static ControlFileData *ControlFile = NULL;
722 
723 /*
724  * Calculate the amount of space left on the page after 'endptr'. Beware
725  * multiple evaluation!
726  */
727 #define INSERT_FREESPACE(endptr) \
728  (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
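/*
 * Editor's note: the "beware multiple evaluation" warning above means the
 * macro expands endptr twice, so an argument with side effects misbehaves,
 * e.g. INSERT_FREESPACE(ptr++) would advance ptr twice. Call sites in this
 * file (see CopyXLogRecordToWAL below) pass a plain variable such as CurrPos.
 */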
729 
730 /* Macro to advance to next buffer index. */
731 #define NextBufIdx(idx) \
732  (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
733 
734 /*
735  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
736  * would hold if it was in cache, the page containing 'recptr'.
737  */
738 #define XLogRecPtrToBufIdx(recptr) \
739  (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
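/*
 * Editor's example: with the default XLOG_BLCKSZ of 8192 (0x2000) and eight
 * buffers (XLogCacheBlck = 7), a recptr of 0x18000 lies on WAL page
 * 0x18000 / 0x2000 = 12, which maps to buffer index 12 % 8 = 4.
 */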
740 
741 /*
742  * These are the number of bytes in a WAL page usable for WAL data.
743  */
744 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
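/*
 * Editor's example: with the default 8192-byte XLOG_BLCKSZ and a MAXALIGN'd
 * short page header of 24 bytes (typical 64-bit platform), UsableBytesInPage
 * is 8192 - 24 = 8168.
 */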
745 
746 /* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
747 #define ConvertToXSegs(x, segsize) \
748  ((x) / ((segsize) / (1024 * 1024)))
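/*
 * Editor's example: with the default max_wal_size_mb of 1024 and 16 MB
 * segments, ConvertToXSegs(1024, 16 * 1024 * 1024) = 1024 / 16 = 64 segments.
 */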
749 
750 /* The number of bytes in a WAL segment usable for WAL data. */
751 static int UsableBytesInSegment;
752 
753 /*
754  * Private, possibly out-of-date copy of shared LogwrtResult.
755  * See discussion above.
756  */
757 static XLogwrtResult LogwrtResult = {0, 0};
758 
759 /*
760  * Codes indicating where we got a WAL file from during recovery, or where
761  * to attempt to get one.
762  */
763 typedef enum
764 {
765  XLOG_FROM_ANY = 0, /* request to read WAL from any source */
766  XLOG_FROM_ARCHIVE, /* restored using restore_command */
767  XLOG_FROM_PG_WAL, /* existing file in pg_wal */
768  XLOG_FROM_STREAM /* streamed from master */
769 } XLogSource;
770 
771 /* human-readable names for XLogSources, for debugging output */
772 static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
773 
774 /*
775  * openLogFile is -1 or a kernel FD for an open log file segment.
776  * openLogSegNo identifies the segment. These variables are only used to
777  * write the XLOG, and so will normally refer to the active segment.
778  */
779 static int openLogFile = -1;
780 static XLogSegNo openLogSegNo = 0;
781 
782 /*
783  * These variables are used similarly to the ones above, but for reading
784  * the XLOG. Note, however, that readOff generally represents the offset
785  * of the page just read, not the seek position of the FD itself, which
786  * will be just past that page. readLen indicates how much of the current
787  * page has been read into readBuf, and readSource indicates where we got
788  * the currently open file from.
789  */
790 static int readFile = -1;
791 static XLogSegNo readSegNo = 0;
792 static uint32 readOff = 0;
793 static uint32 readLen = 0;
794 static XLogSource readSource = 0; /* XLOG_FROM_* code */
795 
796 /*
797  * Keeps track of which source we're currently reading from. This is
798  * different from readSource in that this is always set, even when we don't
799  * currently have a WAL file open. If lastSourceFailed is set, our last
800  * attempt to read from currentSource failed, and we should try another source
801  * next.
802  */
803 static XLogSource currentSource = 0; /* XLOG_FROM_* code */
804 static bool lastSourceFailed = false;
805 
806 typedef struct XLogPageReadPrivate
807 {
808  int emode;
809  bool fetching_ckpt; /* are we fetching a checkpoint record? */
810  bool randAccess;
811 } XLogPageReadPrivate;
812 
813 /*
814  * These variables track when we last obtained some WAL data to process,
815  * and where we got it from. (XLogReceiptSource is initially the same as
816  * readSource, but readSource gets reset to zero when we don't have data
817  * to process right now. It is also different from currentSource, which
818  * also changes when we try to read from a source and fail, while
819  * XLogReceiptSource tracks where we last successfully read some WAL.)
820  */
821 static TimestampTz XLogReceiptTime = 0;
822 static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
823 
824 /* State information for XLOG reading */
825 static XLogRecPtr ReadRecPtr; /* start of last record read */
826 static XLogRecPtr EndRecPtr; /* end+1 of last record read */
827 
828 /*
829  * Local copies of equivalent fields in the control file. When running
830  * crash recovery, minRecoveryPoint is set to InvalidXLogRecPtr as we
831  * expect to replay all the WAL available, and updateMinRecoveryPoint is
832  * switched to false to prevent any updates while replaying records.
833  * Those values are kept consistent as long as crash recovery runs.
834  */
835 static XLogRecPtr minRecoveryPoint;
836 static TimeLineID minRecoveryPointTLI;
837 static bool updateMinRecoveryPoint = true;
838 
839 /*
840  * Have we reached a consistent database state? In crash recovery, we have
841  * to replay all the WAL, so reachedConsistency is never set. During archive
842  * recovery, the database is consistent once minRecoveryPoint is reached.
843  */
844 bool reachedConsistency = false;
845 
846 static bool InRedo = false;
847 
848 /* Have we launched bgwriter during recovery? */
849 static bool bgwriterLaunched = false;
850 
851 /* For WALInsertLockAcquire/Release functions */
852 static int MyLockNo = 0;
853 static bool holdingAllLocks = false;
854 
855 #ifdef WAL_DEBUG
856 static MemoryContext walDebugCxt = NULL;
857 #endif
858 
859 static void readRecoverySignalFile(void);
860 static void validateRecoveryParameters(void);
861 static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
862 static bool recoveryStopsBefore(XLogReaderState *record);
863 static bool recoveryStopsAfter(XLogReaderState *record);
864 static void recoveryPausesHere(void);
865 static bool recoveryApplyDelay(XLogReaderState *record);
866 static void SetLatestXTime(TimestampTz xtime);
867 static void SetCurrentChunkStartTime(TimestampTz xtime);
868 static void CheckRequiredParameterValues(void);
869 static void XLogReportParameters(void);
870 static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
871  TimeLineID prevTLI);
872 static void LocalSetXLogInsertAllowed(void);
873 static void CreateEndOfRecoveryRecord(void);
874 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
875 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
877 
878 static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
879 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
880 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
881 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
882  bool find_free, XLogSegNo max_segno,
883  bool use_lock);
884 static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
885  int source, bool notfoundOk);
886 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
887 static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
888  int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
889 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
890  bool fetching_ckpt, XLogRecPtr tliRecPtr);
891 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
892 static void XLogFileClose(void);
893 static void PreallocXlogFiles(XLogRecPtr endptr);
894 static void RemoveTempXlogFiles(void);
895 static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr);
896 static void RemoveXlogFile(const char *segname, XLogRecPtr RedoRecPtr, XLogRecPtr endptr);
897 static void UpdateLastRemovedPtr(char *filename);
898 static void ValidateXLOGDirectoryStructure(void);
899 static void CleanupBackupHistory(void);
900 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
901 static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
902  int emode, bool fetching_ckpt);
903 static void CheckRecoveryConsistency(void);
904 static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
905  XLogRecPtr RecPtr, int whichChkpt, bool report);
906 static bool rescanLatestTimeLine(void);
907 static void WriteControlFile(void);
908 static void ReadControlFile(void);
909 static char *str_time(pg_time_t tnow);
910 static bool CheckForStandbyTrigger(void);
911 
912 #ifdef WAL_DEBUG
913 static void xlog_outrec(StringInfo buf, XLogReaderState *record);
914 #endif
915 static void xlog_outdesc(StringInfo buf, XLogReaderState *record);
916 static void pg_start_backup_callback(int code, Datum arg);
917 static void pg_stop_backup_callback(int code, Datum arg);
918 static bool read_backup_label(XLogRecPtr *checkPointLoc,
919  bool *backupEndRequired, bool *backupFromStandby);
920 static bool read_tablespace_map(List **tablespaces);
921 
922 static void rm_redo_error_callback(void *arg);
923 static int get_sync_bit(int method);
924 
925 static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
926  XLogRecData *rdata,
927  XLogRecPtr StartPos, XLogRecPtr EndPos);
928 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
929  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
930 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
931  XLogRecPtr *PrevPtr);
933 static char *GetXLogBuffer(XLogRecPtr ptr);
934 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
935 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
936 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
937 static void checkXLogConsistency(XLogReaderState *record);
938 
939 static void WALInsertLockAcquire(void);
940 static void WALInsertLockAcquireExclusive(void);
941 static void WALInsertLockRelease(void);
942 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
943 
944 /*
945  * Insert an XLOG record represented by an already-constructed chain of data
946  * chunks. This is a low-level routine; to construct the WAL record header
947  * and data, use the higher-level routines in xloginsert.c.
948  *
949  * If 'fpw_lsn' is valid, it is the oldest LSN among the pages that this
950  * WAL record applies to, that were not included in the record as full page
951  * images. If fpw_lsn <= RedoRecPtr, the function does not perform the
952  * insertion and returns InvalidXLogRecPtr. The caller can then recalculate
953  * which pages need a full-page image, and retry. If fpw_lsn is invalid, the
954  * record is always inserted.
955  *
956  * 'flags' gives more in-depth control on the record being inserted. See
957  * XLogSetRecordFlags() for details.
958  *
959  * The first XLogRecData in the chain must be for the record header, and its
960  * data must be MAXALIGNed. XLogInsertRecord fills in the xl_prev and
961  * xl_crc fields in the header, the rest of the header must already be filled
962  * by the caller.
963  *
964  * Returns XLOG pointer to end of record (beginning of next record).
965  * This can be used as LSN for data pages affected by the logged action.
966  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
967  * before the data page can be written out. This implements the basic
968  * WAL rule "write the log before the data".)
969  */
970 XLogRecPtr
971 XLogInsertRecord(XLogRecData *rdata,
972  XLogRecPtr fpw_lsn,
973  uint8 flags)
974 {
975  XLogCtlInsert *Insert = &XLogCtl->Insert;
976  pg_crc32c rdata_crc;
977  bool inserted;
978  XLogRecord *rechdr = (XLogRecord *) rdata->data;
979  uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
980  bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
981  info == XLOG_SWITCH);
982  XLogRecPtr StartPos;
983  XLogRecPtr EndPos;
984  bool prevDoPageWrites = doPageWrites;
985 
986  /* we assume that all of the record header is in the first chunk */
987  Assert(rdata->len >= SizeOfXLogRecord);
988 
989  /* cross-check on whether we should be here or not */
990  if (!XLogInsertAllowed())
991  elog(ERROR, "cannot make new WAL entries during recovery");
992 
993  /*----------
994  *
995  * We have now done all the preparatory work we can without holding a
996  * lock or modifying shared state. From here on, inserting the new WAL
997  * record to the shared WAL buffer cache is a two-step process:
998  *
999  * 1. Reserve the right amount of space from the WAL. The current head of
1000  * reserved space is kept in Insert->CurrBytePos, and is protected by
1001  * insertpos_lck.
1002  *
1003  * 2. Copy the record to the reserved WAL space. This involves finding the
1004  * correct WAL buffer containing the reserved space, and copying the
1005  * record in place. This can be done concurrently in multiple processes.
1006  *
1007  * To keep track of which insertions are still in-progress, each concurrent
1008  * inserter acquires an insertion lock. In addition to just indicating that
1009  * an insertion is in progress, the lock tells others how far the inserter
1010  * has progressed. There is a small fixed number of insertion locks,
1011  * determined by NUM_XLOGINSERT_LOCKS. When an inserter crosses a page
1012  * boundary, it updates the value stored in the lock to how far it has
1013  * inserted, to allow the previous buffer to be flushed.
1014  *
1015  * Holding onto an insertion lock also protects RedoRecPtr and
1016  * fullPageWrites from changing until the insertion is finished.
1017  *
1018  * Step 2 can usually be done completely in parallel. If the required WAL
1019  * page is not initialized yet, you have to grab WALBufMappingLock to
1020  * initialize it, but the WAL writer tries to do that ahead of insertions
1021  * to avoid that from happening in the critical path.
1022  *
1023  *----------
1024  */
1025  START_CRIT_SECTION();
1026  if (isLogSwitch)
1027  WALInsertLockAcquireExclusive();
1028  else
1029  WALInsertLockAcquire();
1030 
1031  /*
1032  * Check to see if my copy of RedoRecPtr is out of date. If so, may have
1033  * to go back and have the caller recompute everything. This can only
1034  * happen just after a checkpoint, so it's better to be slow in this case
1035  * and fast otherwise.
1036  *
1037  * Also check to see if fullPageWrites or forcePageWrites was just turned
1038  * on; if we weren't already doing full-page writes then go back and
1039  * recompute.
1040  *
1041  * If we aren't doing full-page writes then RedoRecPtr doesn't actually
1042  * affect the contents of the XLOG record, so we'll update our local copy
1043  * but not force a recomputation. (If doPageWrites was just turned off,
1044  * we could recompute the record without full pages, but we choose not to
1045  * bother.)
1046  */
1047  if (RedoRecPtr != Insert->RedoRecPtr)
1048  {
1049  Assert(RedoRecPtr < Insert->RedoRecPtr);
1050  RedoRecPtr = Insert->RedoRecPtr;
1051  }
1052  doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
1053 
1054  if (doPageWrites &&
1055  (!prevDoPageWrites ||
1056  (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
1057  {
1058  /*
1059  * Oops, some buffer now needs to be backed up that the caller didn't
1060  * back up. Start over.
1061  */
1062  WALInsertLockRelease();
1063  END_CRIT_SECTION();
1064  return InvalidXLogRecPtr;
1065  }
1066 
1067  /*
1068  * Reserve space for the record in the WAL. This also sets the xl_prev
1069  * pointer.
1070  */
1071  if (isLogSwitch)
1072  inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
1073  else
1074  {
1075  ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
1076  &rechdr->xl_prev);
1077  inserted = true;
1078  }
1079 
1080  if (inserted)
1081  {
1082  /*
1083  * Now that xl_prev has been filled in, calculate CRC of the record
1084  * header.
1085  */
1086  rdata_crc = rechdr->xl_crc;
1087  COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
1088  FIN_CRC32C(rdata_crc);
1089  rechdr->xl_crc = rdata_crc;
1090 
1091  /*
1092  * All the record data, including the header, is now ready to be
1093  * inserted. Copy the record in the space reserved.
1094  */
1095  CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
1096  StartPos, EndPos);
1097 
1098  /*
1099  * Unless record is flagged as not important, update LSN of last
1100  * important record in the current slot. When holding all locks, just
1101  * update the first one.
1102  */
1103  if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
1104  {
1105  int lockno = holdingAllLocks ? 0 : MyLockNo;
1106 
1107  WALInsertLocks[lockno].l.lastImportantAt = StartPos;
1108  }
1109  }
1110  else
1111  {
1112  /*
1113  * This was an xlog-switch record, but the current insert location was
1114  * already exactly at the beginning of a segment, so there was no need
1115  * to do anything.
1116  */
1117  }
1118 
1119  /*
1120  * Done! Let others know that we're finished.
1121  */
1122  WALInsertLockRelease();
1123 
1124  MarkCurrentTransactionIdLoggedIfAny();
1125 
1126  END_CRIT_SECTION();
1127 
1128  /*
1129  * Update shared LogwrtRqst.Write, if we crossed page boundary.
1130  */
1131  if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
1132  {
1133  SpinLockAcquire(&XLogCtl->info_lck);
1134  /* advance global request to include new block(s) */
1135  if (XLogCtl->LogwrtRqst.Write < EndPos)
1136  XLogCtl->LogwrtRqst.Write = EndPos;
1137  /* update local result copy while I have the chance */
1138  LogwrtResult = XLogCtl->LogwrtResult;
1139  SpinLockRelease(&XLogCtl->info_lck);
1140  }
1141 
1142  /*
1143  * If this was an XLOG_SWITCH record, flush the record and the empty
1144  * padding space that fills the rest of the segment, and perform
1145  * end-of-segment actions (eg, notifying archiver).
1146  */
1147  if (isLogSwitch)
1148  {
1149  TRACE_POSTGRESQL_WAL_SWITCH();
1150  XLogFlush(EndPos);
1151 
1152  /*
1153  * Even though we reserved the rest of the segment for us, which is
1154  * reflected in EndPos, we return a pointer to just the end of the
1155  * xlog-switch record.
1156  */
1157  if (inserted)
1158  {
1159  EndPos = StartPos + SizeOfXLogRecord;
1160  if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
1161  {
1162  uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size);
1163 
1164  if (offset == EndPos % XLOG_BLCKSZ)
1165  EndPos += SizeOfXLogLongPHD;
1166  else
1167  EndPos += SizeOfXLogShortPHD;
1168  }
1169  }
1170  }
1171 
1172 #ifdef WAL_DEBUG
1173  if (XLOG_DEBUG)
1174  {
1175  static XLogReaderState *debug_reader = NULL;
1176  StringInfoData buf;
1177  StringInfoData recordBuf;
1178  char *errormsg = NULL;
1179  MemoryContext oldCxt;
1180 
1181  oldCxt = MemoryContextSwitchTo(walDebugCxt);
1182 
1183  initStringInfo(&buf);
1184  appendStringInfo(&buf, "INSERT @ %X/%X: ",
1185  (uint32) (EndPos >> 32), (uint32) EndPos);
1186 
1187  /*
1188  * We have to piece together the WAL record data from the XLogRecData
1189  * entries, so that we can pass it to the rm_desc function as one
1190  * contiguous chunk.
1191  */
1192  initStringInfo(&recordBuf);
1193  for (; rdata != NULL; rdata = rdata->next)
1194  appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
1195 
1196  if (!debug_reader)
1197  debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
1198  NULL, NULL);
1199 
1200  if (!debug_reader)
1201  {
1202  appendStringInfoString(&buf, "error decoding record: out of memory");
1203  }
1204  else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
1205  &errormsg))
1206  {
1207  appendStringInfo(&buf, "error decoding record: %s",
1208  errormsg ? errormsg : "no error message");
1209  }
1210  else
1211  {
1212  appendStringInfoString(&buf, " - ");
1213  xlog_outdesc(&buf, debug_reader);
1214  }
1215  elog(LOG, "%s", buf.data);
1216 
1217  pfree(buf.data);
1218  pfree(recordBuf.data);
1219  MemoryContextSwitchTo(oldCxt);
1220  }
1221 #endif
1222 
1223  /*
1224  * Update our global variables
1225  */
1226  ProcLastRecPtr = StartPos;
1227  XactLastRecEnd = EndPos;
1228 
1229  return EndPos;
1230 }
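/*
 * Editor's note: a hedged usage sketch (not part of xlog.c). Callers do not
 * normally invoke XLogInsertRecord() directly; they assemble the record with
 * the xloginsert.c API, which ends up here. Here xlrec stands for some
 * rmgr-specific struct:
 *
 *    XLogBeginInsert();
 *    XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *    recptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP);
 *
 * The returned recptr can then be stamped on the data pages the record
 * describes, per the "write the log before the data" rule noted above.
 */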
1231 
1232 /*
1233  * Reserves the right amount of space for a record of given size from the WAL.
1234  * *StartPos is set to the beginning of the reserved section, *EndPos to
1235  * its end+1. *PrevPtr is set to the beginning of the previous record; it is
1236  * used to set the xl_prev of this record.
1237  *
1238  * This is the performance critical part of XLogInsert that must be serialized
1239  * across backends. The rest can happen mostly in parallel. Try to keep this
1240  * section as short as possible, insertpos_lck can be heavily contended on a
1241  * busy system.
1242  *
1243  * NB: The space calculation here must match the code in CopyXLogRecordToWAL,
1244  * where we actually copy the record to the reserved space.
1245  */
1246 static void
1247 ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
1248  XLogRecPtr *PrevPtr)
1249 {
1250  XLogCtlInsert *Insert = &XLogCtl->Insert;
1251  uint64 startbytepos;
1252  uint64 endbytepos;
1253  uint64 prevbytepos;
1254 
1255  size = MAXALIGN(size);
1256 
1257  /* All (non xlog-switch) records should contain data. */
1258  Assert(size > SizeOfXLogRecord);
1259 
1260  /*
1261  * The duration the spinlock needs to be held is minimized by minimizing
1262  * the calculations that have to be done while holding the lock. The
1263  * current tip of reserved WAL is kept in CurrBytePos, as a byte position
1264  * that only counts "usable" bytes in WAL, that is, it excludes all WAL
1265  * page headers. The mapping between "usable" byte positions and physical
1266  * positions (XLogRecPtrs) can be done outside the locked region, and
1267  * because the usable byte position doesn't include any headers, reserving
1268  * X bytes from WAL is almost as simple as "CurrBytePos += X".
1269  */
1270  SpinLockAcquire(&Insert->insertpos_lck);
1271 
1272  startbytepos = Insert->CurrBytePos;
1273  endbytepos = startbytepos + size;
1274  prevbytepos = Insert->PrevBytePos;
1275  Insert->CurrBytePos = endbytepos;
1276  Insert->PrevBytePos = startbytepos;
1277 
1278  SpinLockRelease(&Insert->insertpos_lck);
1279 
1280  *StartPos = XLogBytePosToRecPtr(startbytepos);
1281  *EndPos = XLogBytePosToEndRecPtr(endbytepos);
1282  *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
1283 
1284  /*
1285  * Check that the conversions between "usable byte positions" and
1286  * XLogRecPtrs work consistently in both directions.
1287  */
1288  Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
1289  Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
1290  Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
1291 }
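/*
 * Editor's note: a minimal sketch (not part of xlog.c) of the "usable byte
 * position" mapping used above, assuming every WAL page carried only a short
 * header; the real XLogBytePosToRecPtr() additionally accounts for the long
 * header on each segment's first page.
 */
static XLogRecPtr
sketch_bytepos_to_recptr(uint64 bytepos)
{
	uint64 fullpages = bytepos / UsableBytesInPage;
	uint64 offset = bytepos % UsableBytesInPage;

	/* every page contributes SizeOfXLogShortPHD bytes of header overhead */
	return fullpages * XLOG_BLCKSZ + SizeOfXLogShortPHD + offset;
}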
1292 
1293 /*
1294  * Like ReserveXLogInsertLocation(), but for an xlog-switch record.
1295  *
1296  * A log-switch record is handled slightly differently. The rest of the
1297  * segment will be reserved for this insertion, as indicated by the returned
1298  * *EndPos value. However, if we are already at the beginning of the current
1299  * segment, *StartPos and *EndPos are set to the current location without
1300  * reserving any space, and the function returns false.
1301 */
1302 static bool
1303 ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
1304 {
1305  XLogCtlInsert *Insert = &XLogCtl->Insert;
1306  uint64 startbytepos;
1307  uint64 endbytepos;
1308  uint64 prevbytepos;
1309  size_t size = MAXALIGN(SizeOfXLogRecord);
1310  XLogRecPtr ptr;
1311  uint32 segleft;
1312 
1313  /*
1314  * These calculations are a bit heavy-weight to be done while holding a
1315  * spinlock, but since we're holding all the WAL insertion locks, there
1316  * are no other inserters competing for it. GetXLogInsertRecPtr() does
1317  * compete for it, but that's not called very frequently.
1318  */
1319  SpinLockAcquire(&Insert->insertpos_lck);
1320 
1321  startbytepos = Insert->CurrBytePos;
1322 
1323  ptr = XLogBytePosToEndRecPtr(startbytepos);
1324  if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
1325  {
1326  SpinLockRelease(&Insert->insertpos_lck);
1327  *EndPos = *StartPos = ptr;
1328  return false;
1329  }
1330 
1331  endbytepos = startbytepos + size;
1332  prevbytepos = Insert->PrevBytePos;
1333 
1334  *StartPos = XLogBytePosToRecPtr(startbytepos);
1335  *EndPos = XLogBytePosToEndRecPtr(endbytepos);
1336 
1337  segleft = wal_segment_size - XLogSegmentOffset(*EndPos, wal_segment_size);
1338  if (segleft != wal_segment_size)
1339  {
1340  /* consume the rest of the segment */
1341  *EndPos += segleft;
1342  endbytepos = XLogRecPtrToBytePos(*EndPos);
1343  }
1344  Insert->CurrBytePos = endbytepos;
1345  Insert->PrevBytePos = startbytepos;
1346 
1347  SpinLockRelease(&Insert->insertpos_lck);
1348 
1349  *PrevPtr = XLogBytePosToRecPtr(prevbytepos);
1350 
1351  Assert(XLogSegmentOffset(*EndPos, wal_segment_size) == 0);
1352  Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
1353  Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
1354  Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
1355 
1356  return true;
1357 }
1358 
1359 /*
1360  * Checks whether the current buffer page and backup page stored in the
1361  * WAL record are consistent or not. Before comparing the two pages, a
1362  * masking can be applied to the pages to ignore certain areas like hint bits,
1363  * unused space between pd_lower and pd_upper among other things. This
1364  * function should be called once WAL replay has been completed for a
1365  * given record.
1366  */
1367 static void
1368 checkXLogConsistency(XLogReaderState *record)
1369 {
1370  RmgrId rmid = XLogRecGetRmid(record);
1371  RelFileNode rnode;
1372  ForkNumber forknum;
1373  BlockNumber blkno;
1374  int block_id;
1375 
1376  /* Records with no backup blocks have no need for consistency checks. */
1377  if (!XLogRecHasAnyBlockRefs(record))
1378  return;
1379 
1380  Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
1381 
1382  for (block_id = 0; block_id <= record->max_block_id; block_id++)
1383  {
1384  Buffer buf;
1385  Page page;
1386 
1387  if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
1388  {
1389  /*
1390  * WAL record doesn't contain a block reference with the given id.
1391  * Do nothing.
1392  */
1393  continue;
1394  }
1395 
1396  Assert(XLogRecHasBlockImage(record, block_id));
1397 
1398  if (XLogRecBlockImageApply(record, block_id))
1399  {
1400  /*
1401  * WAL record has already applied the page, so bypass the
1402  * consistency check as that would result in comparing the full
1403  * page stored in the record with itself.
1404  */
1405  continue;
1406  }
1407 
1408  /*
1409  * Read the contents from the current buffer and store it in a
1410  * temporary page.
1411  */
1412  buf = XLogReadBufferExtended(rnode, forknum, blkno,
1413  RBM_NORMAL_NO_LOG);
1414  if (!BufferIsValid(buf))
1415  continue;
1416 
1417  LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
1418  page = BufferGetPage(buf);
1419 
1420  /*
1421  * Take a copy of the local page where WAL has been applied to have a
1422  * comparison base before masking it...
1423  */
1424  memcpy(replay_image_masked, page, BLCKSZ);
1425 
1426  /* No need for this page anymore now that a copy is in. */
1427  UnlockReleaseBuffer(buf);
1428 
1429  /*
1430  * If the block LSN is already ahead of this WAL record, we can't
1431  * expect contents to match. This can happen if recovery is
1432  * restarted.
1433  */
1434  if (PageGetLSN(replay_image_masked) > record->EndRecPtr)
1435  continue;
1436 
1437  /*
1438  * Read the contents from the backup copy, stored in WAL record and
1439  * store it in a temporary page. There is no need to allocate a new
1440  * page here, a local buffer is fine to hold its contents and a mask
1441  * can be directly applied on it.
1442  */
1443  if (!RestoreBlockImage(record, block_id, master_image_masked))
1444  elog(ERROR, "failed to restore block image");
1445 
1446  /*
1447  * If masking function is defined, mask both the master and replay
1448  * images
1449  */
1450  if (RmgrTable[rmid].rm_mask != NULL)
1451  {
1452  RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
1453  RmgrTable[rmid].rm_mask(master_image_masked, blkno);
1454  }
1455 
1456  /* Time to compare the master and replay images. */
1457  if (memcmp(replay_image_masked, master_image_masked, BLCKSZ) != 0)
1458  {
1459  elog(FATAL,
1460  "inconsistent page found, rel %u/%u/%u, forknum %u, blkno %u",
1461  rnode.spcNode, rnode.dbNode, rnode.relNode,
1462  forknum, blkno);
1463  }
1464  }
1465 }
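/*
 * Editor's note: this check only fires for records carrying
 * XLR_CHECK_CONSISTENCY, which the generating server sets for the rmgrs
 * listed in its wal_consistency_checking GUC, e.g.
 * wal_consistency_checking = 'heap,btree' (or 'all') in postgresql.conf.
 */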
1466 
1467 /*
1468  * Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved
1469  * area in the WAL.
1470  */
1471 static void
1472 CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
1473  XLogRecPtr StartPos, XLogRecPtr EndPos)
1474 {
1475  char *currpos;
1476  int freespace;
1477  int written;
1478  XLogRecPtr CurrPos;
1479  XLogPageHeader pagehdr;
1480 
1481  /*
1482  * Get a pointer to the right place in the right WAL buffer to start
1483  * inserting to.
1484  */
1485  CurrPos = StartPos;
1486  currpos = GetXLogBuffer(CurrPos);
1487  freespace = INSERT_FREESPACE(CurrPos);
1488 
1489  /*
1490  * there should be enough space for at least the first field (xl_tot_len)
1491  * on this page.
1492  */
1493  Assert(freespace >= sizeof(uint32));
1494 
1495  /* Copy record data */
1496  written = 0;
1497  while (rdata != NULL)
1498  {
1499  char *rdata_data = rdata->data;
1500  int rdata_len = rdata->len;
1501 
1502  while (rdata_len > freespace)
1503  {
1504  /*
1505  * Write what fits on this page, and continue on the next page.
1506  */
1507  Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
1508  memcpy(currpos, rdata_data, freespace);
1509  rdata_data += freespace;
1510  rdata_len -= freespace;
1511  written += freespace;
1512  CurrPos += freespace;
1513 
1514  /*
1515  * Get pointer to beginning of next page, and set the xlp_rem_len
1516  * in the page header. Set XLP_FIRST_IS_CONTRECORD.
1517  *
1518  * It's safe to set the contrecord flag and xlp_rem_len without a
1519  * lock on the page. All the other flags were already set when the
1520  * page was initialized, in AdvanceXLInsertBuffer, and we're the
1521  * only backend that needs to set the contrecord flag.
1522  */
1523  currpos = GetXLogBuffer(CurrPos);
1524  pagehdr = (XLogPageHeader) currpos;
1525  pagehdr->xlp_rem_len = write_len - written;
1526  pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
1527 
1528  /* skip over the page header */
1529  if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
1530  {
1531  CurrPos += SizeOfXLogLongPHD;
1532  currpos += SizeOfXLogLongPHD;
1533  }
1534  else
1535  {
1536  CurrPos += SizeOfXLogShortPHD;
1537  currpos += SizeOfXLogShortPHD;
1538  }
1539  freespace = INSERT_FREESPACE(CurrPos);
1540  }
1541 
1542  Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
1543  memcpy(currpos, rdata_data, rdata_len);
1544  currpos += rdata_len;
1545  CurrPos += rdata_len;
1546  freespace -= rdata_len;
1547  written += rdata_len;
1548 
1549  rdata = rdata->next;
1550  }
1551  Assert(written == write_len);
1552 
1553  /*
1554  * If this was an xlog-switch, it's not enough to write the switch record,
1555  * we also have to consume all the remaining space in the WAL segment. We
1556  * have already reserved that space, but we need to actually fill it.
1557  */
1558  if (isLogSwitch && XLogSegmentOffset(CurrPos, wal_segment_size) != 0)
1559  {
1560  /* An xlog-switch record doesn't contain any data besides the header */
1561  Assert(write_len == SizeOfXLogRecord);
1562 
1563  /* Assert that we did reserve the right amount of space */
1564  Assert(XLogSegmentOffset(EndPos, wal_segment_size) == 0);
1565 
1566  /* Use up all the remaining space on the current page */
1567  CurrPos += freespace;
1568 
1569  /*
1570  * Cause all remaining pages in the segment to be flushed, leaving the
1571  * XLog position where it should be, at the start of the next segment.
1572  * We do this one page at a time, to make sure we don't deadlock
1573  * against ourselves if wal_buffers < wal_segment_size.
1574  */
1575  while (CurrPos < EndPos)
1576  {
1577  /*
1578  * The minimal action to flush the page would be to call
1579  * WALInsertLockUpdateInsertingAt(CurrPos) followed by
1580  * AdvanceXLInsertBuffer(...). The page would be left initialized
1581  * mostly to zeros, except for the page header (always the short
1582  * variant, as this is never a segment's first page).
1583  *
1584  * The large vistas of zeros are good for compressibility, but the
1585  * headers interrupting them every XLOG_BLCKSZ (with values that
1586  * differ from page to page) are not. The effect varies with
1587  * compression tool, but bzip2 for instance compresses about an
1588  * order of magnitude worse if those headers are left in place.
1589  *
1590  * Rather than complicating AdvanceXLInsertBuffer itself (which is
1591  * called in heavily-loaded circumstances as well as this lightly-
1592  * loaded one) with variant behavior, we just use GetXLogBuffer
1593  * (which itself calls the two methods we need) to get the pointer
1594  * and zero most of the page. Then we just zero the page header.
1595  */
1596  currpos = GetXLogBuffer(CurrPos);
1597  MemSet(currpos, 0, SizeOfXLogShortPHD);
1598 
1599  CurrPos += XLOG_BLCKSZ;
1600  }
1601  }
1602  else
1603  {
1604  /* Align the end position, so that the next record starts aligned */
1605  CurrPos = MAXALIGN64(CurrPos);
1606  }
1607 
1608  if (CurrPos != EndPos)
1609  elog(PANIC, "space reserved for WAL record does not match what was written");
1610 }
1611 
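/*
 * Worked example for the copy loop above, assuming the default 8192-byte
 * XLOG_BLCKSZ and the usual 24-byte short page header (MAXALIGN 8): a
 * record with write_len = 10000 that starts at page offset 4096 sees
 * freespace = 4096.  The inner loop copies those 4096 bytes, stamps the
 * next page's header with xlp_rem_len = 10000 - 4096 = 5904 plus
 * XLP_FIRST_IS_CONTRECORD, skips the 24-byte header, and copies the
 * remaining 5904 bytes into that page's 8168 usable bytes.
 */
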
1612 /*
1613  * Acquire a WAL insertion lock, for inserting to WAL.
1614  */
1615 static void
1616 WALInsertLockAcquire(void)
1617 {
1618  bool immed;
1619 
1620  /*
1621  * It doesn't matter which of the WAL insertion locks we acquire, so try
1622  * the one we used last time. If the system isn't particularly busy, it's
1623  * a good bet that it's still available, and it's good to have some
1624  * affinity to a particular lock so that you don't unnecessarily bounce
1625  * cache lines between processes when there's no contention.
1626  *
1627  * If this is the first time through in this backend, pick a lock
1628  * (semi-)randomly. This allows the locks to be used evenly if you have a
1629  * lot of very short connections.
1630  */
1631  static int lockToTry = -1;
1632 
1633  if (lockToTry == -1)
1634  lockToTry = MyProc->pgprocno % NUM_XLOGINSERT_LOCKS;
1635  MyLockNo = lockToTry;
1636 
1637  /*
1638  * The insertingAt value is initially set to 0, as we don't know our
1639  * insert location yet.
1640  */
1641  immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
1642  if (!immed)
1643  {
1644  /*
1645  * If we couldn't get the lock immediately, try another lock next
1646  * time. On a system with more insertion locks than concurrent
1647  * inserters, this causes all the inserters to eventually migrate to a
1648  * lock that no-one else is using. On a system with more inserters
1649  * than locks, it still helps to distribute the inserters evenly
1650  * across the locks.
1651  */
1652  lockToTry = (lockToTry + 1) % NUM_XLOGINSERT_LOCKS;
1653  }
1654 }
1655 
1656 /*
1657  * Acquire all WAL insertion locks, to prevent other backends from inserting
1658  * to WAL.
1659  */
1660 static void
1661 WALInsertLockAcquireExclusive(void)
1662 {
1663  int i;
1664 
1665  /*
1666  * When holding all the locks, all but the last lock's insertingAt
1667  * indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
1668  * XLogRecPtr value, to make sure that no-one blocks waiting on those.
1669  */
1670  for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)
1671  {
1672  LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
1673  LWLockUpdateVar(&WALInsertLocks[i].l.lock,
1674  &WALInsertLocks[i].l.insertingAt,
1675  PG_UINT64_MAX);
1676  }
1677  /* Variable value reset to 0 at release */
1678  LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
1679 
1680  holdingAllLocks = true;
1681 }
1682 
1683 /*
1684  * Release our insertion lock (or locks, if we're holding them all).
1685  *
1686  * NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
1687  * next time the lock is acquired.
1688  */
1689 static void
1690 WALInsertLockRelease(void)
1691 {
1692  if (holdingAllLocks)
1693  {
1694  int i;
1695 
1696  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
1697  LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
1698  &WALInsertLocks[i].l.insertingAt,
1699  0);
1700 
1701  holdingAllLocks = false;
1702  }
1703  else
1704  {
1705  LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
1706  &WALInsertLocks[MyLockNo].l.insertingAt,
1707  0);
1708  }
1709 }
1710 
1711 /*
1712  * Update our insertingAt value, to let others know that we've finished
1713  * inserting up to that point.
1714  */
1715 static void
1716 WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt)
1717 {
1718  if (holdingAllLocks)
1719  {
1720  /*
1721  * We use the last lock to mark our actual position, see comments in
1722  * WALInsertLockAcquireExclusive.
1723  */
1724  LWLockUpdateVar(&WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.lock,
1725  &WALInsertLocks[NUM_XLOGINSERT_LOCKS - 1].l.insertingAt,
1726  insertingAt);
1727  }
1728  else
1729  LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock,
1730  &WALInsertLocks[MyLockNo].l.insertingAt,
1731  insertingAt);
1732 }
1733 
1734 /*
1735  * Wait for any WAL insertions < upto to finish.
1736  *
1737  * Returns the location of the oldest insertion that is still in-progress.
1738  * Any WAL prior to that point has been fully copied into WAL buffers, and
1739  * can be flushed out to disk. Because this waits for any insertions older
1740  * than 'upto' to finish, the return value is always >= 'upto'.
1741  *
1742  * Note: When you are about to write out WAL, you must call this function
1743  * *before* acquiring WALWriteLock, to avoid deadlocks. This function might
1744  * need to wait for an insertion to finish (or at least advance to next
1745  * uninitialized page), and the inserter might need to evict an old WAL buffer
1746  * to make room for a new one, which in turn requires WALWriteLock.
1747  */
1748 static XLogRecPtr
1749 WaitXLogInsertionsToFinish(XLogRecPtr upto)
1750 {
1751  uint64 bytepos;
1752  XLogRecPtr reservedUpto;
1753  XLogRecPtr finishedUpto;
1754  XLogCtlInsert *Insert = &XLogCtl->Insert;
1755  int i;
1756 
1757  if (MyProc == NULL)
1758  elog(PANIC, "cannot wait without a PGPROC structure");
1759 
1760  /* Read the current insert position */
1761  SpinLockAcquire(&Insert->insertpos_lck);
1762  bytepos = Insert->CurrBytePos;
1763  SpinLockRelease(&Insert->insertpos_lck);
1764  reservedUpto = XLogBytePosToEndRecPtr(bytepos);
1765 
1766  /*
1767  * No-one should request to flush a piece of WAL that hasn't even been
1768  * reserved yet. However, it can happen if there is a block with a bogus
1769  * LSN on disk, for example. XLogFlush checks for that situation and
1770  * complains, but only after the flush. Here we just assume that to mean
1771  * that all WAL that has been reserved needs to be finished. In this
1772  * corner-case, the return value can be smaller than 'upto' argument.
1773  */
1774  if (upto > reservedUpto)
1775  {
1776  elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X",
1777  (uint32) (upto >> 32), (uint32) upto,
1778  (uint32) (reservedUpto >> 32), (uint32) reservedUpto);
1779  upto = reservedUpto;
1780  }
1781 
1782  /*
1783  * Loop through all the locks, sleeping on any in-progress insert older
1784  * than 'upto'.
1785  *
1786  * finishedUpto is our return value, indicating the point upto which all
1787  * the WAL insertions have been finished. Initialize it to the head of
1788  * reserved WAL, and as we iterate through the insertion locks, back it
1789  * out for any insertion that's still in progress.
1790  */
1791  finishedUpto = reservedUpto;
1792  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
1793  {
1794  XLogRecPtr insertingat = InvalidXLogRecPtr;
1795 
1796  do
1797  {
1798  /*
1799  * See if this insertion is in progress. LWLockWaitForVar will
1800  * wait for the lock to be released, or for the 'value' to be set
1801  * by a LWLockUpdateVar call. When a lock is initially acquired,
1802  * its value is 0 (InvalidXLogRecPtr), which means that we don't
1803  * know where it's inserting yet. We will have to wait for it. If
1804  * it's a small insertion, the record will most likely fit on the
1805  * same page and the inserter will release the lock without ever
1806  * calling LWLockUpdateVar. But if it has to sleep, it will
1807  * advertise the insertion point with LWLockUpdateVar before
1808  * sleeping.
1809  */
1810  if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
1811  &WALInsertLocks[i].l.insertingAt,
1812  insertingat, &insertingat))
1813  {
1814  /* the lock was free, so no insertion in progress */
1815  insertingat = InvalidXLogRecPtr;
1816  break;
1817  }
1818 
1819  /*
1820  * This insertion is still in progress. Have to wait, unless the
1821  * inserter has proceeded past 'upto'.
1822  */
1823  } while (insertingat < upto);
1824 
1825  if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
1826  finishedUpto = insertingat;
1827  }
1828  return finishedUpto;
1829 }
1830 
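/*
 * Illustration of the wait loop above: suppose reservedUpto is 0/2000,
 * 'upto' is 0/1200, one inserter advertises insertingAt = 0/1500 and
 * another 0/1000.  The first is already past 'upto', so we don't sleep
 * on it, but it caps finishedUpto at 0/1500; the second makes us block
 * in LWLockWaitForVar until it either advances past 0/1200 or releases
 * its lock.  If it simply releases, the function returns 0/1500.
 */
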
1831 /*
1832  * Get a pointer to the right location in the WAL buffer containing the
1833  * given XLogRecPtr.
1834  *
1835  * If the page is not initialized yet, it is initialized. That might require
1836  * evicting an old dirty buffer from the buffer cache, which means I/O.
1837  *
1838  * The caller must ensure that the page containing the requested location
1839  * isn't evicted yet, and won't be evicted. The way to ensure that is to
1840  * hold onto a WAL insertion lock with the insertingAt position set to
1841  * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
1842  * to evict an old page from the buffer. (This means that once you call
1843  * GetXLogBuffer() with a given 'ptr', you must not access anything before
1844  * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
1845  * later, because older buffers might be recycled already)
1846  */
1847 static char *
1848 GetXLogBuffer(XLogRecPtr ptr)
1849 {
1850  int idx;
1851  XLogRecPtr endptr;
1852  static uint64 cachedPage = 0;
1853  static char *cachedPos = NULL;
1854  XLogRecPtr expectedEndPtr;
1855 
1856  /*
1857  * Fast path for the common case where we need to access the same
1858  * page as last time.
1859  */
1860  if (ptr / XLOG_BLCKSZ == cachedPage)
1861  {
1862  Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
1863  Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
1864  return cachedPos + ptr % XLOG_BLCKSZ;
1865  }
1866 
1867  /*
1868  * The XLog buffer cache is organized so that a page is always loaded to a
1869  * particular buffer. That way we can easily calculate the buffer a given
1870  * page must be loaded into, from the XLogRecPtr alone.
1871  */
1872  idx = XLogRecPtrToBufIdx(ptr);
1873 
1874  /*
1875  * See what page is loaded in the buffer at the moment. It could be the
1876  * page we're looking for, or something older. It can't be anything newer
1877  * - that would imply the page we're looking for has already been written
1878  * out to disk and evicted, and the caller is responsible for making sure
1879  * that doesn't happen.
1880  *
1881  * However, we don't hold a lock while we read the value. If someone has
1882  * just initialized the page, it's possible that we get a "torn read" of
1883  * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In
1884  * that case we will see a bogus value. That's ok, we'll grab the mapping
1885  * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than
1886  * the page we're looking for. But it means that when we do this unlocked
1887  * read, we might see a value that appears to be ahead of the page we're
1888  * looking for. Don't PANIC on that, until we've verified the value while
1889  * holding the lock.
1890  */
1891  expectedEndPtr = ptr;
1892  expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
1893 
1894  endptr = XLogCtl->xlblocks[idx];
1895  if (expectedEndPtr != endptr)
1896  {
1897  XLogRecPtr initializedUpto;
1898 
1899  /*
1900  * Before calling AdvanceXLInsertBuffer(), which can block, let others
1901  * know how far we're finished with inserting the record.
1902  *
1903  * NB: If 'ptr' points to just after the page header, advertise a
1904  * position at the beginning of the page rather than 'ptr' itself. If
1905  * there are no other insertions running, someone might try to flush
1906  * up to our advertised location. If we advertised a position after
1907  * the page header, someone might try to flush the page header, even
1908  * though the page might not actually be initialized yet. As the first
1909  * inserter on the page, we are effectively responsible for making
1910  * sure that it's initialized, before we let insertingAt move past
1911  * the page header.
1912  */
1913  if (ptr % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
1914  XLogSegmentOffset(ptr, wal_segment_size) > XLOG_BLCKSZ)
1915  initializedUpto = ptr - SizeOfXLogShortPHD;
1916  else if (ptr % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
1917  XLogSegmentOffset(ptr, wal_segment_size) < XLOG_BLCKSZ)
1918  initializedUpto = ptr - SizeOfXLogLongPHD;
1919  else
1920  initializedUpto = ptr;
1921 
1922  WALInsertLockUpdateInsertingAt(initializedUpto);
1923 
1924  AdvanceXLInsertBuffer(ptr, false);
1925  endptr = XLogCtl->xlblocks[idx];
1926 
1927  if (expectedEndPtr != endptr)
1928  elog(PANIC, "could not find WAL buffer for %X/%X",
1929  (uint32) (ptr >> 32), (uint32) ptr);
1930  }
1931  else
1932  {
1933  /*
1934  * Make sure the initialization of the page is visible to us, and
1935  * won't arrive later to overwrite the WAL data we write on the page.
1936  */
1937  pg_memory_barrier();
1938  }
1939 
1940  /*
1941  * Found the buffer holding this page. Return a pointer to the right
1942  * offset within the page.
1943  */
1944  cachedPage = ptr / XLOG_BLCKSZ;
1945  cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
1946 
1947  Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC);
1948  Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ));
1949 
1950  return cachedPos + ptr % XLOG_BLCKSZ;
1951 }
1952 
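/*
 * Note on the fixed mapping used above: with, say, 512 WAL buffer pages,
 * the page containing LSN 'ptr' can only ever occupy slot
 * (ptr / XLOG_BLCKSZ) % 512, which is what XLogRecPtrToBufIdx computes.
 * That is why comparing xlblocks[idx] against the expected end pointer
 * suffices to tell whether the wanted page is currently loaded.
 */
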
1953 /*
1954  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
1955  * is the position starting from the beginning of WAL, excluding all WAL
1956  * page headers.
1957  */
1958 static XLogRecPtr
1959 XLogBytePosToRecPtr(uint64 bytepos)
1960 {
1961  uint64 fullsegs;
1962  uint64 fullpages;
1963  uint64 bytesleft;
1964  uint32 seg_offset;
1965  XLogRecPtr result;
1966 
1967  fullsegs = bytepos / UsableBytesInSegment;
1968  bytesleft = bytepos % UsableBytesInSegment;
1969 
1970  if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
1971  {
1972  /* fits on first page of segment */
1973  seg_offset = bytesleft + SizeOfXLogLongPHD;
1974  }
1975  else
1976  {
1977  /* account for the first page on segment with long header */
1978  seg_offset = XLOG_BLCKSZ;
1979  bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
1980 
1981  fullpages = bytesleft / UsableBytesInPage;
1982  bytesleft = bytesleft % UsableBytesInPage;
1983 
1984  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
1985  }
1986 
1987  XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result);
1988 
1989  return result;
1990 }
1991 
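/*
 * Worked example, assuming 16 MB segments, XLOG_BLCKSZ = 8192 and the
 * usual header sizes (SizeOfXLogLongPHD = 40, SizeOfXLogShortPHD = 24):
 * bytepos = 10000 yields fullsegs = 0 and bytesleft = 10000.  That
 * exceeds the 8152 usable bytes of the segment's first page, so
 * seg_offset starts at 8192, bytesleft drops to 1848, no further full
 * page fits, and the result is segment offset 8192 + 1848 + 24 = 10064.
 */
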
1992 /*
1993  * Like XLogBytePosToRecPtr, but if the position is at a page boundary,
1994  * returns a pointer to the beginning of the page (ie. before page header),
1995  * not to where the first xlog record on that page would go to. This is used
1996  * when converting a pointer to the end of a record.
1997  */
1998 static XLogRecPtr
1999 XLogBytePosToEndRecPtr(uint64 bytepos)
2000 {
2001  uint64 fullsegs;
2002  uint64 fullpages;
2003  uint64 bytesleft;
2004  uint32 seg_offset;
2005  XLogRecPtr result;
2006 
2007  fullsegs = bytepos / UsableBytesInSegment;
2008  bytesleft = bytepos % UsableBytesInSegment;
2009 
2010  if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
2011  {
2012  /* fits on first page of segment */
2013  if (bytesleft == 0)
2014  seg_offset = 0;
2015  else
2016  seg_offset = bytesleft + SizeOfXLogLongPHD;
2017  }
2018  else
2019  {
2020  /* account for the first page on segment with long header */
2021  seg_offset = XLOG_BLCKSZ;
2022  bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
2023 
2024  fullpages = bytesleft / UsableBytesInPage;
2025  bytesleft = bytesleft % UsableBytesInPage;
2026 
2027  if (bytesleft == 0)
2028  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft;
2029  else
2030  seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
2031  }
2032 
2033  XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result);
2034 
2035  return result;
2036 }
2037 
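/*
 * Boundary case, under the same assumptions as the example above: a
 * usable byte position of exactly 8152 (the end of the first page's
 * payload) maps here to segment offset 8192, the page boundary itself,
 * whereas XLogBytePosToRecPtr would return 8192 + 24 = 8216, the first
 * usable byte after the next page's short header.
 */
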
2038 /*
2039  * Convert an XLogRecPtr to a "usable byte position".
2040  */
2041 static uint64
2042 XLogRecPtrToBytePos(XLogRecPtr ptr)
2043 {
2044  uint64 fullsegs;
2045  uint32 fullpages;
2046  uint32 offset;
2047  uint64 result;
2048 
2049  XLByteToSeg(ptr, fullsegs, wal_segment_size);
2050 
2051  fullpages = (XLogSegmentOffset(ptr, wal_segment_size)) / XLOG_BLCKSZ;
2052  offset = ptr % XLOG_BLCKSZ;
2053 
2054  if (fullpages == 0)
2055  {
2056  result = fullsegs * UsableBytesInSegment;
2057  if (offset > 0)
2058  {
2059  Assert(offset >= SizeOfXLogLongPHD);
2060  result += offset - SizeOfXLogLongPHD;
2061  }
2062  }
2063  else
2064  {
2065  result = fullsegs * UsableBytesInSegment +
2066  (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
2067  (fullpages - 1) * UsableBytesInPage; /* full pages */
2068  if (offset > 0)
2069  {
2070  Assert(offset >= SizeOfXLogShortPHD);
2071  result += offset - SizeOfXLogShortPHD;
2072  }
2073  }
2074 
2075  return result;
2076 }
2077 
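/*
 * This is the inverse of XLogBytePosToRecPtr.  Continuing the example
 * above: a pointer at segment offset 10064 has fullpages = 1 and a
 * within-page offset of 10064 % 8192 = 1872, so the result is
 * 8152 + 0 * 8168 + (1872 - 24) = 10000 usable bytes, as expected.
 */
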
2078 /*
2079  * Initialize XLOG buffers, writing out old buffers if they still contain
2080  * unwritten data, up to the page containing 'upto'. Or if 'opportunistic' is
2081  * true, initialize as many pages as we can without having to write out
2082  * unwritten data. Any new pages are initialized to zeros, with page headers
2083  * initialized properly.
2084  */
2085 static void
2086 AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
2087 {
2088  XLogCtlInsert *Insert = &XLogCtl->Insert;
2089  int nextidx;
2090  XLogRecPtr OldPageRqstPtr;
2091  XLogwrtRqst WriteRqst;
2092  XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
2093  XLogRecPtr NewPageBeginPtr;
2094  XLogPageHeader NewPage;
2095  int npages = 0;
2096 
2097  LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
2098 
2099  /*
2100  * Now that we have the lock, check if someone initialized the page
2101  * already.
2102  */
2103  while (upto >= XLogCtl->InitializedUpTo || opportunistic)
2104  {
2105  nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
2106 
2107  /*
2108  * Get ending-offset of the buffer page we need to replace (this may
2109  * be zero if the buffer hasn't been used yet). Fall through if it's
2110  * already written out.
2111  */
2112  OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
2113  if (LogwrtResult.Write < OldPageRqstPtr)
2114  {
2115  /*
2116  * Nope, got work to do. If we just want to pre-initialize as much
2117  * as we can without flushing, give up now.
2118  */
2119  if (opportunistic)
2120  break;
2121 
2122  /* Before waiting, get info_lck and update LogwrtResult */
2123  SpinLockAcquire(&XLogCtl->info_lck);
2124  if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
2125  XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
2126  LogwrtResult = XLogCtl->LogwrtResult;
2127  SpinLockRelease(&XLogCtl->info_lck);
2128 
2129  /*
2130  * Now that we have an up-to-date LogwrtResult value, see if we
2131  * still need to write it or if someone else already did.
2132  */
2133  if (LogwrtResult.Write < OldPageRqstPtr)
2134  {
2135  /*
2136  * Must acquire write lock. Release WALBufMappingLock first,
2137  * to make sure that all insertions that we need to wait for
2138  * can finish (up to this same position). Otherwise we risk
2139  * deadlock.
2140  */
2141  LWLockRelease(WALBufMappingLock);
2142 
2143  WaitXLogInsertionsToFinish(OldPageRqstPtr);
2144 
2145  LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
2146 
2147  LogwrtResult = XLogCtl->LogwrtResult;
2148  if (LogwrtResult.Write >= OldPageRqstPtr)
2149  {
2150  /* OK, someone wrote it already */
2151  LWLockRelease(WALWriteLock);
2152  }
2153  else
2154  {
2155  /* Have to write it ourselves */
2156  TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
2157  WriteRqst.Write = OldPageRqstPtr;
2158  WriteRqst.Flush = 0;
2159  XLogWrite(WriteRqst, false);
2160  LWLockRelease(WALWriteLock);
2161  TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
2162  }
2163  /* Re-acquire WALBufMappingLock and retry */
2164  LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
2165  continue;
2166  }
2167  }
2168 
2169  /*
2170  * Now the next buffer slot is free and we can set it up to be the
2171  * next output page.
2172  */
2173  NewPageBeginPtr = XLogCtl->InitializedUpTo;
2174  NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
2175 
2176  Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
2177 
2178  NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
2179 
2180  /*
2181  * Be sure to re-zero the buffer so that bytes beyond what we've
2182  * written will look like zeroes and not valid XLOG records...
2183  */
2184  MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
2185 
2186  /*
2187  * Fill the new page's header
2188  */
2189  NewPage->xlp_magic = XLOG_PAGE_MAGIC;
2190 
2191  /* NewPage->xlp_info = 0; */ /* done by memset */
2192  NewPage->xlp_tli = ThisTimeLineID;
2193  NewPage->xlp_pageaddr = NewPageBeginPtr;
2194 
2195  /* NewPage->xlp_rem_len = 0; */ /* done by memset */
2196 
2197  /*
2198  * If online backup is not in progress, mark the header to indicate
2199  * that WAL records beginning in this page have removable backup
2200  * blocks. This allows the WAL archiver to know whether it is safe to
2201  * compress archived WAL data by transforming full-block records into
2202  * the non-full-block format. It is sufficient to record this at the
2203  * page level because we force a page switch (in fact a segment
2204  * switch) when starting a backup, so the flag will be off before any
2205  * records can be written during the backup. At the end of a backup,
2206  * the last page will be marked as all unsafe when perhaps only part
2207  * is unsafe, but at worst the archiver would miss the opportunity to
2208  * compress a few records.
2209  */
2210  if (!Insert->forcePageWrites)
2211  NewPage->xlp_info |= XLP_BKP_REMOVABLE;
2212 
2213  /*
2214  * If first page of an XLOG segment file, make it a long header.
2215  */
2216  if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
2217  {
2218  XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
2219 
2220  NewLongPage->xlp_sysid = ControlFile->system_identifier;
2221  NewLongPage->xlp_seg_size = wal_segment_size;
2222  NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
2223  NewPage->xlp_info |= XLP_LONG_HEADER;
2224  }
2225 
2226  /*
2227  * Make sure the initialization of the page becomes visible to others
2228  * before the xlblocks update. GetXLogBuffer() reads xlblocks without
2229  * holding a lock.
2230  */
2231  pg_write_barrier();
2232 
2233  *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
2234 
2235  XLogCtl->InitializedUpTo = NewPageEndPtr;
2236 
2237  npages++;
2238  }
2239  LWLockRelease(WALBufMappingLock);
2240 
2241 #ifdef WAL_DEBUG
2242  if (XLOG_DEBUG && npages > 0)
2243  {
2244  elog(DEBUG1, "initialized %d pages, up to %X/%X",
2245  npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr);
2246  }
2247 #endif
2248 }
2249 
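/*
 * Note on the 'opportunistic' mode above: the background walwriter calls
 * AdvanceXLInsertBuffer(InvalidXLogRecPtr, true) at the end of
 * XLogBackgroundFlush, so pages whose contents are already safely on
 * disk are re-initialized for reuse off the critical insertion path.
 */
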
2250 /*
2251  * Calculate CheckPointSegments based on max_wal_size_mb and
2252  * checkpoint_completion_target.
2253  */
2254 static void
2255 CalculateCheckpointSegments(void)
2256 {
2257  double target;
2258 
2259  /*-------
2260  * Calculate the distance at which to trigger a checkpoint, to avoid
2261  * exceeding max_wal_size_mb. This is based on two assumptions:
2262  *
2263  * a) we keep WAL for only one checkpoint cycle (prior to PG11 we kept
2264  * WAL for two checkpoint cycles to allow us to recover from the
2265  * secondary checkpoint if the first checkpoint failed, though we
2266  * only did this on the master anyway, not on standby. Keeping just
2267  * one checkpoint simplifies processing and reduces disk space in
2268  * many smaller databases.)
2269  * b) during checkpoint, we consume checkpoint_completion_target *
2270  * number of segments consumed between checkpoints.
2271  *-------
2272  */
2273  target = (double) ConvertToXSegs(max_wal_size_mb, wal_segment_size) /
2274  (1.0 + CheckPointCompletionTarget);
2275 
2276  /* round down */
2277  CheckPointSegments = (int) target;
2278 
2279  if (CheckPointSegments < 1)
2280  CheckPointSegments = 1;
2281 }
2282 
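/*
 * Worked example: with the default max_wal_size of 1 GB (64 segments of
 * 16 MB) and checkpoint_completion_target = 0.5, target is 64 / 1.5 =
 * 42.67, so CheckPointSegments rounds down to 42 and an xlog-based
 * checkpoint is triggered roughly every 42 segments.
 */
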
2283 void
2284 assign_max_wal_size(int newval, void *extra)
2285 {
2286  max_wal_size_mb = newval;
2287  CalculateCheckpointSegments();
2288 }
2289 
2290 void
2291 assign_checkpoint_completion_target(double newval, void *extra)
2292 {
2293  CheckPointCompletionTarget = newval;
2294  CalculateCheckpointSegments();
2295 }
2296 
2297 /*
2298  * At a checkpoint, how many WAL segments to recycle as preallocated future
2299  * XLOG segments? Returns the highest segment that should be preallocated.
2300  */
2301 static XLogSegNo
2302 XLOGfileslop(XLogRecPtr RedoRecPtr)
2303 {
2304  XLogSegNo minSegNo;
2305  XLogSegNo maxSegNo;
2306  double distance;
2307  XLogSegNo recycleSegNo;
2308 
2309  /*
2310  * Calculate the segment numbers that min_wal_size_mb and max_wal_size_mb
2311  * correspond to. Always recycle enough segments to meet the minimum, and
2312  * remove enough segments to stay below the maximum.
2313  */
2314  minSegNo = RedoRecPtr / wal_segment_size +
2315  ConvertToXSegs(min_wal_size_mb, wal_segment_size) + 1;
2316  maxSegNo = RedoRecPtr / wal_segment_size +
2317  ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
2318 
2319  /*
2320  * Between those limits, recycle enough segments to get us through to the
2321  * estimated end of next checkpoint.
2322  *
2323  * To estimate where the next checkpoint will finish, assume that the
2324  * system runs steadily consuming CheckPointDistanceEstimate bytes between
2325  * every checkpoint.
2326  */
2327  distance = (2.0 + CheckPointCompletionTarget) * CheckPointDistanceEstimate;
2328  /* add 10% for good measure. */
2329  distance *= 1.10;
2330 
2331  recycleSegNo = (XLogSegNo) ceil(((double) RedoRecPtr + distance) /
2332  wal_segment_size);
2333 
2334  if (recycleSegNo < minSegNo)
2335  recycleSegNo = minSegNo;
2336  if (recycleSegNo > maxSegNo)
2337  recycleSegNo = maxSegNo;
2338 
2339  return recycleSegNo;
2340 }
2341 
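/*
 * Worked example, using 16 MB segments: if CheckPointDistanceEstimate is
 * 160 MB and checkpoint_completion_target is 0.5, the estimated distance
 * is (2.0 + 0.5) * 160 MB * 1.10 = 440 MB, so we aim to keep roughly 28
 * segments preallocated past RedoRecPtr, clamped to the limits derived
 * from min_wal_size_mb and max_wal_size_mb above.
 */
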
2342 /*
2343  * Check whether we've consumed enough xlog space that a checkpoint is needed.
2344  *
2345  * new_segno indicates a log file that has just been filled up (or read
2346  * during recovery). We measure the distance from RedoRecPtr to new_segno
2347  * and see if that exceeds CheckPointSegments.
2348  *
2349  * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
2350  */
2351 static bool
2352 XLogCheckpointNeeded(XLogSegNo new_segno)
2353 {
2354  XLogSegNo old_segno;
2355 
2356  XLByteToSeg(RedoRecPtr, old_segno, wal_segment_size);
2357 
2358  if (new_segno >= old_segno + (uint64) (CheckPointSegments - 1))
2359  return true;
2360  return false;
2361 }
2362 
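/*
 * For instance, with CheckPointSegments = 42: if RedoRecPtr falls in
 * segment 100, then filling segment 141 (= 100 + 42 - 1) makes this
 * function return true and a checkpoint gets requested.
 */
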
2363 /*
2364  * Write and/or fsync the log at least as far as WriteRqst indicates.
2365  *
2366  * If flexible == true, we don't have to write as far as WriteRqst, but
2367  * may stop at any convenient boundary (such as a cache or logfile boundary).
2368  * This option allows us to avoid uselessly issuing multiple writes when a
2369  * single one would do.
2370  *
2371  * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
2372  * must be called before grabbing the lock, to make sure the data is ready to
2373  * write.
2374  */
2375 static void
2376 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
2377 {
2378  bool ispartialpage;
2379  bool last_iteration;
2380  bool finishing_seg;
2381  bool use_existent;
2382  int curridx;
2383  int npages;
2384  int startidx;
2385  uint32 startoffset;
2386 
2387  /* We should always be inside a critical section here */
2388  Assert(CritSectionCount > 0);
2389 
2390  /*
2391  * Update local LogwrtResult (caller probably did this already, but...)
2392  */
2393  LogwrtResult = XLogCtl->LogwrtResult;
2394 
2395  /*
2396  * Since successive pages in the xlog cache are consecutively allocated,
2397  * we can usually gather multiple pages together and issue just one
2398  * write() call. npages is the number of pages we have determined can be
2399  * written together; startidx is the cache block index of the first one,
2400  * and startoffset is the file offset at which it should go. The latter
2401  * two variables are only valid when npages > 0, but we must initialize
2402  * all of them to keep the compiler quiet.
2403  */
2404  npages = 0;
2405  startidx = 0;
2406  startoffset = 0;
2407 
2408  /*
2409  * Within the loop, curridx is the cache block index of the page to
2410  * consider writing. Begin at the buffer containing the next unwritten
2411  * page, or last partially written page.
2412  */
2413  curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
2414 
2415  while (LogwrtResult.Write < WriteRqst.Write)
2416  {
2417  /*
2418  * Make sure we're not ahead of the insert process. This could happen
2419  * if we're passed a bogus WriteRqst.Write that is past the end of the
2420  * last page that's been initialized by AdvanceXLInsertBuffer.
2421  */
2422  XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
2423 
2424  if (LogwrtResult.Write >= EndPtr)
2425  elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
2426  (uint32) (LogwrtResult.Write >> 32),
2427  (uint32) LogwrtResult.Write,
2428  (uint32) (EndPtr >> 32), (uint32) EndPtr);
2429 
2430  /* Advance LogwrtResult.Write to end of current buffer page */
2431  LogwrtResult.Write = EndPtr;
2432  ispartialpage = WriteRqst.Write < LogwrtResult.Write;
2433 
2434  if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
2435  wal_segment_size))
2436  {
2437  /*
2438  * Switch to new logfile segment. We cannot have any pending
2439  * pages here (since we dump what we have at segment end).
2440  */
2441  Assert(npages == 0);
2442  if (openLogFile >= 0)
2443  XLogFileClose();
2444  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2445  wal_segment_size);
2446 
2447  /* create/use new log file */
2448  use_existent = true;
2449  openLogFile = XLogFileInit(openLogSegNo, &use_existent, true);
2450  }
2451 
2452  /* Make sure we have the current logfile open */
2453  if (openLogFile < 0)
2454  {
2455  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2456  wal_segment_size);
2457  openLogFile = XLogFileOpen(openLogSegNo);
2458  }
2459 
2460  /* Add current page to the set of pending pages-to-dump */
2461  if (npages == 0)
2462  {
2463  /* first of group */
2464  startidx = curridx;
2465  startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
2466  wal_segment_size);
2467  }
2468  npages++;
2469 
2470  /*
2471  * Dump the set if this will be the last loop iteration, or if we are
2472  * at the last page of the cache area (since the next page won't be
2473  * contiguous in memory), or if we are at the end of the logfile
2474  * segment.
2475  */
2476  last_iteration = WriteRqst.Write <= LogwrtResult.Write;
2477 
2478  finishing_seg = !ispartialpage &&
2479  (startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
2480 
2481  if (last_iteration ||
2482  curridx == XLogCtl->XLogCacheBlck ||
2483  finishing_seg)
2484  {
2485  char *from;
2486  Size nbytes;
2487  Size nleft;
2488  int written;
2489 
2490  /* OK to write the page(s) */
2491  from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
2492  nbytes = npages * (Size) XLOG_BLCKSZ;
2493  nleft = nbytes;
2494  do
2495  {
2496  errno = 0;
2497  pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
2498  written = pg_pwrite(openLogFile, from, nleft, startoffset);
2499  pgstat_report_wait_end();
2500  if (written <= 0)
2501  {
2502  if (errno == EINTR)
2503  continue;
2504  ereport(PANIC,
2505  (errcode_for_file_access(),
2506  errmsg("could not write to log file %s "
2507  "at offset %u, length %zu: %m",
2508  XLogFileNameP(ThisTimeLineID, openLogSegNo),
2509  startoffset, nleft)));
2510  }
2511  nleft -= written;
2512  from += written;
2513  startoffset += written;
2514  } while (nleft > 0);
2515 
2516  npages = 0;
2517 
2518  /*
2519  * If we just wrote the whole last page of a logfile segment,
2520  * fsync the segment immediately. This avoids having to go back
2521  * and re-open prior segments when an fsync request comes along
2522  * later. Doing it here ensures that one and only one backend will
2523  * perform this fsync.
2524  *
2525  * This is also the right place to notify the Archiver that the
2526  * segment is ready to copy to archival storage, and to update the
2527  * timer for archive_timeout, and to signal for a checkpoint if
2528  * too many logfile segments have been used since the last
2529  * checkpoint.
2530  */
2531  if (finishing_seg)
2532  {
2533  issue_xlog_fsync(openLogFile, openLogSegNo);
2534 
2535  /* signal that we need to wakeup walsenders later */
2536  WalSndWakeupRequest();
2537 
2538  LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
2539 
2540  if (XLogArchivingActive())
2541  XLogArchiveNotifySeg(openLogSegNo);
2542 
2543  XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
2544  XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
2545 
2546  /*
2547  * Request a checkpoint if we've consumed too much xlog since
2548  * the last one. For speed, we first check using the local
2549  * copy of RedoRecPtr, which might be out of date; if it looks
2550  * like a checkpoint is needed, forcibly update RedoRecPtr and
2551  * recheck.
2552  */
2553  if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo))
2554  {
2555  (void) GetRedoRecPtr();
2556  if (XLogCheckpointNeeded(openLogSegNo))
2557  RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
2558  }
2559  }
2560  }
2561 
2562  if (ispartialpage)
2563  {
2564  /* Only asked to write a partial page */
2565  LogwrtResult.Write = WriteRqst.Write;
2566  break;
2567  }
2568  curridx = NextBufIdx(curridx);
2569 
2570  /* If flexible, break out of loop as soon as we wrote something */
2571  if (flexible && npages == 0)
2572  break;
2573  }
2574 
2575  Assert(npages == 0);
2576 
2577  /*
2578  * If asked to flush, do so
2579  */
2580  if (LogwrtResult.Flush < WriteRqst.Flush &&
2581  LogwrtResult.Flush < LogwrtResult.Write)
2582 
2583  {
2584  /*
2585  * Could get here without iterating above loop, in which case we might
2586  * have no open file or the wrong one. However, we do not need to
2587  * fsync more than one file.
2588  */
2589  if (sync_method != SYNC_METHOD_OPEN &&
2590  sync_method != SYNC_METHOD_OPEN_DSYNC)
2591  {
2592  if (openLogFile >= 0 &&
2593  !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
2594  wal_segment_size))
2595  XLogFileClose();
2596  if (openLogFile < 0)
2597  {
2598  XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
2599  wal_segment_size);
2600  openLogFile = XLogFileOpen(openLogSegNo);
2601  }
2602 
2603  issue_xlog_fsync(openLogFile, openLogSegNo);
2604  }
2605 
2606  /* signal that we need to wakeup walsenders later */
2607  WalSndWakeupRequest();
2608 
2609  LogwrtResult.Flush = LogwrtResult.Write;
2610  }
2611 
2612  /*
2613  * Update shared-memory status
2614  *
2615  * We make sure that the shared 'request' values do not fall behind the
2616  * 'result' values. This is not absolutely essential, but it saves some
2617  * code in a couple of places.
2618  */
2619  {
2620  SpinLockAcquire(&XLogCtl->info_lck);
2621  XLogCtl->LogwrtResult = LogwrtResult;
2622  if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
2623  XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
2624  if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
2625  XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
2626  SpinLockRelease(&XLogCtl->info_lck);
2627  }
2628 }
2629 
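/*
 * Illustration of the write batching above: if three consecutive buffer
 * pages are complete and neither wrap around the buffer ring nor cross a
 * segment boundary, the loop issues a single pg_pwrite() of
 * 3 * XLOG_BLCKSZ bytes at the proper file offset rather than three
 * separate writes.
 */
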
2630 /*
2631  * Record the LSN for an asynchronous transaction commit/abort
2632  * and nudge the WALWriter if there is work for it to do.
2633  * (This should not be called for synchronous commits.)
2634  */
2635 void
2636 XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
2637 {
2638  XLogRecPtr WriteRqstPtr = asyncXactLSN;
2639  bool sleeping;
2640 
2641  SpinLockAcquire(&XLogCtl->info_lck);
2642  LogwrtResult = XLogCtl->LogwrtResult;
2643  sleeping = XLogCtl->WalWriterSleeping;
2644  if (XLogCtl->asyncXactLSN < asyncXactLSN)
2645  XLogCtl->asyncXactLSN = asyncXactLSN;
2646  SpinLockRelease(&XLogCtl->info_lck);
2647 
2648  /*
2649  * If the WALWriter is sleeping, we should kick it to make it come out of
2650  * low-power mode. Otherwise, determine whether there's a full page of
2651  * WAL available to write.
2652  */
2653  if (!sleeping)
2654  {
2655  /* back off to last completed page boundary */
2656  WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
2657 
2658  /* if we have already flushed that far, we're done */
2659  if (WriteRqstPtr <= LogwrtResult.Flush)
2660  return;
2661  }
2662 
2663  /*
2664  * Nudge the WALWriter: it has a full page of WAL to write, or we want it
2665  * to come out of low-power mode so that this async commit will reach disk
2666  * within the expected amount of time.
2667  */
2670 }
2671 
2672 /*
2673  * Record the LSN up to which we can remove WAL because it's not required by
2674  * any replication slot.
2675  */
2676 void
2677 XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
2678 {
2679  SpinLockAcquire(&XLogCtl->info_lck);
2680  XLogCtl->replicationSlotMinLSN = lsn;
2681  SpinLockRelease(&XLogCtl->info_lck);
2682 }
2683 
2684 
2685 /*
2686  * Return the oldest LSN we must retain to satisfy the needs of some
2687  * replication slot.
2688  */
2689 static XLogRecPtr
2690 XLogGetReplicationSlotMinimumLSN(void)
2691 {
2692  XLogRecPtr retval;
2693 
2694  SpinLockAcquire(&XLogCtl->info_lck);
2695  retval = XLogCtl->replicationSlotMinLSN;
2696  SpinLockRelease(&XLogCtl->info_lck);
2697 
2698  return retval;
2699 }
2700 
2701 /*
2702  * Advance minRecoveryPoint in control file.
2703  *
2704  * If we crash during recovery, we must reach this point again before the
2705  * database is consistent.
2706  *
2707  * If 'force' is true, 'lsn' argument is ignored. Otherwise, minRecoveryPoint
2708  * is only updated if it's not already greater than or equal to 'lsn'.
2709  */
2710 static void
2711 UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
2712 {
2713  /* Quick check using our local copy of the variable */
2714  if (!updateMinRecoveryPoint || (!force && lsn <= minRecoveryPoint))
2715  return;
2716 
2717  /*
2718  * An invalid minRecoveryPoint means that we need to recover all the WAL,
2719  * i.e., we're doing crash recovery. We never modify the control file's
2720  * value in that case, so we can short-circuit future checks here too. The
2721  * local values of minRecoveryPoint and minRecoveryPointTLI should not be
2722  * updated until crash recovery finishes. We only do this for the startup
2723  * process, as it must not update its own copy of minRecoveryPoint until
2724  * it has finished crash recovery, to make sure that all WAL available
2725  * is replayed in that case. This also spares the startup process some
2726  * extra locking of the control file.
2727  */
2728  if (XLogRecPtrIsInvalid(minRecoveryPoint))
2729  {
2730  updateMinRecoveryPoint = false;
2731  return;
2732  }
2733 
2734  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2735 
2736  /* update local copy */
2737  minRecoveryPoint = ControlFile->minRecoveryPoint;
2738  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
2739 
2740  if (XLogRecPtrIsInvalid(minRecoveryPoint))
2741  updateMinRecoveryPoint = false;
2742  else if (force || minRecoveryPoint < lsn)
2743  {
2744  XLogRecPtr newMinRecoveryPoint;
2745  TimeLineID newMinRecoveryPointTLI;
2746 
2747  /*
2748  * To avoid having to update the control file too often, we update it
2749  * all the way to the last record being replayed, even though 'lsn'
2750  * would suffice for correctness. This also allows the 'force' case
2751  * to not need a valid 'lsn' value.
2752  *
2753  * Another important reason for doing it this way is that the passed
2754  * 'lsn' value could be bogus, i.e., past the end of available WAL, if
2755  * the caller got it from a corrupted heap page. Accepting such a
2756  * value as the min recovery point would prevent us from coming up at
2757  * all. Instead, we just log a warning and continue with recovery.
2758  * (See also the comments about corrupt LSNs in XLogFlush.)
2759  */
2760  SpinLockAcquire(&XLogCtl->info_lck);
2761  newMinRecoveryPoint = XLogCtl->replayEndRecPtr;
2762  newMinRecoveryPointTLI = XLogCtl->replayEndTLI;
2763  SpinLockRelease(&XLogCtl->info_lck);
2764 
2765  if (!force && newMinRecoveryPoint < lsn)
2766  elog(WARNING,
2767  "xlog min recovery request %X/%X is past current point %X/%X",
2768  (uint32) (lsn >> 32), (uint32) lsn,
2769  (uint32) (newMinRecoveryPoint >> 32),
2770  (uint32) newMinRecoveryPoint);
2771 
2772  /* update control file */
2773  if (ControlFile->minRecoveryPoint < newMinRecoveryPoint)
2774  {
2775  ControlFile->minRecoveryPoint = newMinRecoveryPoint;
2776  ControlFile->minRecoveryPointTLI = newMinRecoveryPointTLI;
2777  UpdateControlFile();
2778  minRecoveryPoint = newMinRecoveryPoint;
2779  minRecoveryPointTLI = newMinRecoveryPointTLI;
2780 
2781  ereport(DEBUG2,
2782  (errmsg("updated min recovery point to %X/%X on timeline %u",
2783  (uint32) (minRecoveryPoint >> 32),
2784  (uint32) minRecoveryPoint,
2785  newMinRecoveryPointTLI)));
2786  }
2787  }
2788  LWLockRelease(ControlFileLock);
2789 }
2790 
2791 /*
2792  * Ensure that all XLOG data through the given position is flushed to disk.
2793  *
2794  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
2795  * already held, and we try to avoid acquiring it if possible.
2796  */
2797 void
2798 XLogFlush(XLogRecPtr record)
2799 {
2800  XLogRecPtr WriteRqstPtr;
2801  XLogwrtRqst WriteRqst;
2802 
2803  /*
2804  * During REDO, we are reading not writing WAL. Therefore, instead of
2805  * trying to flush the WAL, we should update minRecoveryPoint instead. We
2806  * test XLogInsertAllowed(), not InRecovery, because we need checkpointer
2807  * to act this way too, and because when it tries to write the
2808  * end-of-recovery checkpoint, it should indeed flush.
2809  */
2810  if (!XLogInsertAllowed())
2811  {
2812  UpdateMinRecoveryPoint(record, false);
2813  return;
2814  }
2815 
2816  /* Quick exit if already known flushed */
2817  if (record <= LogwrtResult.Flush)
2818  return;
2819 
2820 #ifdef WAL_DEBUG
2821  if (XLOG_DEBUG)
2822  elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
2823  (uint32) (record >> 32), (uint32) record,
2824  (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
2825  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
2826 #endif
2827 
2828  START_CRIT_SECTION();
2829 
2830  /*
2831  * Since fsync is usually a horribly expensive operation, we try to
2832  * piggyback as much data as we can on each fsync: if we see any more data
2833  * entered into the xlog buffer, we'll write and fsync that too, so that
2834  * the final value of LogwrtResult.Flush is as large as possible. This
2835  * gives us some chance of avoiding another fsync immediately after.
2836  */
2837 
2838  /* initialize to given target; may increase below */
2839  WriteRqstPtr = record;
2840 
2841  /*
2842  * Now wait until we get the write lock, or someone else does the flush
2843  * for us.
2844  */
2845  for (;;)
2846  {
2847  XLogRecPtr insertpos;
2848 
2849  /* read LogwrtResult and update local state */
2850  SpinLockAcquire(&XLogCtl->info_lck);
2851  if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
2852  WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
2853  LogwrtResult = XLogCtl->LogwrtResult;
2854  SpinLockRelease(&XLogCtl->info_lck);
2855 
2856  /* done already? */
2857  if (record <= LogwrtResult.Flush)
2858  break;
2859 
2860  /*
2861  * Before actually performing the write, wait for all in-flight
2862  * insertions to the pages we're about to write to finish.
2863  */
2864  insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
2865 
2866  /*
2867  * Try to get the write lock. If we can't get it immediately, wait
2868  * until it's released, and recheck if we still need to do the flush
2869  * or if the backend that held the lock did it for us already. This
2870  * helps to maintain a good rate of group committing when the system
2871  * is bottlenecked by the speed of fsyncing.
2872  */
2873  if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
2874  {
2875  /*
2876  * The lock is now free, but we didn't acquire it yet. Before we
2877  * do, loop back to check if someone else flushed the record for
2878  * us already.
2879  */
2880  continue;
2881  }
2882 
2883  /* Got the lock; recheck whether request is satisfied */
2884  LogwrtResult = XLogCtl->LogwrtResult;
2885  if (record <= LogwrtResult.Flush)
2886  {
2887  LWLockRelease(WALWriteLock);
2888  break;
2889  }
2890 
2891  /*
2892  * Sleep before flush! By adding a delay here, we may give further
2893  * backends the opportunity to join the backlog of group commit
2894  * followers; this can significantly improve transaction throughput,
2895  * at the risk of increasing transaction latency.
2896  *
2897  * We do not sleep if enableFsync is not turned on, nor if there are
2898  * fewer than CommitSiblings other backends with active transactions.
2899  */
2900  if (CommitDelay > 0 && enableFsync &&
2901  MinimumActiveBackends(CommitSiblings))
2902  {
2903  pg_usleep(CommitDelay);
2904 
2905  /*
2906  * Re-check how far we can now flush the WAL. It's generally not
2907  * safe to call WaitXLogInsertionsToFinish while holding
2908  * WALWriteLock, because an in-progress insertion might need to
2909  * also grab WALWriteLock to make progress. But we know that all
2910  * the insertions up to insertpos have already finished, because
2911  * that's what the earlier WaitXLogInsertionsToFinish() returned.
2912  * We're only calling it again to allow insertpos to be moved
2913  * further forward, not to actually wait for anyone.
2914  */
2915  insertpos = WaitXLogInsertionsToFinish(insertpos);
2916  }
2917 
2918  /* try to write/flush later additions to XLOG as well */
2919  WriteRqst.Write = insertpos;
2920  WriteRqst.Flush = insertpos;
2921 
2922  XLogWrite(WriteRqst, false);
2923 
2924  LWLockRelease(WALWriteLock);
2925  /* done */
2926  break;
2927  }
2928 
2929  END_CRIT_SECTION();
2930 
2931  /* wake up walsenders now that we've released heavily contended locks */
2932  WalSndWakeupProcessRequests();
2933 
2934  /*
2935  * If we still haven't flushed to the request point then we have a
2936  * problem; most likely, the requested flush point is past end of XLOG.
2937  * This has been seen to occur when a disk page has a corrupted LSN.
2938  *
2939  * Formerly we treated this as a PANIC condition, but that hurts the
2940  * system's robustness rather than helping it: we do not want to take down
2941  * the whole system due to corruption on one data page. In particular, if
2942  * the bad page is encountered again during recovery then we would be
2943  * unable to restart the database at all! (This scenario actually
2944  * happened in the field several times with 7.1 releases.) As of 8.4, bad
2945  * LSNs encountered during recovery are UpdateMinRecoveryPoint's problem;
2946  * the only time we can reach here during recovery is while flushing the
2947  * end-of-recovery checkpoint record, and we don't expect that to have a
2948  * bad LSN.
2949  *
2950  * Note that for calls from xact.c, the ERROR will be promoted to PANIC
2951  * since xact.c calls this routine inside a critical section. However,
2952  * calls from bufmgr.c are not within critical sections and so we will not
2953  * force a restart for a bad LSN on a data page.
2954  */
2955  if (LogwrtResult.Flush < record)
2956  elog(ERROR,
2957  "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
2958  (uint32) (record >> 32), (uint32) record,
2959  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
2960 }
2961 
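/*
 * Illustration of the group-commit logic above: with commit_delay = 20
 * (microseconds) and at least commit_siblings = 5 other active
 * transactions, the backend holding WALWriteLock sleeps briefly before
 * flushing and then calls WaitXLogInsertionsToFinish again, so records
 * from backends that committed during the sleep ride along in the same
 * fsync.
 */
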
2962 /*
2963  * Write & flush xlog, but without specifying exactly where to.
2964  *
2965  * We normally write only completed blocks; but if there is nothing to do on
2966  * that basis, we check for unwritten async commits in the current incomplete
2967  * block, and write through the latest one of those. Thus, if async commits
2968  * are not being used, we will write complete blocks only.
2969  *
2970  * If, based on the above, there's anything to write we do so immediately. But
2971  * to avoid calling fsync, fdatasync et al. at a rate that'd impact
2972  * concurrent IO, we only flush WAL every wal_writer_delay ms, or if there's
2973  * more than wal_writer_flush_after unflushed blocks.
2974  *
2975  * We can guarantee that async commits reach disk after at most three
2976  * wal_writer_delay cycles. (When flushing complete blocks, we allow XLogWrite
2977  * to write "flexibly", meaning it can stop at the end of the buffer ring;
2978  * this makes a difference only with very high load or long wal_writer_delay,
2979  * but imposes one extra cycle for the worst case for async commits.)
2980  *
2981  * This routine is invoked periodically by the background walwriter process.
2982  *
2983  * Returns true if there was any work to do, even if we skipped flushing due
2984  * to wal_writer_delay/wal_writer_flush_after.
2985  */
2986 bool
2987 XLogBackgroundFlush(void)
2988 {
2989  XLogwrtRqst WriteRqst;
2990  bool flexible = true;
2991  static TimestampTz lastflush;
2992  TimestampTz now;
2993  int flushbytes;
2994 
2995  /* XLOG doesn't need flushing during recovery */
2996  if (RecoveryInProgress())
2997  return false;
2998 
2999  /* read LogwrtResult and update local state */
3000  SpinLockAcquire(&XLogCtl->info_lck);
3001  LogwrtResult = XLogCtl->LogwrtResult;
3002  WriteRqst = XLogCtl->LogwrtRqst;
3003  SpinLockRelease(&XLogCtl->info_lck);
3004 
3005  /* back off to last completed page boundary */
3006  WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
3007 
3008  /* if we have already flushed that far, consider async commit records */
3009  if (WriteRqst.Write <= LogwrtResult.Flush)
3010  {
3011  SpinLockAcquire(&XLogCtl->info_lck);
3012  WriteRqst.Write = XLogCtl->asyncXactLSN;
3013  SpinLockRelease(&XLogCtl->info_lck);
3014  flexible = false; /* ensure it all gets written */
3015  }
3016 
3017  /*
3018  * If already known flushed, we're done. Just need to check if we are
3019  * holding an open file handle to a logfile that's no longer in use,
3020  * preventing the file from being deleted.
3021  */
3022  if (WriteRqst.Write <= LogwrtResult.Flush)
3023  {
3024  if (openLogFile >= 0)
3025  {
3026  if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
3027  wal_segment_size))
3028  {
3029  XLogFileClose();
3030  }
3031  }
3032  return false;
3033  }
3034 
3035  /*
3036  * Determine how far to flush WAL, based on the wal_writer_delay and
3037  * wal_writer_flush_after GUCs.
3038  */
3039  now = GetCurrentTimestamp();
3040  flushbytes =
3041  WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
3042 
3043  if (WalWriterFlushAfter == 0 || lastflush == 0)
3044  {
3045  /* first call, or block based limits disabled */
3046  WriteRqst.Flush = WriteRqst.Write;
3047  lastflush = now;
3048  }
3049  else if (TimestampDifferenceExceeds(lastflush, now, WalWriterDelay))
3050  {
3051  /*
3052  * Flush the writes at least every WalWriterDelay ms. This is
3053  * important to bound the amount of time it takes for an asynchronous
3054  * commit to hit disk.
3055  */
3056  WriteRqst.Flush = WriteRqst.Write;
3057  lastflush = now;
3058  }
3059  else if (flushbytes >= WalWriterFlushAfter)
3060  {
3061  /* exceeded wal_writer_flush_after blocks, flush */
3062  WriteRqst.Flush = WriteRqst.Write;
3063  lastflush = now;
3064  }
3065  else
3066  {
3067  /* no flushing, this time round */
3068  WriteRqst.Flush = 0;
3069  }
3070 
3071 #ifdef WAL_DEBUG
3072  if (XLOG_DEBUG)
3073  elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X",
3074  (uint32) (WriteRqst.Write >> 32), (uint32) WriteRqst.Write,
3075  (uint32) (WriteRqst.Flush >> 32), (uint32) WriteRqst.Flush,
3076  (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write,
3077  (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush);
3078 #endif
3079 
3080  START_CRIT_SECTION();
3081 
3082  /* now wait for any in-progress insertions to finish and get write lock */
3083  WaitXLogInsertionsToFinish(WriteRqst.Write);
3084  LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
3085  LogwrtResult = XLogCtl->LogwrtResult;
3086  if (WriteRqst.Write > LogwrtResult.Write ||
3087  WriteRqst.Flush > LogwrtResult.Flush)
3088  {
3089  XLogWrite(WriteRqst, flexible);
3090  }
3091  LWLockRelease(WALWriteLock);
3092 
3093  END_CRIT_SECTION();
3094 
3095  /* wake up walsenders now that we've released heavily contended locks */
3096  WalSndWakeupProcessRequests();
3097 
3098  /*
3099  * Great, done. To take some work off the critical path, try to initialize
3100  * as many of the no-longer-needed WAL buffers for future use as we can.
3101  */
3102  AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
3103 
3104  /*
3105  * If we determined that we need to write data, but somebody else
3106  * wrote/flushed already, it should be considered as being active, to
3107  * avoid hibernating too early.
3108  */
3109  return true;
3110 }
3111 
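/*
 * Flush-policy example for the function above, with the defaults
 * wal_writer_delay = 200 ms and wal_writer_flush_after = 1 MB (128 pages
 * of 8 kB): completed blocks are written whenever there are any, but
 * fsynced only once 128 unflushed pages have accumulated or 200 ms have
 * passed since the last flush, whichever comes first.
 */
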
3112 /*
3113  * Test whether XLOG data has been flushed up to (at least) the given position.
3114  *
3115  * Returns true if a flush is still needed. (It may be that someone else
3116  * is already in process of flushing that far, however.)
3117  */
3118 bool
3119 XLogNeedsFlush(XLogRecPtr record)
3120 {
3121  /*
3122  * During recovery, we don't flush WAL but update minRecoveryPoint
3123  * instead. So "needs flush" is taken to mean whether minRecoveryPoint
3124  * would need to be updated.
3125  */
3126  if (RecoveryInProgress())
3127  {
3128  /*
3129  * An invalid minRecoveryPoint means that we need to recover all the
3130  * WAL, i.e., we're doing crash recovery. We never modify the control
3131  * file's value in that case, so we can short-circuit future checks
3132  * here too. This triggers a quick exit path for the startup process,
3133  * which cannot update its local copy of minRecoveryPoint as long as
3134  * it has not replayed all WAL available when doing crash recovery.
3135  */
3136  if (XLogRecPtrIsInvalid(minRecoveryPoint))
3137  updateMinRecoveryPoint = false;
3138 
3139  /* Quick exit if already known to be updated or cannot be updated */
3140  if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
3141  return false;
3142 
3143  /*
3144  * Update local copy of minRecoveryPoint. But if the lock is busy,
3145  * just return a conservative guess.
3146  */
3147  if (!LWLockConditionalAcquire(ControlFileLock, LW_SHARED))
3148  return true;
3149  minRecoveryPoint = ControlFile->minRecoveryPoint;
3150  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
3151  LWLockRelease(ControlFileLock);
3152 
3153  /*
3154  * Check minRecoveryPoint for any other process than the startup
3155  * process doing crash recovery, which should not update the control
3156  * file value if crash recovery is still running.
3157  */
3158  if (XLogRecPtrIsInvalid(minRecoveryPoint))
3159  updateMinRecoveryPoint = false;
3160 
3161  /* check again */
3162  if (record <= minRecoveryPoint || !updateMinRecoveryPoint)
3163  return false;
3164  else
3165  return true;
3166  }
3167 
3168  /* Quick exit if already known flushed */
3169  if (record <= LogwrtResult.Flush)
3170  return false;
3171 
3172  /* read LogwrtResult and update local state */
3173  SpinLockAcquire(&XLogCtl->info_lck);
3174  LogwrtResult = XLogCtl->LogwrtResult;
3175  SpinLockRelease(&XLogCtl->info_lck);
3176 
3177  /* check again */
3178  if (record <= LogwrtResult.Flush)
3179  return false;
3180 
3181  return true;
3182 }
3183 
3184 /*
3185  * Create a new XLOG file segment, or open a pre-existing one.
3186  *
3187  * log, seg: identify segment to be created/opened.
3188  *
3189  * *use_existent: if true, OK to use a pre-existing file (else, any
3190  * pre-existing file will be deleted). On return, true if a pre-existing
3191  * file was used.
3192  *
3193  * use_lock: if true, acquire ControlFileLock while moving file into
3194  * place. This should be true except during bootstrap log creation. The
3195  * caller must *not* hold the lock at call.
3196  *
3197  * Returns FD of opened file.
3198  *
3199  * Note: errors here are ERROR not PANIC because we might or might not be
3200  * inside a critical section (eg, during checkpoint there is no reason to
3201  * take down the system on failure). They will promote to PANIC if we are
3202  * in a critical section.
3203  */
3204 int
3205 XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
3206 {
3207  char path[MAXPGPATH];
3208  char tmppath[MAXPGPATH];
3209  PGAlignedXLogBlock zbuffer;
3210  XLogSegNo installed_segno;
3211  XLogSegNo max_segno;
3212  int fd;
3213  int nbytes;
3214  int save_errno;
3215 
3216  XLogFilePath(path, ThisTimeLineID, logsegno, wal_segment_size);
3217 
3218  /*
3219  * Try to use existent file (checkpoint maker may have created it already)
3220  */
3221  if (*use_existent)
3222  {
3223  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3224  if (fd < 0)
3225  {
3226  if (errno != ENOENT)
3227  ereport(ERROR,
3228  (errcode_for_file_access(),
3229  errmsg("could not open file \"%s\": %m", path)));
3230  }
3231  else
3232  return fd;
3233  }
3234 
3235  /*
3236  * Initialize an empty (all zeroes) segment. NOTE: it is possible that
3237  * another process is doing the same thing. If so, we will end up
3238  * pre-creating an extra log segment. That seems OK, and better than
3239  * holding the lock throughout this lengthy process.
3240  */
3241  elog(DEBUG2, "creating and filling new WAL file");
3242 
3243  snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3244 
3245  unlink(tmppath);
3246 
3247  /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3248  fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
3249  if (fd < 0)
3250  ereport(ERROR,
3251  (errcode_for_file_access(),
3252  errmsg("could not create file \"%s\": %m", tmppath)));
3253 
3254  memset(zbuffer.data, 0, XLOG_BLCKSZ);
3255 
3256  pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE);
3257  save_errno = 0;
3258  if (wal_init_zero)
3259  {
3260  /*
3261  * Zero-fill the file. With this setting, we do this the hard way to
3262  * ensure that all the file space has really been allocated. On
3263  * platforms that allow "holes" in files, just seeking to the end
3264  * doesn't allocate intermediate space. This way, we know that we
3265  * have all the space and (after the fsync below) that all the
3266  * indirect blocks are down on disk. Therefore, fdatasync(2) or
3267  * O_DSYNC will be sufficient to sync future writes to the log file.
3268  */
3269  for (nbytes = 0; nbytes < wal_segment_size; nbytes += XLOG_BLCKSZ)
3270  {
3271  errno = 0;
3272  if (write(fd, zbuffer.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3273  {
3274  /* if write didn't set errno, assume no disk space */
3275  save_errno = errno ? errno : ENOSPC;
3276  break;
3277  }
3278  }
3279  }
3280  else
3281  {
3282  /*
3283  * Otherwise, seeking to the end and writing a solitary byte is
3284  * enough.
3285  */
3286  errno = 0;
3287  if (pg_pwrite(fd, zbuffer.data, 1, wal_segment_size - 1) != 1)
3288  {
3289  /* if write didn't set errno, assume no disk space */
3290  save_errno = errno ? errno : ENOSPC;
3291  }
3292  }
3293  pgstat_report_wait_end();
3294 
3295  if (save_errno)
3296  {
3297  /*
3298  * If we fail to make the file, delete it to release disk space
3299  */
3300  unlink(tmppath);
3301 
3302  close(fd);
3303 
3304  errno = save_errno;
3305 
3306  ereport(ERROR,
3307  (errcode_for_file_access(),
3308  errmsg("could not write to file \"%s\": %m", tmppath)));
3309  }
3310 
3311  pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_SYNC);
3312  if (pg_fsync(fd) != 0)
3313  {
3314  int save_errno = errno;
3315 
3316  close(fd);
3317  errno = save_errno;
3318  ereport(ERROR,
3319  (errcode_for_file_access(),
3320  errmsg("could not fsync file \"%s\": %m", tmppath)));
3321  }
3322  pgstat_report_wait_end();
3323 
3324  if (close(fd) != 0)
3325  ereport(ERROR,
3326  (errcode_for_file_access(),
3327  errmsg("could not close file \"%s\": %m", tmppath)));
3328 
3329  /*
3330  * Now move the segment into place with its final name.
3331  *
3332  * If caller didn't want to use a pre-existing file, get rid of any
3333  * pre-existing file. Otherwise, cope with possibility that someone else
3334  * has created the file while we were filling ours: if so, use ours to
3335  * pre-create a future log segment.
3336  */
3337  installed_segno = logsegno;
3338 
3339  /*
3340  * XXX: What should we use as max_segno? We used to use XLOGfileslop when
3341  * that was a constant, but that was always a bit dubious: normally, at a
3342  * checkpoint, XLOGfileslop was the offset from the checkpoint record, but
3343  * here, it was the offset from the insert location. We can't do the
3344  * normal XLOGfileslop calculation here because we don't have access to
3345  * the prior checkpoint's redo location. So somewhat arbitrarily, just use
3346  * CheckPointSegments.
3347  */
3348  max_segno = logsegno + CheckPointSegments;
3349  if (!InstallXLogFileSegment(&installed_segno, tmppath,
3350  *use_existent, max_segno,
3351  use_lock))
3352  {
3353  /*
3354  * No need for any more future segments, or InstallXLogFileSegment()
3355  * failed to rename the file into place. If the rename failed, opening
3356  * the file below will fail.
3357  */
3358  unlink(tmppath);
3359  }
3360 
3361  /* Set flag to tell caller there was no existent file */
3362  *use_existent = false;
3363 
3364  /* Now open original target segment (might not be file I just made) */
3365  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3366  if (fd < 0)
3367  ereport(ERROR,
3368  (errcode_for_file_access(),
3369  errmsg("could not open file \"%s\": %m", path)));
3370 
3371  elog(DEBUG2, "done creating and filling new WAL file");
3372 
3373  return fd;
3374 }
3375 
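/*
 * [Editor's example -- not part of xlog.c]  A minimal sketch of how a
 * caller, such as the preallocation path further below, drives the
 * use_existent in/out flag of XLogFileInit().  The function name here is
 * hypothetical; the calling convention matches the comment above.
 */
#ifdef EDITOR_EXAMPLE
static void
example_make_future_segment(XLogSegNo segno)
{
	bool		use_existent = true;	/* reusing a pre-made file is fine */
	int			fd = XLogFileInit(segno, &use_existent, true);

	/* use_existent == false on return means a fresh segment was created */
	close(fd);
}
#endif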
3376 /*
3377  * Create a new XLOG file segment by copying a pre-existing one.
3378  *
3379  * destsegno: identify segment to be created.
3380  *
3381  * srcTLI, srcsegno: identify segment to be copied (could be from
3382  * a different timeline)
3383  *
3384  * upto: how much of the source file to copy (the rest is filled with
3385  * zeros)
3386  *
3387  * Currently this is only used during recovery, and so there are no locking
3388  * considerations. But we should be just as careful as XLogFileInit to
3389  * avoid installing a bogus file.
3390  */
3391 static void
3392 XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno,
3393  int upto)
3394 {
3395  char path[MAXPGPATH];
3396  char tmppath[MAXPGPATH];
3397  PGAlignedXLogBlock buffer;
3398  int srcfd;
3399  int fd;
3400  int nbytes;
3401 
3402  /*
3403  * Open the source file
3404  */
3405  XLogFilePath(path, srcTLI, srcsegno, wal_segment_size);
3406  srcfd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3407  if (srcfd < 0)
3408  ereport(ERROR,
3409  (errcode_for_file_access(),
3410  errmsg("could not open file \"%s\": %m", path)));
3411 
3412  /*
3413  * Copy into a temp file name.
3414  */
3415  snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3416 
3417  unlink(tmppath);
3418 
3419  /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3420  fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
3421  if (fd < 0)
3422  ereport(ERROR,
3423  (errcode_for_file_access(),
3424  errmsg("could not create file \"%s\": %m", tmppath)));
3425 
3426  /*
3427  * Do the data copying.
3428  */
3429  for (nbytes = 0; nbytes < wal_segment_size; nbytes += sizeof(buffer))
3430  {
3431  int nread;
3432 
3433  nread = upto - nbytes;
3434 
3435  /*
3436  * The part that is not read from the source file is filled with
3437  * zeros.
3438  */
3439  if (nread < sizeof(buffer))
3440  memset(buffer.data, 0, sizeof(buffer));
3441 
3442  if (nread > 0)
3443  {
3444  int r;
3445 
3446  if (nread > sizeof(buffer))
3447  nread = sizeof(buffer);
3448  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_READ);
3449  r = read(srcfd, buffer.data, nread);
3450  if (r != nread)
3451  {
3452  if (r < 0)
3453  ereport(ERROR,
3454  (errcode_for_file_access(),
3455  errmsg("could not read file \"%s\": %m",
3456  path)));
3457  else
3458  ereport(ERROR,
3459  (errcode(ERRCODE_DATA_CORRUPTED),
3460  errmsg("could not read file \"%s\": read %d of %zu",
3461  path, r, (Size) nread)));
3462  }
3463  pgstat_report_wait_end();
3464  }
3465  errno = 0;
3466  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_WRITE);
3467  if ((int) write(fd, buffer.data, sizeof(buffer)) != (int) sizeof(buffer))
3468  {
3469  int save_errno = errno;
3470 
3471  /*
3472  * If we fail to make the file, delete it to release disk space
3473  */
3474  unlink(tmppath);
3475  /* if write didn't set errno, assume problem is no disk space */
3476  errno = save_errno ? save_errno : ENOSPC;
3477 
3478  ereport(ERROR,
3479  (errcode_for_file_access(),
3480  errmsg("could not write to file \"%s\": %m", tmppath)));
3481  }
3482  pgstat_report_wait_end();
3483  }
3484 
3485  pgstat_report_wait_start(WAIT_EVENT_WAL_COPY_SYNC);
3486  if (pg_fsync(fd) != 0)
3487  ereport(ERROR,
3488  (errcode_for_file_access(),
3489  errmsg("could not fsync file \"%s\": %m", tmppath)));
3490  pgstat_report_wait_end();
3491 
3492  if (CloseTransientFile(fd) != 0)
3493  ereport(ERROR,
3494  (errcode_for_file_access(),
3495  errmsg("could not close file \"%s\": %m", tmppath)));
3496 
3497  if (CloseTransientFile(srcfd) != 0)
3498  ereport(ERROR,
3499  (errcode_for_file_access(),
3500  errmsg("could not close file \"%s\": %m", path)));
3501 
3502  /*
3503  * Now move the segment into place with its final name.
3504  */
3505  if (!InstallXLogFileSegment(&destsegno, tmppath, false, 0, false))
3506  elog(ERROR, "InstallXLogFileSegment should not have failed");
3507 }
3508 
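/*
 * [Editor's example -- not part of xlog.c]  A self-contained sketch of the
 * "copy the first 'upto' bytes, zero the rest" chunk logic used by
 * XLogFileCopy() above, with memcpy standing in for the file reads and
 * writes.  All names here are hypothetical.
 */
#ifdef EDITOR_EXAMPLE
static void
example_copy_then_zero(char *dst, const char *src, int seg_size,
					   int upto, int chunk)
{
	int			nbytes;

	for (nbytes = 0; nbytes < seg_size; nbytes += chunk)
	{
		int			nread = upto - nbytes;

		if (nread > chunk)
			nread = chunk;
		if (nread < chunk)			/* zero the part not copied */
			memset(dst + nbytes, 0, chunk);
		if (nread > 0)
			memcpy(dst + nbytes, src + nbytes, nread);
	}
}
#endif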
3509 /*
3510  * Install a new XLOG segment file as a current or future log segment.
3511  *
3512  * This is used both to install a newly-created segment (which has a temp
3513  * filename while it's being created) and to recycle an old segment.
3514  *
3515  * *segno: identify segment to install as (or first possible target).
3516  * When find_free is true, this is modified on return to indicate the
3517  * actual installation location or last segment searched.
3518  *
3519  * tmppath: initial name of file to install. It will be renamed into place.
3520  *
3521  * find_free: if true, install the new segment at the first empty segment
3522  * number at or after the passed number. If false, install the new segment
3523  * exactly where specified, deleting any existing segment file there.
3524  *
3525  * max_segno: maximum segment number to install the new file as. Fail if no
3526  * free slot is found between *segno and max_segno. (Ignored when find_free
3527  * is false.)
3528  *
3529  * use_lock: if true, acquire ControlFileLock while moving file into
3530  * place. This should be true except during bootstrap log creation. The
3531  * caller must *not* hold the lock at call.
3532  *
3533  * Returns true if the file was installed successfully. false indicates that
3534  * max_segno limit was exceeded, or an error occurred while renaming the
3535  * file into place.
3536  */
3537 static bool
3538 InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
3539  bool find_free, XLogSegNo max_segno,
3540  bool use_lock)
3541 {
3542  char path[MAXPGPATH];
3543  struct stat stat_buf;
3544 
3545  XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
3546 
3547  /*
3548  * We want to be sure that only one process does this at a time.
3549  */
3550  if (use_lock)
3551  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
3552 
3553  if (!find_free)
3554  {
3555  /* Force installation: get rid of any pre-existing segment file */
3556  durable_unlink(path, DEBUG1);
3557  }
3558  else
3559  {
3560  /* Find a free slot to put it in */
3561  while (stat(path, &stat_buf) == 0)
3562  {
3563  if ((*segno) >= max_segno)
3564  {
3565  /* Failed to find a free slot within specified range */
3566  if (use_lock)
3567  LWLockRelease(ControlFileLock);
3568  return false;
3569  }
3570  (*segno)++;
3571  XLogFilePath(path, ThisTimeLineID, *segno, wal_segment_size);
3572  }
3573  }
3574 
3575  /*
3576  * Perform the rename using link if available, paranoidly trying to avoid
3577  * overwriting an existing file (there shouldn't be one).
3578  */
3579  if (durable_link_or_rename(tmppath, path, LOG) != 0)
3580  {
3581  if (use_lock)
3582  LWLockRelease(ControlFileLock);
3583  /* durable_link_or_rename already emitted log message */
3584  return false;
3585  }
3586 
3587  if (use_lock)
3588  LWLockRelease(ControlFileLock);
3589 
3590  return true;
3591 }
3592 
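/*
 * [Editor's example -- not part of xlog.c]  Sketch of the find_free = true
 * mode of InstallXLogFileSegment(): the target *segno may be advanced past
 * occupied slots, up to max_segno, which is how RemoveXlogFile() below
 * recycles old segments into future positions.  Names are hypothetical.
 */
#ifdef EDITOR_EXAMPLE
static void
example_recycle_segment(XLogSegNo first_candidate, XLogSegNo max_segno,
						char *oldpath)
{
	XLogSegNo	slot = first_candidate;

	if (InstallXLogFileSegment(&slot, oldpath, true, max_segno, true))
		elog(DEBUG2, "recycled into segment slot " UINT64_FORMAT,
			 (uint64) slot);
}
#endif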
3593 /*
3594  * Open a pre-existing logfile segment for writing.
3595  */
3596 int
3597 XLogFileOpen(XLogSegNo segno)
3598 {
3599  char path[MAXPGPATH];
3600  int fd;
3601 
3601 
3602  XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
3603 
3604  fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
3605  if (fd < 0)
3606  ereport(PANIC,
3607  (errcode_for_file_access(),
3608  errmsg("could not open file \"%s\": %m", path)));
3609 
3610  return fd;
3611 }
3612 
3613 /*
3614  * Open a logfile segment for reading (during recovery).
3615  *
3616  * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
3617  * Otherwise, it's assumed to be already available in pg_wal.
3618  */
3619 static int
3620 XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
3621  int source, bool notfoundOk)
3622 {
3623  char xlogfname[MAXFNAMELEN];
3624  char activitymsg[MAXFNAMELEN + 16];
3625  char path[MAXPGPATH];
3626  int fd;
3627 
3628  XLogFileName(xlogfname, tli, segno, wal_segment_size);
3629 
3630  switch (source)
3631  {
3632  case XLOG_FROM_ARCHIVE:
3633  /* Report recovery progress in PS display */
3634  snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
3635  xlogfname);
3636  set_ps_display(activitymsg, false);
3637 
3638  restoredFromArchive = RestoreArchivedFile(path, xlogfname,
3639  "RECOVERYXLOG",
3640  wal_segment_size,
3641  InRedo);
3642  if (!restoredFromArchive)
3643  return -1;
3644  break;
3645 
3646  case XLOG_FROM_PG_WAL:
3647  case XLOG_FROM_STREAM:
3648  XLogFilePath(path, tli, segno, wal_segment_size);
3649  restoredFromArchive = false;
3650  break;
3651 
3652  default:
3653  elog(ERROR, "invalid XLogFileRead source %d", source);
3654  }
3655 
3656  /*
3657  * If the segment was fetched from archival storage, replace the existing
3658  * xlog segment (if any) with the archival version.
3659  */
3660  if (source == XLOG_FROM_ARCHIVE)
3661  {
3662  KeepFileRestoredFromArchive(path, xlogfname);
3663 
3664  /*
3665  * Set path to point at the new file in pg_wal.
3666  */
3667  snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
3668  }
3669 
3670  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3671  if (fd >= 0)
3672  {
3673  /* Success! */
3674  curFileTLI = tli;
3675 
3676  /* Report recovery progress in PS display */
3677  snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
3678  xlogfname);
3679  set_ps_display(activitymsg, false);
3680 
3681  /* Track source of data in assorted state variables */
3682  readSource = source;
3683  XLogReceiptSource = source;
3684  /* In FROM_STREAM case, caller tracks receipt time, not me */
3685  if (source != XLOG_FROM_STREAM)
3686  XLogReceiptTime = GetCurrentTimestamp();
3687 
3688  return fd;
3689  }
3690  if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
3691  ereport(PANIC,
3692  (errcode_for_file_access(),
3693  errmsg("could not open file \"%s\": %m", path)));
3694  return -1;
3695 }
3696 
3697 /*
3698  * Open a logfile segment for reading (during recovery).
3699  *
3700  * This version searches for the segment with any TLI listed in expectedTLEs.
3701  */
3702 static int
3703 XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
3704 {
3705  char path[MAXPGPATH];
3706  ListCell *cell;
3707  int fd;
3708  List *tles;
3709 
3710  /*
3711  * Loop looking for a suitable timeline ID: we might need to read any of
3712  * the timelines listed in expectedTLEs.
3713  *
3714  * We expect curFileTLI on entry to be the TLI of the preceding file in
3715  * sequence, or 0 if there was no predecessor. We do not allow curFileTLI
3716  * to go backwards; this prevents us from picking up the wrong file when a
3717  * parent timeline extends to higher segment numbers than the child we
3718  * want to read.
3719  *
3720  * If we haven't read the timeline history file yet, read it now, so that
3721  * we know which TLIs to scan. We don't save the list in expectedTLEs,
3722  * however, unless we actually find a valid segment. That way if there is
3723  * neither a timeline history file nor a WAL segment in the archive, and
3724  * streaming replication is set up, we'll read the timeline history file
3725  * streamed from the master when we start streaming, instead of recovering
3726  * with a dummy history generated here.
3727  */
3728  if (expectedTLEs)
3729  tles = expectedTLEs;
3730  else
3731  tles = readTimeLineHistory(recoveryTargetTLI);
3732 
3733  foreach(cell, tles)
3734  {
3735  TimeLineID tli = ((TimeLineHistoryEntry *) lfirst(cell))->tli;
3736 
3737  if (tli < curFileTLI)
3738  break; /* don't bother looking at too-old TLIs */
3739 
3740  if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
3741  {
3742  fd = XLogFileRead(segno, emode, tli,
3743  XLOG_FROM_ARCHIVE, true);
3744  if (fd != -1)
3745  {
3746  elog(DEBUG1, "got WAL segment from archive");
3747  if (!expectedTLEs)
3748  expectedTLEs = tles;
3749  return fd;
3750  }
3751  }
3752 
3753  if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_WAL)
3754  {
3755  fd = XLogFileRead(segno, emode, tli,
3756  XLOG_FROM_PG_WAL, true);
3757  if (fd != -1)
3758  {
3759  if (!expectedTLEs)
3760  expectedTLEs = tles;
3761  return fd;
3762  }
3763  }
3764  }
3765 
3766  /* Couldn't find it. For simplicity, complain about front timeline */
3767  XLogFilePath(path, recoveryTargetTLI, segno, wal_segment_size);
3768  errno = ENOENT;
3769  ereport(emode,
3770  (errcode_for_file_access(),
3771  errmsg("could not open file \"%s\": %m", path)));
3772  return -1;
3773 }
3774 
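/*
 * [Editor's note]  Illustration of the scan order above: with expectedTLEs
 * listing timelines 3, 2, 1 (newest first) and source == XLOG_FROM_ANY,
 * the probes are
 *
 *     TLI 3: archive, then pg_wal
 *     TLI 2: archive, then pg_wal
 *     TLI 1: archive, then pg_wal
 *
 * stopping at the first hit, and never dropping below curFileTLI, the
 * timeline of the previously read file.
 */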
3775 /*
3776  * Close the current logfile segment for writing.
3777  */
3778 static void
3779 XLogFileClose(void)
3780 {
3781  Assert(openLogFile >= 0);
3782 
3783  /*
3784  * WAL segment files will not be re-read in normal operation, so we advise
3785  * the OS to release any cached pages. But do not do so if WAL archiving
3786  * or streaming is active, because the archiver and walsender processes
3787  * could use the cache to read the WAL segment.
3788  */
3789 #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
3790  if (!XLogIsNeeded())
3791  (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
3792 #endif
3793 
3794  if (close(openLogFile) != 0)
3795  ereport(PANIC,
3796  (errcode_for_file_access(),
3797  errmsg("could not close file \"%s\": %m",
3798  XLogFileNameP(ThisTimeLineID, openLogSegNo))));
3799  openLogFile = -1;
3800 }
3801 
3802 /*
3803  * Preallocate log files beyond the specified log endpoint.
3804  *
3805  * XXX this is currently extremely conservative, since it forces only one
3806  * future log segment to exist, and even that only if we are 75% done with
3807  * the current one. This is only appropriate for very low-WAL-volume systems.
3808  * High-volume systems will be OK once they've built up a sufficient set of
3809  * recycled log segments, but the startup transient is likely to include
3810  * a lot of segment creations by foreground processes, which is not so good.
3811  */
3812 static void
3813 PreallocXlogFiles(XLogRecPtr endptr)
3814 {
3815  XLogSegNo _logSegNo;
3816  int lf;
3817  bool use_existent;
3818  uint64 offset;
3819 
3820  XLByteToPrevSeg(endptr, _logSegNo, wal_segment_size);
3821  offset = XLogSegmentOffset(endptr - 1, wal_segment_size);
3822  if (offset >= (uint32) (0.75 * wal_segment_size))
3823  {
3824  _logSegNo++;
3825  use_existent = true;
3826  lf = XLogFileInit(_logSegNo, &use_existent, true);
3827  close(lf);
3828  if (!use_existent)
3829  CheckpointStats.ckpt_segs_added++;
3830  }
3831 }
3832 
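/*
 * [Editor's note]  Worked example for the 75% rule above: with the default
 * 16 MB segments, 0.75 * wal_segment_size is 12 MB, so the next segment is
 * created (or found to exist already) once the end of WAL is at least
 * 12 MB into the current segment.
 */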
3833 /*
3834  * Throws an error if the given log segment has already been removed or
3835  * recycled. The caller should only pass a segment that it knows to have
3836  * existed while the server has been running, as this function always
3837  * succeeds if no WAL segments have been removed since startup.
3838  * 'tli' is only used in the error message.
3839  *
3840  * Note: this function guarantees to keep errno unchanged on return.
3841  * This lets callers use it to deliver a better error message about a
3842  * missing file, while still being able to throw a normal file-access
3843  * error afterwards, if this does return.
3844  */
3845 void
3846 CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
3847 {
3848  int save_errno = errno;
3849  XLogSegNo lastRemovedSegNo;
3850 
3851  SpinLockAcquire(&XLogCtl->info_lck);
3852  lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
3853  SpinLockRelease(&XLogCtl->info_lck);
3854 
3855  if (segno <= lastRemovedSegNo)
3856  {
3857  char filename[MAXFNAMELEN];
3858 
3859  XLogFileName(filename, tli, segno, wal_segment_size);
3860  errno = save_errno;
3861  ereport(ERROR,
3862  (errcode_for_file_access(),
3863  errmsg("requested WAL segment %s has already been removed",
3864  filename)));
3865  }
3866  errno = save_errno;
3867 }
3868 
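/*
 * [Editor's example -- not part of xlog.c]  Sketch of the errno-preserving
 * contract above, in the style of a WAL-reading caller: on open failure,
 * first check whether the segment was removed (which raises a friendlier
 * ERROR); if CheckXLogRemoved() returns, errno is still intact for %m.
 * The function name is hypothetical.
 */
#ifdef EDITOR_EXAMPLE
static int
example_open_wal_segment(const char *path, XLogSegNo segno, TimeLineID tli)
{
	int			fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);

	if (fd < 0)
	{
		CheckXLogRemoved(segno, tli);
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));
	}
	return fd;
}
#endif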
3869 /*
3870  * Return the last WAL segment removed, or 0 if no segment has been removed
3871  * since startup.
3872  *
3873  * NB: the result can be out of date arbitrarily fast, the caller has to deal
3874  * with that.
3875  */
3876 XLogSegNo
3877 XLogGetLastRemovedSegno(void)
3878 {
3879  XLogSegNo lastRemovedSegNo;
3880 
3881  SpinLockAcquire(&XLogCtl->info_lck);
3882  lastRemovedSegNo = XLogCtl->lastRemovedSegNo;
3883  SpinLockRelease(&XLogCtl->info_lck);
3884 
3885  return lastRemovedSegNo;
3886 }
3887 
3888 /*
3889  * Update the last removed segno pointer in shared memory, to reflect
3890  * that the given XLOG file has been removed.
3891  */
3892 static void
3893 UpdateLastRemovedPtr(char *filename)
3894 {
3895  uint32 tli;
3896  XLogSegNo segno;
3897 
3898  XLogFromFileName(filename, &tli, &segno, wal_segment_size);
3899 
3900  SpinLockAcquire(&XLogCtl->info_lck);
3901  if (segno > XLogCtl->lastRemovedSegNo)
3902  XLogCtl->lastRemovedSegNo = segno;
3903  SpinLockRelease(&XLogCtl->info_lck);
3904 }
3905 
3906 /*
3907  * Remove all temporary log files in pg_wal
3908  *
3909  * This is called at the beginning of recovery after a previous crash,
3910  * at a point where no other processes write fresh WAL data.
3911  */
3912 static void
3913 RemoveTempXlogFiles(void)
3914 {
3915  DIR *xldir;
3916  struct dirent *xlde;
3917 
3918  elog(DEBUG2, "removing all temporary WAL segments");
3919 
3920  xldir = AllocateDir(XLOGDIR);
3921  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3922  {
3923  char path[MAXPGPATH];
3924 
3925  if (strncmp(xlde->d_name, "xlogtemp.", 9) != 0)
3926  continue;
3927 
3928  snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
3929  unlink(path);
3930  elog(DEBUG2, "removed temporary WAL segment \"%s\"", path);
3931  }
3932  FreeDir(xldir);
3933 }
3934 
3935 /*
3936  * Recycle or remove all log files older or equal to passed segno.
3937  *
3938  * endptr is current (or recent) end of xlog, and RedoRecPtr is the
3939  * redo pointer of the last checkpoint. These are used to determine
3940  * whether we want to recycle rather than delete no-longer-wanted log files.
3941  */
3942 static void
3943 RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr)
3944 {
3945  DIR *xldir;
3946  struct dirent *xlde;
3947  char lastoff[MAXFNAMELEN];
3948 
3949  /*
3950  * Construct a filename of the last segment to be kept. The timeline ID
3951  * doesn't matter, we ignore that in the comparison. (During recovery,
3952  * ThisTimeLineID isn't set, so we can't use that.)
3953  */
3954  XLogFileName(lastoff, 0, segno, wal_segment_size);
3955 
3956  elog(DEBUG2, "attempting to remove WAL segments older than log file %s",
3957  lastoff);
3958 
3959  xldir = AllocateDir(XLOGDIR);
3960 
3961  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
3962  {
3963  /* Ignore files that are not XLOG segments */
3964  if (!IsXLogFileName(xlde->d_name) &&
3965  !IsPartialXLogFileName(xlde->d_name))
3966  continue;
3967 
3968  /*
3969  * We ignore the timeline part of the XLOG segment identifiers in
3970  * deciding whether a segment is still needed. This ensures that we
3971  * won't prematurely remove a segment from a parent timeline. We could
3972  * probably be a little more proactive about removing segments of
3973  * non-parent timelines, but that would be a whole lot more
3974  * complicated.
3975  *
3976  * We use the alphanumeric sorting property of the filenames to decide
3977  * which ones are earlier than the lastoff segment.
3978  */
3979  if (strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
3980  {
3981  if (XLogArchiveCheckDone(xlde->d_name))
3982  {
3983  /* Update the last removed location in shared memory first */
3984  UpdateLastRemovedPtr(xlde->d_name);
3985 
3986  RemoveXlogFile(xlde->d_name, RedoRecPtr, endptr);
3987  }
3988  }
3989  }
3990 
3991  FreeDir(xldir);
3992 }
3993 
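/*
 * [Editor's note]  Illustration of the "+ 8" comparison above: WAL segment
 * file names are 24 hex digits, 8 for the timeline followed by 16 for the
 * segment number, e.g.
 *
 *     000000010000000200000034
 *     \______/\______________/
 *      TLI 1    segment part
 *
 * so comparing d_name + 8 with lastoff + 8 orders files by segment number
 * alone, regardless of which timeline they belong to.
 */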
3994 /*
3995  * Remove WAL files that are not part of the given timeline's history.
3996  *
3997  * This is called during recovery, whenever we switch to follow a new
3998  * timeline, and at the end of recovery when we create a new timeline. We
3999  * wouldn't otherwise care about extra WAL files lying in pg_wal, but they
4000  * might be leftover pre-allocated or recycled WAL segments on the old timeline
4001  * that we haven't used yet, and contain garbage. If we just leave them in
4002  * pg_wal, they will eventually be archived, and we can't let that happen.
4003  * Files that belong to our timeline history are valid, because we have
4004  * successfully replayed them, but from others we can't be sure.
4005  *
4006  * 'switchpoint' is the current point in WAL where we switch to new timeline,
4007  * and 'newTLI' is the new timeline we switch to.
4008  */
4009 static void
4010 RemoveNonParentXlogFiles(XLogRecPtr switchpoint, TimeLineID newTLI)
4011 {
4012  DIR *xldir;
4013  struct dirent *xlde;
4014  char switchseg[MAXFNAMELEN];
4015  XLogSegNo endLogSegNo;
4016 
4017  XLByteToPrevSeg(switchpoint, endLogSegNo, wal_segment_size);
4018 
4019  /*
4020  * Construct a filename of the last segment to be kept.
4021  */
4022  XLogFileName(switchseg, newTLI, endLogSegNo, wal_segment_size);
4023 
4024  elog(DEBUG2, "attempting to remove WAL segments newer than log file %s",
4025  switchseg);
4026 
4027  xldir = AllocateDir(XLOGDIR);
4028 
4029  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
4030  {
4031  /* Ignore files that are not XLOG segments */
4032  if (!IsXLogFileName(xlde->d_name))
4033  continue;
4034 
4035  /*
4036  * Remove files that are on a timeline older than the new one we're
4037  * switching to, but with a segment number >= the first segment on the
4038  * new timeline.
4039  */
4040  if (strncmp(xlde->d_name, switchseg, 8) < 0 &&
4041  strcmp(xlde->d_name + 8, switchseg + 8) > 0)
4042  {
4043  /*
4044  * If the file has already been marked as .ready, however, don't
4045  * remove it yet. It should be OK to remove it - files that are
4046  * not part of our timeline history are not required for recovery
4047  * - but seems safer to let them be archived and removed later.
4048  */
4049  if (!XLogArchiveIsReady(xlde->d_name))
4050  RemoveXlogFile(xlde->d_name, InvalidXLogRecPtr, switchpoint);
4051  }
4052  }
4053 
4054  FreeDir(xldir);
4055 }
4056 
4057 /*
4058  * Recycle or remove a log file that's no longer needed.
4059  *
4060  * endptr is current (or recent) end of xlog, and RedoRecPtr is the
4061  * redo pointer of the last checkpoint. These are used to determine
4062  * whether we want to recycle rather than delete no-longer-wanted log files.
4063  * If RedoRecPtr is not known, pass invalid, and the function will recycle,
4064  * somewhat arbitrarily, 10 future segments.
4065  */
4066 static void
4067 RemoveXlogFile(const char *segname, XLogRecPtr RedoRecPtr, XLogRecPtr endptr)
4068 {
4069  char path[MAXPGPATH];
4070 #ifdef WIN32
4071  char newpath[MAXPGPATH];
4072 #endif
4073  struct stat statbuf;
4074  XLogSegNo endlogSegNo;
4075  XLogSegNo recycleSegNo;
4076 
4077  if (wal_recycle)
4078  {
4079  /*
4080  * Initialize info about where to try to recycle to.
4081  */
4082  XLByteToSeg(endptr, endlogSegNo, wal_segment_size);
4083  if (RedoRecPtr == InvalidXLogRecPtr)
4084  recycleSegNo = endlogSegNo + 10;
4085  else
4086  recycleSegNo = XLOGfileslop(RedoRecPtr);
4087  }
4088  else
4089  recycleSegNo = 0; /* keep compiler quiet */
4090 
4091  snprintf(path, MAXPGPATH, XLOGDIR "/%s", segname);
4092 
4093  /*
4094  * Before deleting the file, see if it can be recycled as a future log
4095  * segment. Only recycle normal files; pg_standby, for example, can create
4096  * symbolic links pointing to a separate archive directory.
4097  */
4098  if (wal_recycle &&
4099  endlogSegNo <= recycleSegNo &&
4100  lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
4101  InstallXLogFileSegment(&endlogSegNo, path,
4102  true, recycleSegNo, true))
4103  {
4104  ereport(DEBUG2,
4105  (errmsg("recycled write-ahead log file \"%s\"",
4106  segname)));
4107  CheckpointStats.ckpt_segs_recycled++;
4108  /* Needn't recheck that slot on future iterations */
4109  endlogSegNo++;
4110  }
4111  else
4112  {
4113  /* No need for any more future segments... */
4114  int rc;
4115 
4116  ereport(DEBUG2,
4117  (errmsg("removing write-ahead log file \"%s\"",
4118  segname)));
4119 
4120 #ifdef WIN32
4121 
4122  /*
4123  * On Windows, if another process (e.g another backend) holds the file
4124  * open in FILE_SHARE_DELETE mode, unlink will succeed, but the file
4125  * will still show up in directory listing until the last handle is
4126  * closed. To avoid confusing the lingering deleted file for a live
4127  * WAL file that needs to be archived, rename it before deleting it.
4128  *
4129  * If another process holds the file open without FILE_SHARE_DELETE
4130  * flag, rename will fail. We'll try again at the next checkpoint.
4131  */
4132  snprintf(newpath, MAXPGPATH, "%s.deleted", path);
4133  if (rename(path, newpath) != 0)
4134  {
4135  ereport(LOG,
4136  (errcode_for_file_access(),
4137  errmsg("could not rename file \"%s\": %m",
4138  path)));
4139  return;
4140  }
4141  rc = durable_unlink(newpath, LOG);
4142 #else
4143  rc = durable_unlink(path, LOG);
4144 #endif
4145  if (rc != 0)
4146  {
4147  /* Message already logged by durable_unlink() */
4148  return;
4149  }
4150  CheckpointStats.ckpt_segs_removed++;
4151  }
4152 
4153  XLogArchiveCleanup(segname);
4154 }
4155 
4156 /*
4157  * Verify whether pg_wal and pg_wal/archive_status exist.
4158  * If the latter does not exist, recreate it.
4159  *
4160  * It is not the goal of this function to verify the contents of these
4161  * directories, but to help in cases where someone has performed a cluster
4162  * copy for PITR purposes but omitted pg_wal from the copy.
4163  *
4164  * We could also recreate pg_wal if it doesn't exist, but a deliberate
4165  * policy decision was made not to. It is fairly common for pg_wal to be
4166  * a symlink, and if that was the DBA's intent then automatically making a
4167  * plain directory would result in degraded performance with no notice.
4168  */
4169 static void
4170 ValidateXLOGDirectoryStructure(void)
4171 {
4172  char path[MAXPGPATH];
4173  struct stat stat_buf;
4174 
4175  /* Check for pg_wal; if it doesn't exist, error out */
4176  if (stat(XLOGDIR, &stat_buf) != 0 ||
4177  !S_ISDIR(stat_buf.st_mode))
4178  ereport(FATAL,
4179  (errmsg("required WAL directory \"%s\" does not exist",
4180  XLOGDIR)));
4181 
4182  /* Check for archive_status */
4183  snprintf(path, MAXPGPATH, XLOGDIR "/archive_status");
4184  if (stat(path, &stat_buf) == 0)
4185  {
4186  /* Check for weird cases where it exists but isn't a directory */
4187  if (!S_ISDIR(stat_buf.st_mode))
4188  ereport(FATAL,
4189  (errmsg("required WAL directory \"%s\" does not exist",
4190  path)));
4191  }
4192  else
4193  {
4194  ereport(LOG,
4195  (errmsg("creating missing WAL directory \"%s\"", path)));
4196  if (MakePGDirectory(path) < 0)
4197  ereport(FATAL,
4198  (errmsg("could not create missing directory \"%s\": %m",
4199  path)));
4200  }
4201 }
4202 
4203 /*
4204  * Remove previous backup history files. This also retries creation of
4205  * .ready files for any backup history files for which XLogArchiveNotify
4206  * failed earlier.
4207  */
4208 static void
4209 CleanupBackupHistory(void)
4210 {
4211  DIR *xldir;
4212  struct dirent *xlde;
4213  char path[MAXPGPATH + sizeof(XLOGDIR)];
4214 
4215  xldir = AllocateDir(XLOGDIR);
4216 
4217  while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
4218  {
4219  if (IsBackupHistoryFileName(xlde->d_name))
4220  {
4221  if (XLogArchiveCheckDone(xlde->d_name))
4222  {
4223  elog(DEBUG2, "removing WAL backup history file \"%s\"",
4224  xlde->d_name);
4225  snprintf(path, sizeof(path), XLOGDIR "/%s", xlde->d_name);
4226  unlink(path);
4227  XLogArchiveCleanup(xlde->d_name);
4228  }
4229  }
4230  }
4231 
4232  FreeDir(xldir);
4233 }
4234 
4235 /*
4236  * Attempt to read an XLOG record.
4237  *
4238  * If RecPtr is valid, try to read a record at that position. Otherwise
4239  * try to read a record just after the last one previously read.
4240  *
4241  * If no valid record is available, returns NULL, or fails if emode is PANIC.
4242  * (emode must be either PANIC or LOG). In standby mode, retries until a valid
4243  * record is available.
4244  */
4245 static XLogRecord *
4246 ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
4247  bool fetching_ckpt)
4248 {
4249  XLogRecord *record;
4250  XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
4251 
4252  /* Pass through parameters to XLogPageRead */
4253  private->fetching_ckpt = fetching_ckpt;
4254  private->emode = emode;
4255  private->randAccess = (RecPtr != InvalidXLogRecPtr);
4256 
4257  /* This is the first attempt to read this page. */
4258  lastSourceFailed = false;
4259 
4260  for (;;)
4261  {
4262  char *errormsg;
4263 
4264  record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
4265  ReadRecPtr = xlogreader->ReadRecPtr;
4266  EndRecPtr = xlogreader->EndRecPtr;
4267  if (record == NULL)
4268  {
4269  if (readFile >= 0)
4270  {
4271  close(readFile);
4272  readFile = -1;
4273  }
4274 
4275  /*
4276  * We only end up here without a message when XLogPageRead()
4277  * failed - in that case we already logged something. In
4278  * StandbyMode that only happens if we have been triggered, so we
4279  * shouldn't loop anymore in that case.
4280  */
4281  if (errormsg)
4282  ereport(emode_for_corrupt_record(emode,
4283  RecPtr ? RecPtr : EndRecPtr),
4284  (errmsg_internal("%s", errormsg) /* already translated */ ));
4285  }
4286 
4287  /*
4288  * Check page TLI is one of the expected values.
4289  */
4290  else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs))
4291  {
4292  char fname[MAXFNAMELEN];
4293  XLogSegNo segno;
4294  int32 offset;
4295 
4296  XLByteToSeg(xlogreader->latestPagePtr, segno, wal_segment_size);
4297  offset = XLogSegmentOffset(xlogreader->latestPagePtr,
4298  wal_segment_size);
4299  XLogFileName(fname, xlogreader->seg.ws_tli, segno,
4300  wal_segment_size);
4301  ereport(emode_for_corrupt_record(emode,
4302  RecPtr ? RecPtr : EndRecPtr),
4303  (errmsg("unexpected timeline ID %u in log segment %s, offset %u",
4304  xlogreader->latestPageTLI,
4305  fname,
4306  offset)));
4307  record = NULL;
4308  }
4309 
4310  if (record)
4311  {
4312  /* Great, got a record */
4313  return record;
4314  }
4315  else
4316  {
4317  /* No valid record available from this source */
4318  lastSourceFailed = true;
4319 
4320  /*
4321  * If archive recovery was requested, but we were still doing
4322  * crash recovery, switch to archive recovery and retry using the
4323  * offline archive. We have now replayed all the valid WAL in
4324  * pg_wal, so we are presumably now consistent.
4325  *
4326  * We require that there's at least some valid WAL present in
4327  * pg_wal, however (!fetching_ckpt). We could recover using the
4328  * WAL from the archive, even if pg_wal is completely empty, but
4329  * we'd have no idea how far we'd have to replay to reach
4330  * consistency. So err on the safe side and give up.
4331  */
4332  if (!InArchiveRecovery && ArchiveRecoveryRequested &&
4333  !fetching_ckpt)
4334  {
4335  ereport(DEBUG1,
4336  (errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
4337  InArchiveRecovery = true;
4338  if (StandbyModeRequested)
4339  StandbyMode = true;
4340 
4341  /* initialize minRecoveryPoint to this record */
4342  LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
4343  ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
4344  if (ControlFile->minRecoveryPoint < EndRecPtr)
4345  {
4346  ControlFile->minRecoveryPoint = EndRecPtr;
4347  ControlFile->minRecoveryPointTLI = ThisTimeLineID;
4348  }
4349  /* update local copy */
4350  minRecoveryPoint = ControlFile->minRecoveryPoint;
4351  minRecoveryPointTLI = ControlFile->minRecoveryPointTLI;
4352 
4353  /*
4354  * The startup process can update its local copy of
4355  * minRecoveryPoint from this point.
4356  */
4357  updateMinRecoveryPoint = true;
4358 
4359  UpdateControlFile();
4360  LWLockRelease(ControlFileLock);
4361 
4362  CheckRecoveryConsistency();
4363 
4364  /*
4365  * Before we retry, reset lastSourceFailed and currentSource
4366  * so that we will check the archive next.
4367  */
4368  lastSourceFailed = false;
4369  currentSource = 0;
4370 
4371  continue;
4372  }
4373 
4374  /* In standby mode, loop back to retry. Otherwise, give up. */
4375  if (StandbyMode && !CheckForStandbyTrigger())
4376  continue;
4377  else
4378  return NULL;
4379  }
4380  }
4381 }
4382 
4383 /*
4384  * Scan for new timelines that might have appeared in the archive since we
4385  * started recovery.
4386  *
4387  * If there are any, the function changes recovery target TLI to the latest
4388  * one and returns 'true'.
4389  */
4390 static bool
4391 rescanLatestTimeLine(void)
4392 {
4393  List *newExpectedTLEs;
4394  bool found;
4395  ListCell *cell;
4396  TimeLineID newtarget;
4397  TimeLineID oldtarget = recoveryTargetTLI;
4398  TimeLineHistoryEntry *currentTle = NULL;
4399 
4399 
4400  newtarget = findNewestTimeLine(recoveryTargetTLI);
4401  if (newtarget == recoveryTargetTLI)
4402  {
4403  /* No new timelines found */
4404  return false;
4405  }
4406 
4407  /*
4408  * Determine the list of expected TLIs for the new TLI
4409  */
4410 
4411  newExpectedTLEs = readTimeLineHistory(newtarget);
4412 
4413  /*
4414  * If the current timeline is not part of the history of the new timeline,
4415  * we cannot proceed to it.
4416  */
4417  found = false;
4418  foreach(cell, newExpectedTLEs)
4419  {
4420  currentTle = (TimeLineHistoryEntry *) lfirst(cell);
4421 
4422  if (currentTle->tli == recoveryTargetTLI)
4423  {
4424  found = true;
4425  break;
4426  }
4427  }
4428  if (!found)
4429  {
4430  ereport(LOG,
4431  (errmsg("new timeline %u is not a child of database system timeline %u",
4432  newtarget,
4433  ThisTimeLineID)));
4434  return false;
4435  }
4436 
4437  /*
4438  * The current timeline was found in the history file, but check that the
4439  * next timeline was forked off from it *after* the current recovery
4440  * location.
4441  */
4442  if (currentTle->end < EndRecPtr)
4443  {
4444  ereport(LOG,
4445  (errmsg("new timeline %u forked off current database system timeline %u before current recovery point %X/%X",
4446  newtarget,
4447  ThisTimeLineID,
4448  (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr)));
4449  return false;
4450  }
4451 
4452  /* The new timeline history seems valid. Switch target */
4453  recoveryTargetTLI = newtarget;
4454  list_free_deep(expectedTLEs);
4455  expectedTLEs = newExpectedTLEs;
4456 
4457  /*
4458  * As in StartupXLOG(), try to ensure we have all the history files
4459  * between the old target and new target in pg_wal.
4460  */
4461  restoreTimeLineHistoryFiles(oldtarget + 1, newtarget);
4462 
4463  ereport(LOG,
4464  (errmsg("new target timeline is %u",
4465  recoveryTargetTLI)));
4466 
4467  return true;
4468 }
4469 
4470 /*
4471  * I/O routines for pg_control
4472  *
4473  * *ControlFile is a buffer in shared memory that holds an image of the
4474  * contents of pg_control. WriteControlFile() initializes pg_control
4475  * given a preloaded buffer, ReadControlFile() loads the buffer from
4476  * the pg_control file (during postmaster or standalone-backend startup),
4477  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
4478  *
4479  * For simplicity, WriteControlFile() initializes the fields of pg_control
4480  * that are related to checking backend/database compatibility, and
4481  * ReadControlFile() verifies they are correct. We could split out the
4482  * I/O and compatibility-check functions, but there seems no need currently.
4483  */
4484 static void
4485 WriteControlFile(void)
4486 {
4487  int fd;
4488  char buffer[PG_CONTROL_FILE_SIZE]; /* need not be aligned */
4489 
4490  /*
4491  * Ensure that the size of the pg_control data structure is sane. See the
4492  * comments for these symbols in pg_control.h.
4493  */
4494  StaticAssertStmt(sizeof(ControlFileData) <= PG_CONTROL_MAX_SAFE_SIZE,
4495  "pg_control is too large for atomic disk writes");
4496  StaticAssertStmt(sizeof(ControlFileData) <= PG_CONTROL_FILE_SIZE,
4497  "sizeof(ControlFileData) exceeds PG_CONTROL_FILE_SIZE");
4498 
4499  /*
4500  * Initialize version and compatibility-check fields
4501  */
4502  ControlFile->pg_control_version = PG_CONTROL_VERSION;
4503  ControlFile->catalog_version_no = CATALOG_VERSION_NO;
4504 
4505  ControlFile->maxAlign = MAXIMUM_ALIGNOF;
4506  ControlFile->floatFormat = FLOATFORMAT_VALUE;
4507 
4508  ControlFile->blcksz = BLCKSZ;
4509  ControlFile->relseg_size = RELSEG_SIZE;
4510  ControlFile->xlog_blcksz = XLOG_BLCKSZ;
4511  ControlFile->xlog_seg_size = wal_segment_size;
4512 
4513  ControlFile->nameDataLen = NAMEDATALEN;
4514  ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
4515 
4516  ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
4517  ControlFile->loblksize = LOBLKSIZE;
4518 
4519  ControlFile->float4ByVal = FLOAT4PASSBYVAL;
4520  ControlFile->float8ByVal = FLOAT8PASSBYVAL;
4521 
4522  /* Contents are protected with a CRC */
4523  INIT_CRC32C(ControlFile->crc);
4524  COMP_CRC32C(ControlFile->crc,
4525  (char *) ControlFile,
4526  offsetof(ControlFileData, crc));
4527  FIN_CRC32C(ControlFile->crc);
4528 
4529  /*
4530  * We write out PG_CONTROL_FILE_SIZE bytes into pg_control, zero-padding
4531  * the excess over sizeof(ControlFileData). This reduces the odds of
4532  * premature-EOF errors when reading pg_control. We'll still fail when we
4533  * check the contents of the file, but hopefully with a more specific
4534  * error than "couldn't read pg_control".
4535  */
4536  memset(buffer, 0, PG_CONTROL_FILE_SIZE);
4537  memcpy(buffer, ControlFile, sizeof(ControlFileData));
4538 
4539  fd = BasicOpenFile(XLOG_CONTROL_FILE,
4540  O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
4541  if (fd < 0)
4542  ereport(PANIC,
4543  (errcode_for_file_access(),
4544  errmsg("could not create file \"%s\": %m",
4545  XLOG_CONTROL_FILE)));
4546 
4547  errno = 0;
4548  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_WRITE);
4549  if (write(fd, buffer, PG_CONTROL_FILE_SIZE) != PG_CONTROL_FILE_SIZE)
4550  {
4551  /* if write didn't set errno, assume problem is no disk space */
4552  if (errno == 0)
4553  errno = ENOSPC;
4554  ereport(PANIC,
4555  (errcode_for_file_access(),
4556  errmsg("could not write to file \"%s\": %m",
4557  XLOG_CONTROL_FILE)));
4558  }
4559  pgstat_report_wait_end();
4560 
4561  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_SYNC);
4562  if (pg_fsync(fd) != 0)
4563  ereport(PANIC,
4564  (errcode_for_file_access(),
4565  errmsg("could not fsync file \"%s\": %m",
4566  XLOG_CONTROL_FILE)));
4567  pgstat_report_wait_end();
4568 
4569  if (close(fd) != 0)
4570  ereport(PANIC,
4571  (errcode_for_file_access(),
4572  errmsg("could not close file \"%s\": %m",
4573  XLOG_CONTROL_FILE)));
4574 }
4575 
4576 static void
4577 ReadControlFile(void)
4578 {
4579  pg_crc32c crc;
4580  int fd;
4581  static char wal_segsz_str[20];
4582  int r;
4583 
4584  /*
4585  * Read data...
4586  */
4587  fd = BasicOpenFile(XLOG_CONTROL_FILE,
4588  O_RDWR | PG_BINARY);
4589  if (fd < 0)
4590  ereport(PANIC,
4591  (errcode_for_file_access(),
4592  errmsg("could not open file \"%s\": %m",
4593  XLOG_CONTROL_FILE)));
4594 
4595  pgstat_report_wait_start(WAIT_EVENT_CONTROL_FILE_READ);
4596  r = read(fd, ControlFile, sizeof(ControlFileData));
4597  if (r != sizeof(ControlFileData))
4598  {
4599  if (r < 0)
4600  ereport(PANIC,
4601  (errcode_for_file_access(),
4602  errmsg("could not read file \"%s\": %m",
4603  XLOG_CONTROL_FILE)));
4604  else
4605  ereport(PANIC,
4606  (errcode(ERRCODE_DATA_CORRUPTED),
4607  errmsg("could not read file \"%s\": read %d of %zu",
4608  XLOG_CONTROL_FILE, r, sizeof(ControlFileData))));
4609  }
4610  pgstat_report_wait_end();
4611 
4612  close(fd);
4613 
4614  /*
4615  * Check for expected pg_control format version. If this is wrong, the
4616  * CRC check will likely fail because we'll be checking the wrong number
4617  * of bytes. Complaining about wrong version will probably be more
4618  * enlightening than complaining about wrong CRC.
4619  */
4620 
4621  if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
4622  ereport(FATAL,
4623  (errmsg("database files are incompatible with server"),
4624  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
4625  " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
4626  ControlFile->pg_control_version, ControlFile->pg_control_version,
4627  PG_CONTROL_VERSION, PG_CONTROL_VERSION),
4628  errhint("This could be a problem of mismatched byte ordering. It looks like you need to initdb.")));
4629 
4630  if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
4631  ereport(FATAL,
4632  (errmsg("database files are incompatible with server"),
4633  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
4634  " but the server was compiled with PG_CONTROL_VERSION %d.",
4635  ControlFile->pg_control_version, PG_CONTROL_VERSION),
4636  errhint("It looks like you need to initdb.")));
4637 
4638  /* Now check the CRC. */
4639  INIT_CRC32C(crc);
4640  COMP_CRC32C(crc,
4641  (char *) ControlFile,
4642  offsetof(ControlFileData, crc));
4643  FIN_CRC32C(crc);
4644 
4645  if (!EQ_CRC32C(crc, ControlFile->crc))
4646  ereport(FATAL,
4647  (errmsg("incorrect checksum in control file")));
4648 
4649  /*
4650  * Do compatibility checking immediately. If the database isn't
4651  * compatible with the backend executable, we want to abort before we can
4652  * possibly do any damage.
4653  */
4654  if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
4655  ereport(FATAL,
4656  (errmsg("database files are incompatible with server"),
4657  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
4658  " but the server was compiled with CATALOG_VERSION_NO %d.",
4659  ControlFile->catalog_version_no, CATALOG_VERSION_NO),
4660  errhint("It looks like you need to initdb.")));
4661  if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
4662  ereport(FATAL,
4663  (errmsg("database files are incompatible with server"),
4664  errdetail("The database cluster was initialized with MAXALIGN %d,"
4665  " but the server was compiled with MAXALIGN %d.",
4666  ControlFile->maxAlign, MAXIMUM_ALIGNOF),
4667  errhint("It looks like you need to initdb.")));
4668  if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
4669  ereport(FATAL,
4670  (errmsg("database files are incompatible with server"),
4671  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
4672  errhint("It looks like you need to initdb.")));
4673  if (ControlFile->blcksz != BLCKSZ)
4674  ereport(FATAL,
4675  (errmsg("database files are incompatible with server"),
4676  errdetail("The database cluster was initialized with BLCKSZ %d,"
4677  " but the server was compiled with BLCKSZ %d.",
4678  ControlFile->blcksz, BLCKSZ),
4679  errhint("It looks like you need to recompile or initdb.")));
4680  if (ControlFile->relseg_size != RELSEG_SIZE)
4681  ereport(FATAL,
4682  (errmsg("database files are incompatible with server"),
4683  errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
4684  " but the server was compiled with RELSEG_SIZE %d.",
4685  ControlFile->relseg_size, RELSEG_SIZE),
4686  errhint("It looks like you need to recompile or initdb.")));
4687  if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
4688  ereport(FATAL,
4689  (errmsg("database files are incompatible with server"),
4690  errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
4691  " but the server was compiled with XLOG_BLCKSZ %d.",
4692  ControlFile->xlog_blcksz, XLOG_BLCKSZ),
4693  errhint("It looks like you need to recompile or initdb.")));
4694  if (ControlFile->nameDataLen != NAMEDATALEN)
4695  ereport(FATAL,
4696  (errmsg("database files are incompatible with server"),
4697  errdetail("The database cluster was initialized with NAMEDATALEN %d,"
4698  " but the server was compiled with NAMEDATALEN %d.",
4699  ControlFile->nameDataLen, NAMEDATALEN),
4700  errhint("It looks like you need to recompile or initdb.")));
4701  if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
4702  ereport(FATAL,
4703  (errmsg("database files are incompatible with server"),
4704  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
4705  " but the server was compiled with INDEX_MAX_KEYS %d.",
4706  ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
4707  errhint("It looks like you need to recompile or initdb.")));
4708  if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
4709  ereport(FATAL,
4710  (errmsg("database files are incompatible with server"),
4711  errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
4712  " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
4713  ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
4714  errhint("It looks like you need to recompile or initdb.")));
4715  if (ControlFile->loblksize != LOBLKSIZE)
4716  ereport(FATAL,
4717  (errmsg("database files are incompatible with server"),
4718  errdetail("The database cluster was initialized with LOBLKSIZE %d,"
4719  " but the server was compiled with LOBLKSIZE %d.",
4720  ControlFile->loblksize, (int) LOBLKSIZE),
4721  errhint("It looks like you need to recompile or initdb.")));
4722 
4723 #ifdef USE_FLOAT4_BYVAL
4724  if (ControlFile->float4ByVal != true)
4725  ereport(FATAL,
4726  (errmsg("database files are incompatible with server"),
4727  errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
4728  " but the server was compiled with USE_FLOAT4_BYVAL."),
4729  errhint("It looks like you need to recompile or initdb.")));
4730 #else
4731  if (ControlFile->float4ByVal != false)
4732  ereport(FATAL,
4733  (errmsg("database files are incompatible with server"),
4734  errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
4735  " but the server was compiled without USE_FLOAT4_BYVAL."),
4736  errhint("It looks like you need to recompile or initdb.")));
4737 #endif
4738 
4739 #ifdef USE_FLOAT8_BYVAL
4740  if (ControlFile->float8ByVal != true)
4741  ereport(FATAL,
4742  (errmsg("database files are incompatible with server"),
4743  errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
4744  " but the server was compiled with USE_FLOAT8_BYVAL."),
4745  errhint("It looks like you need to recompile or initdb.")));
4746 #else
4747  if (ControlFile->float8ByVal != false)
4748  ereport(FATAL,
4749  (errmsg("database files are incompatible with server"),
4750  errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
4751  " but the server was compiled without USE_FLOAT8_BYVAL."),
4752  errhint("It looks like you need to recompile or initdb.")));
4753 #endif
4754 
4755  wal_segment_size = ControlFile->xlog_seg_size;
4756 
4757  if (!IsValidWalSegSize(wal_segment_size))
4758  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4759  errmsg_plural("WAL segment size must be a power of two between 1 MB and 1 GB, but the control file specifies %d byte",
4760  "WAL segment size must be a power of two between 1 MB and 1 GB, but the control file specifies %d bytes",
4761  wal_segment_size,
4762  wal_segment_size)));
4763 
4764  snprintf(wal_segsz_str, sizeof(wal_segsz_str), "%d", wal_segment_size);
4765  SetConfigOption("wal_segment_size", wal_segsz_str, PGC_INTERNAL,
4766  PGC_S_OVERRIDE);
4767 
4768  /* check and update variables dependent on wal_segment_size */
4769  if (ConvertToXSegs(min_wal_size_mb, wal_segment_size) < 2)
4770  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4771  errmsg("\"min_wal_size\" must be at least twice \"wal_segment_size\"")));
4772 
4773  if (ConvertToXSegs(max_wal_size_mb, wal_segment_size) < 2)
4774  ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4775  errmsg("\"max_wal_size\" must be at least twice \"wal_segment_size\"")));
4776 
4777  UsableBytesInSegment =
4778  (wal_segment_size / XLOG_BLCKSZ * UsableBytesInPage) -
4779  (SizeOfXLogLongPHD - SizeOfXLogShortPHD);
4780 
4781  CalculateCheckpointSegments();
4782 
4783  /* Make the initdb settings visible as GUC variables, too */
4784  SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
4785  PGC_INTERNAL, PGC_S_OVERRIDE);
4786 }
4787 
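/*
 * [Editor's example -- not part of xlog.c]  The CRC discipline used by
 * WriteControlFile()/ReadControlFile() above, isolated into one helper:
 * the CRC covers everything up to, but not including, the crc field,
 * which is expected to be the last field of ControlFileData.  The
 * function name is hypothetical.
 */
#ifdef EDITOR_EXAMPLE
static bool
example_controlfile_crc_ok(ControlFileData *cf)
{
	pg_crc32c	crc;

	INIT_CRC32C(crc);
	COMP_CRC32C(crc, (char *) cf, offsetof(ControlFileData, crc));
	FIN_CRC32C(crc);

	return EQ_CRC32C(crc, cf->crc);
}
#endif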
4788 /*
4789  * Utility wrapper to update the control file. Note that the control
4790  * file gets flushed.
4791  */
4792 void
4793 UpdateControlFile(void)
4794 {
4795  update_controlfile(DataDir, ControlFile, true);
4796 }
4797 
4798 /*
4799  * Returns the unique system identifier from control file.
4800  */
4801 uint64
4802 GetSystemIdentifier(void)
4803 {
4804  Assert(ControlFile != NULL);
4805  return ControlFile->system_identifier;
4806 }
4807 
4808 /*
4809  * Returns the random nonce from control file.
4810  */
4811 char *
4812 GetMockAuthenticationNonce(void)
4813 {
4814  Assert(ControlFile != NULL);
4815  return ControlFile->mock_authentication_nonce;
4816 }
4817 
4818 /*
4819  * Are checksums enabled for data pages?
4820  */
4821 bool
4822 DataChecksumsEnabled(void)
4823 {
4824  Assert(ControlFile != NULL);
4825  return (ControlFile->data_checksum_version > 0);
4826 }
4827 
4828 /*
4829  * Returns a fake LSN for unlogged relations.
4830  *
4831  * Each call generates an LSN that is greater than any previous value
4832  * returned. The current counter value is saved and restored across clean
4833  * shutdowns, but like unlogged relations, does not survive a crash. This can
4834  * be used in lieu of real LSN values returned by XLogInsert, if you need an
4835  * LSN-like increasing sequence of numbers without writing any WAL.
4836  */
4837 XLogRecPtr
4838 GetFakeLSNForUnloggedRel(void)
4839 {
4840  XLogRecPtr nextUnloggedLSN;
4841 
4842  /* increment the unloggedLSN counter, need SpinLock */
4843  SpinLockAcquire(&XLogCtl->ulsn_lck);
4844  nextUnloggedLSN = XLogCtl->unloggedLSN++;
4845  SpinLockRelease(&XLogCtl->ulsn_lck);
4846 
4847  return nextUnloggedLSN;
4848 }
4849 
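/*
 * [Editor's example -- not part of xlog.c]  A sketch of a typical consumer:
 * code that must stamp a page of an unlogged relation with an increasing
 * LSN, without generating any WAL, can use the fake-LSN counter directly.
 * The helper name is hypothetical.
 */
#ifdef EDITOR_EXAMPLE
static void
example_stamp_unlogged_page(Page page)
{
	/* monotonic and crash-ephemeral; never a real WAL position */
	PageSetLSN(page, GetFakeLSNForUnloggedRel());
}
#endif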
4850 /*
4851  * Auto-tune the number of XLOG buffers.
4852  *
4853  * The preferred setting for wal_buffers is about 3% of shared_buffers, with
4854  * a maximum of one XLOG segment (there is little reason to think that more
4855  * is helpful, at least so long as we force an fsync when switching log files)
4856  * and a minimum of 8 blocks (which was the default value prior to PostgreSQL
4857  * 9.1, when auto-tuning was added).
4858  *
4859  * This should not be called until NBuffers has received its final value.
4860  */
4861 static int
4862 XLOGChooseNumBuffers(void)
4863 {
4864  int xbuffers;
4865 
4866  xbuffers = NBuffers / 32;
4867  if (xbuffers > (wal_segment_size / XLOG_BLCKSZ))
4868  xbuffers = (wal_segment_size / XLOG_BLCKSZ);
4869  if (xbuffers < 8)
4870  xbuffers = 8;
4871  return xbuffers;
4872 }
4873 
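/*
 * [Editor's note]  Worked example for the auto-tuning above: with
 * shared_buffers = 16384 (128 MB of 8 kB buffers), NBuffers / 32 = 512
 * XLOG blocks, i.e. 4 MB of WAL buffers.  The result is then capped at one
 * segment's worth of blocks (2048 for 16 MB segments) and floored at 8
 * blocks (64 kB).
 */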
4874 /*
4875  * GUC check_hook for wal_buffers
4876  */
4877 bool
4878 check_wal_buffers(int *newval, void **extra, GucSource source)
4879 {
4880  /*
4881  * -1 indicates a request for auto-tune.
4882  */
4883  if (*newval == -1)
4884  {
4885  /*
4886  * If we haven't yet changed the boot_val default of -1, just let it
4887  * be. We'll fix it when XLOGShmemSize is called.
4888  */
4889  if (XLOGbuffers == -1)
4890  return true;
4891 
4892  /* Otherwise, substitute the auto-tune value */
4893  *newval = XLOGChooseNumBuffers();
4894  }
4895 
4896  /*
4897  * We clamp manually-set values to at least 4 blocks. Prior to PostgreSQL
4898  * 9.1, a minimum of 4 was enforced by guc.c, but since that is no longer
4899  * the case, we just silently treat such values as a request for the
4900  * minimum. (We could throw an error instead, but that doesn't seem very
4901  * helpful.)
4902  */
4903  if (*newval < 4)
4904  *newval = 4;
4905 
4906  return true;
4907 }
4908 
4909 /*
4910  * Read the control file, set respective GUCs.
4911  *
4912  * This is to be called during startup, including a crash recovery cycle,
4913  * unless in bootstrap mode, where no control file yet exists. As there's no
4914  * usable shared memory yet (its sizing can depend on the contents of the
4915  * control file!), first store the contents in local memory. XLOGShmemInit()
4916  * will then copy it to shared memory later.
4917  *
4918  * reset just controls whether previous contents are to be expected (in the
4919  * reset case, there's a dangling pointer into old shared memory), or not.
4920  */
4921 void
4922 LocalProcessControlFile(bool reset)
4923 {
4924  Assert(reset || ControlFile == NULL);
4925  ControlFile = palloc(sizeof(ControlFileData));
4926  ReadControlFile();
4927 }
4928 
4929 /*
4930  * Initialization of shared memory for XLOG
4931  */
4932 Size
4933 XLOGShmemSize(void)
4934 {
4935  Size size;
4936 
4937  /*
4938  * If the value of wal_buffers is -1, use the preferred auto-tune value.
4939  * This isn't an amazingly clean place to do this, but we must wait till
4940  * NBuffers has received its final value, and must do it before using the
4941  * value of XLOGbuffers to do anything important.
4942  */
4943  if (XLOGbuffers == -1)
4944  {
4945  char buf[32];
4946 
4947  snprintf(buf, sizeof(buf), "%d", XLOGChooseNumBuffers());
4948  SetConfigOption("wal_buffers", buf, PGC_POSTMASTER, PGC_S_OVERRIDE);
4949  }
4950  Assert(XLOGbuffers > 0);
4951 
4952  /* XLogCtl */
4953  size = sizeof(XLogCtlData);
4954 
4955  /* WAL insertion locks, plus alignment */
4956  size = add_size(size, mul_size(sizeof(WALInsertLockPadded), NUM_XLOGINSERT_LOCKS + 1));
4957  /* xlblocks array */
4958  size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4959  /* extra alignment padding for XLOG I/O buffers */
4960  size = add_size(size, XLOG_BLCKSZ);
4961  /* and the buffers themselves */
4962  size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4963 
4964  /*
4965  * Note: we don't count ControlFileData, it comes out of the "slop factor"
4966  * added by CreateSharedMemoryAndSemaphores. This lets us use this
4967  * routine again below to compute the actual allocation size.
4968  */
4969 
4970  return size;
4971 }
4972 
4973 void
4974 XLOGShmemInit(void)
4975 {
4976  bool foundCFile,
4977  foundXLog;
4978  char *allocptr;
4979  int i;
4980  ControlFileData *localControlFile;
4981 
4982 #ifdef WAL_DEBUG
4983 
4984  /*
4985  * Create a memory context for WAL debugging that's exempt from the normal
4986  * "no pallocs in critical section" rule. Yes, that can lead to a PANIC if
4987  * an allocation fails, but wal_debug is not for production use anyway.
4988  */
4989  if (walDebugCxt == NULL)
4990  {
4991  walDebugCxt = AllocSetContextCreate(TopMemoryContext,
4992  "WAL Debug",
4993  ALLOCSET_DEFAULT_SIZES);
4994  MemoryContextAllowInCriticalSection(walDebugCxt, true);
4995  }
4996 #endif
4997 
4998 
4999  XLogCtl = (XLogCtlData *)
5000  ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
5001 
5002  localControlFile = ControlFile;
5003  ControlFile = (ControlFileData *)
5004  ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
5005 
5006  if (foundCFile || foundXLog)
5007  {
5008  /* both should be present or neither */
5009  Assert(foundCFile && foundXLog);
5010 
5011  /* Initialize local copy of WALInsertLocks and register the tranche */
5012  WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
5013  LWLockRegisterTranche(LWTRANCHE_WAL_INSERT,
5014  "wal_insert");
5015 
5016  if (localControlFile)
5017  pfree(localControlFile);
5018  return;
5019  }
5020  memset(XLogCtl, 0, sizeof(XLogCtlData));
5021 
5022  /*
5023  * Already have read control file locally, unless in bootstrap mode. Move
5024  * contents into shared memory.
5025  */
5026  if (localControlFile)
5027  {
5028  memcpy(ControlFile, localControlFile, sizeof(ControlFileData));
5029  pfree(localControlFile);
5030  }
5031 
5032  /*
5033  * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
5034  * multiple of the alignment for same, so no extra alignment padding is
5035  * needed here.
5036  */
5037  allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
5038  XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
5039  memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
5040  allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
5041 
5042 
5043  /* WAL insertion locks. Ensure they're aligned to the full padded size */
5044  allocptr += sizeof(WALInsertLockPadded) -
5045  ((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
5046  WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
5047  (WALInsertLockPadded *) allocptr;
5048  allocptr += sizeof(WALInsertLockPadded) * NUM_XLOGINSERT_LOCKS;
5049 
5050  LWLockRegisterTranche(LWTRANCHE_WAL_INSERT, "wal_insert");
5051  for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
5052  {
5053  LWLockInitialize(&WALInsertLocks[i].l.lock, LWTRANCHE_WAL_INSERT);
5054  WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
5055  WALInsertLocks[i].l.lastImportantAt = InvalidXLogRecPtr;
5056  }
5057 
5058  /*
5059  * Align the start of the page buffers to a full xlog block size boundary.
5060  * This simplifies some calculations in XLOG insertion. It is also
5061  * required for O_DIRECT.
5062  */
5063  allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr);
5064  XLogCtl->pages = allocptr;
5065  memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
5066 
5067  /*
5068  * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
5069  * in additional info.)
5070  */
5071  XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
5072  XLogCtl->SharedRecoveryInProgress = true;
5073  XLogCtl->SharedHotStandbyActive = false;
5074  XLogCtl->WalWriterSleeping = false;
5075 
5076  SpinLockInit(&XLogCtl->Insert.insertpos_lck);
5077  SpinLockInit(&XLogCtl->info_lck);
5078  SpinLockInit(&XLogCtl->ulsn_lck);
5079  InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
5080 }
5081 
5082 /*
5083  * This func must be called ONCE on system install. It creates pg_control
5084  * and the initial XLOG segment.
5085  */
5086 void
5087 BootStrapXLOG(void)
5088 {
5089  CheckPoint checkPoint;
5090  char *buffer;
5091  XLogPageHeader page;
5092  XLogLongPageHeader longpage;
5093  XLogRecord *record;
5094  char *recptr;
5095  bool use_existent;
5096  uint64 sysidentifier;
5097  char mock_auth_nonce[MOCK_AUTH_NONCE_LEN];
5098  struct timeval tv;
5099  pg_crc32c crc;
5100 
5101  /*
5102  * Select a hopefully-unique system identifier code for this installation.
5103  * We use the result of gettimeofday(), including the fractional seconds
5104  * field, as being about as unique as we can easily get. (Think not to
5105  * use random(), since it hasn't been seeded and there's no portable way
5106  * to seed it other than the system clock value...) The upper half of the
5107  * uint64 value is just the tv_sec part, while the lower half contains the
5108  * tv_usec part (which must fit in 20 bits), plus 12 bits from our current
5109  * PID for a little extra uniqueness. A person knowing this encoding can
5110  * determine the initialization time of the installation, which could
5111  * perhaps be useful sometimes.
5112  */
5113  gettimeofday(&tv, NULL);
5114  sysidentifier = ((uint64) tv.tv_sec) << 32;
5115  sysidentifier |= ((uint64) tv.tv_usec) << 12;
5116  sysidentifier |= getpid() & 0xFFF;
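 	/*
 	 * Worked example of the encoding above, with illustrative values: if
 	 * tv_sec = 0x5D800000, tv_usec = 500000 (0x7A120, which fits in 20
 	 * bits) and getpid() & 0xFFF = 0x123, then
 	 *
 	 *   sysidentifier = (0x5D800000 << 32) | (0x7A120 << 12) | 0x123
 	 *
 	 * and the fields can be recovered as (sysidentifier >> 32),
 	 * ((sysidentifier >> 12) & 0xFFFFF), and (sysidentifier & 0xFFF).
 	 */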
5117 
5118  /*
5119  * Generate a random nonce. This is used for authentication requests that
5120  * will fail because the user does not exist. The nonce is used to create
5121  * a genuine-looking password challenge for the non-existent user, in lieu
5122  * of an actual stored password.
5123  */
5124  if (!pg_strong_random(mock_auth_nonce, MOCK_AUTH_NONCE_LEN))
5125  ereport(PANIC,
5126  (errcode(ERRCODE_INTERNAL_ERROR),
5127  errmsg("could not generate secret authorization token")));
5128 
5129  /* First timeline ID is always 1 */
5130  ThisTimeLineID = 1;
5131 
5132  /* page buffer must be aligned suitably for O_DIRECT */
5133  buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ);
5134  page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer);
5135  memset(page, 0, XLOG_BLCKSZ);
5136 
5137  /*
5138  * Set up information for the initial checkpoint record
5139  *
5140  * The initial checkpoint record is written to the beginning of the WAL
5141  * segment with logid=0 logseg=1. The very first WAL segment, 0/0, is not
5142  * used, so that we can use 0/0 to mean "before any valid WAL segment".
5143  */
5144  checkPoint.redo = wal_segment_size + SizeOfXLogLongPHD;
5145  checkPoint.ThisTimeLineID = ThisTimeLineID;
5146  checkPoint.PrevTimeLineID = ThisTimeLineID;
5147  checkPoint.fullPageWrites = fullPageWrites;
5148  checkPoint.nextFullXid =
5149  FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
5150  checkPoint.nextOid = FirstBootstrapObjectId;
5151  checkPoint.nextMulti = FirstMultiXactId;
5152  checkPoint.nextMultiOffset = 0;
5153  checkPoint.oldestXid = FirstNormalTransactionId;
5154  checkPoint.oldestXidDB = TemplateDbOid;
5155  checkPoint.oldestMulti = FirstMultiXactId;
5156  checkPoint.oldestMultiDB = TemplateDbOid;
5157  checkPoint.oldestCommitTsXid = InvalidTransactionId;
5158  checkPoint.newestCommitTsXid = InvalidTransactionId;
5159  checkPoint.time = (pg_time_t) time(NULL);
5160  checkPoint.oldestActiveXid = InvalidTransactionId;
5161 
5162  ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
5163  ShmemVariableCache->nextOid = checkPoint.nextOid;
5164  ShmemVariableCache->oidCount = 0;
5165  MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
5166  AdvanceOldestClogXid(checkPoint.oldestXid);
5167  SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
5168  SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB, true);
5169  SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
5170 
5171  /* Set up the XLOG page header */
5172  page->xlp_magic = XLOG_PAGE_MAGIC;
5173  page->xlp_info = XLP_LONG_HEADER;
5174  page->xlp_tli = ThisTimeLineID;
5175  page->xlp_pageaddr = wal_segment_size;
5176  longpage = (XLogLongPageHeader) page;
5177  longpage->xlp_sysid = sysidentifier;
5178  longpage->xlp_seg_size = wal_segment_size;
5179  longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
5180 
5181  /* Insert the initial checkpoint record */
5182  recptr = ((char *) page + SizeOfXLogLongPHD);
5183  record = (XLogRecord *) recptr;
5184  record->xl_prev = 0;
5185  record->xl_xid = InvalidTransactionId;
5186  record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint);
5187  record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
5188  record->xl_rmid = RM_XLOG_ID;
5189  recptr += SizeOfXLogRecord;
5190  /* fill the XLogRecordDataHeaderShort struct */
5191  *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT;
5192  *(recptr++) = sizeof(checkPoint);
5193  memcpy(recptr, &checkPoint, sizeof(checkPoint));
5194  recptr += sizeof(checkPoint);
5195  Assert(recptr - (char *) record == record->xl_tot_len);
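 	/*
 	 * Byte layout of the record just assembled (offsets in terms of the
 	 * size macros above; the concrete numbers depend on the build):
 	 *
 	 *   [0, SizeOfXLogRecord)        fixed XLogRecord header
 	 *   [SizeOfXLogRecord]           XLR_BLOCK_ID_DATA_SHORT tag byte
 	 *   [SizeOfXLogRecord + 1]       one-byte payload length, sizeof(checkPoint)
 	 *   [SizeOfXLogRecord + 2, ...)  the CheckPoint struct itself
 	 *
 	 * This matches the xl_tot_len computed above, since
 	 * SizeOfXLogRecordDataHeaderShort covers the tag and length bytes.
 	 */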
5196 
5197  INIT_CRC32C(crc);
5198  COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
5199  COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
5200  FIN_CRC32C(crc);
5201  record->xl_crc = crc;
5202 
5203  /* Create first XLOG segment file */
5204  use_existent = false;
5205  openLogFile = XLogFileInit(1, &use_existent, false);
5206 
5207  /* Write the first page with the initial record */
5208  errno = 0;
5209  pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
5210  if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
5211  {
5212  /* if write didn't set errno, assume problem is no disk space */
5213  if (errno == 0)
5214  errno = ENOSPC;
5215  ereport(PANIC,
5216  (errcode_for_file_access(),
5217  errmsg("could not write bootstrap write-ahead log file: %m")));
5218  }
5219  pgstat_report_wait_end();
5220 
5221  pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
5222  if (pg_fsync(openLogFile) != 0)
5223  ereport(PANIC,
5224  (errcode_for_file_access(),
5225  errmsg("could not fsync bootstrap write-ahead log file: %m")));
5226  pgstat_report_wait_end();
5227 
5228  if (close(openLogFile) != 0)
5229  ereport(PANIC,
5230  (errcode_for_file_access(),
5231  errmsg("could not close bootstrap write-ahead log file: %m")));
5232 
5233  openLogFile = -1;
5234 
5235  /* Now create pg_control */
5236 
5237  memset(ControlFile, 0, sizeof(ControlFileData));
5238  /* Initialize pg_control status fields */
5239  ControlFile->system_identifier = sysidentifier;
5240  memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN);
5241  ControlFile->state = DB_SHUTDOWNED;
5242  ControlFile->time = checkPoint.time;
5243  ControlFile->checkPoint = checkPoint.redo;
5244  ControlFile->checkPointCopy = checkPoint;
5245  ControlFile->unloggedLSN = FirstNormalUnloggedLSN;
5246 
5247  /* Set important parameter values for use when replaying WAL */
5248  ControlFile->MaxConnections = MaxConnections;
5249  ControlFile->max_worker_processes = max_worker_processes;
5250  ControlFile->max_wal_senders = max_wal_senders;
5251  ControlFile->max_prepared_xacts = max_prepared_xacts;
5252  ControlFile->max_locks_per_xact = max_locks_per_xact;
5253  ControlFile->wal_level = wal_level;
5254  ControlFile->wal_log_hints = wal_log_hints;
5255  ControlFile->track_commit_timestamp = track_commit_timestamp;
5256  ControlFile->data_checksum_version = bootstrap_data_checksum_version;
5257 
5258  /* some additional ControlFile fields are set in WriteControlFile() */
5259 
5260  WriteControlFile();
5261 
5262  /* Bootstrap the commit log, too */
5263  BootStrapCLOG();
5264  BootStrapCommitTs();
5265  BootStrapSUBTRANS();
5266  BootStrapMultiXact();
5267 
5268  pfree(buffer);
5269 
5270  /*
5271  * Force control file to be read - in contrast to normal processing we'd
5272  * otherwise never run the checks and GUC related initializations therein.
5273  */
5274  ReadControlFile();
5275 }
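
A reader validating such a record must accumulate the CRC in the same order as the writer above: record body first, then the header prefix up to (but not including) xl_crc. A minimal sketch under that assumption (XLogRecordCrcMatches is a hypothetical name, not a routine in xlog.c; the real validation lives in the xlogreader code):

#include "postgres.h"

#include "access/xlogrecord.h"
#include "port/pg_crc32c.h"

static bool
XLogRecordCrcMatches(XLogRecord *record)
{
	pg_crc32c	crc;

	/* CRC the payload that follows the fixed header... */
	INIT_CRC32C(crc);
	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord,
				record->xl_tot_len - SizeOfXLogRecord);
	/* ...then the header itself, stopping short of the stored CRC. */
	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
	FIN_CRC32C(crc);

	return EQ_CRC32C(crc, record->xl_crc);
}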
5276 
5277 static char *
5278 str_time(pg_time_t tnow)
5279 {
5280  static char buf[128];
5281 
5282  pg_strftime(buf, sizeof(buf),
5283  "%Y-%m-%d %H:%M:%S %Z",
5284  pg_localtime(&tnow, log_timezone));
5285 
5286  return buf;
5287 }
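
Note that str_time() formats into a static buffer, so each call overwrites the previous result; a caller that needs two formatted timestamps at once must copy the first out (e.g. with pstrdup) before the second call. An illustrative use, not a call site copied from xlog.c:

	ereport(LOG,
			(errmsg("last checkpoint was at %s",
					str_time(ControlFile->checkPointCopy.time))));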
5288 
5289 /*
5290  * See if there are any recovery signal files and if so, set state for
5291  * recovery.
5292  *
5293  * See if there is a recovery command file (recovery.conf), and if so
5294  * throw an ERROR since as of PG12 we no longer recognize that.
5295  */
5296 static void
5297 readRecoverySignalFile(void)
5298 {
5299  struct stat stat_buf;
5300 
5301  if (IsBootstrapProcessingMode())
5302  return;
5303 
5304  /*
5305  * Check for old recovery API file: recovery.conf
5306  */
5307  if (stat(RECOVERY_COMMAND_FILE, &stat_buf) == 0)
5308  ereport(FATAL,
5309  (errcode_for_file_access(),
5310  errmsg("using recovery command file \"%s\" is not supported",
5311  RECOVERY_COMMAND_FILE)));
5312 
5313  /*
5314  * Remove unused .done file, if present. Ignore if absent.
5315  */
5316  unlink(RECOVERY_COMMAND_DONE);
5317 
5318  /*
5319  * Check for recovery signal files and if found, fsync them since they
5320  * represent server state information. We don't sweat too much about the
5321  * possibility of fsync failure, however.
5322  *
5323  * If present, standby signal file takes precedence. If neither is present
5324  * then we won't enter archive recovery.
5325  */
5326  if (stat(STANDBY_SIGNAL_FILE, &stat_buf) == 0)
5327  {
5328  int fd;
5329 
5330  fd = BasicOpenFilePerm(STANDBY_SIGNAL_FILE, O_RDWR | PG_BINARY,
5331  S_IRUSR | S_IWUSR);
5332  if (fd >= 0)
5333  {
5334  (void) pg_fsync(fd);
5335  close(fd);
5336  }
5337  standby_signal_file_found = true;
5338  }
5339  else if (stat(RECOVERY_SIGNAL_FILE, &stat_buf) == 0)
5340  {
5341  int fd;
5342 
5343  fd = BasicOpenFilePerm(RECOVERY_SIGNAL_FILE, O_RDWR | PG_BINARY,
5344  S_IRUSR | S_IWUSR);
5345  if (fd >= 0)
5346  {
5347  (void) pg_fsync(fd);
5348  close(fd);
5349  }
5350  recovery_signal_file_found = true;
5351  }
5352 
5353  StandbyModeRequested = false;
5354  ArchiveRecoveryRequested = false;
5355  if (standby_signal_file_found)
5356  {
5357  StandbyModeRequested = true;
5358  ArchiveRecoveryRequested = true;
5359  }
5360  else if (recovery_signal_file_found)
5361  {
5362  StandbyModeRequested = false;
5363  ArchiveRecoveryRequested = true;
5364  }
5365  else
5366  return;
5367 
5368  /*
5369  * We don't support standby mode in standalone backends; that requires
5370  * other processes such as the WAL receiver to be alive.
5371  */
5372  if (StandbyModeRequested && !IsUnderPostmaster)
5373  ereport(FATAL,
5374  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
5375  errmsg("standby mode is not supported by single-user servers")));
5376 }
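
The net effect of readRecoverySignalFile(), summarized from the code above (this table does not itself appear in xlog.c):

	standby.signal   recovery.signal   StandbyModeRequested   ArchiveRecoveryRequested
	present          (ignored)         true                   true
	absent           present           false                  true
	absent           absent            false                  false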
5377 
5378 static void
5379 validateRecoveryParameters(void)
5380 {
5381  if (!ArchiveRecoveryRequested)
5382  return;
5383 
5384  /*
5385  * Check for compulsory parameters
5386  */
5387  if (StandbyModeRequested)
5388  {
5389  if ((PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) &&
5390  (recoveryRestoreCommand == NULL || strcmp(recoveryRestoreCommand, "") == 0))
5391  ereport(WARNING,
5392  (errmsg("specified neither primary_conninfo nor restore_command"),
5393  errhint("The database server will regularly poll the pg_wal subdirectory to check for files placed there.")));
5394  }
5395  else
5396  {
5397  if (recoveryRestoreCommand == NULL ||
5398  strcmp(recoveryRestoreCommand, "") == 0)
5399  ereport(FATAL,
5400  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5401  errmsg("must specify restore_command when standby mode is not enabled")));
5402  }
5403 
5404  /*
5405  * Override any inconsistent requests. Note that this is a change of
5406  * behaviour in 9.5; prior to this we simply ignored a request to pause if
5407  * hot_standby = off, which was surprising behaviour.
5408  */
5409  if (recoveryTargetAction == RECOVERY_TARGET_ACTION_PAUSE &&
5410  !EnableHotStandby)
5411  recoveryTargetAction = RECOVERY_TARGET_ACTION_SHUTDOWN;
5412 
5413  /*
5414  * Final parsing of recovery_target_time string; see also
5415  * check_recovery_target_time().
5416  */
5417  if (recoveryTarget == RECOVERY_TARGET_TIME)
5418  {
5419  recoveryTargetTime = DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
5420  CStringGetDatum(recovery_target_time_string),
5421  ObjectIdGetDatum(InvalidOid),
5422  Int32GetDatum(-1)));
5423  }
5424 
5425  /*
5426  * If user specified recovery_target_timeline, validate it or compute the
5427  * "latest" value. We can't do this until after we've gotten the restore
5428  * command and set InArchiveRecovery, because we need to fetch timeline
5429  * history files from the archive.
5430  */
5431  if (recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_NUMERIC)
5432  {
5433  TimeLineID rtli = recoveryTargetTLIRequested;
5434 
5435  /* Timeline 1 does not have a history file, all else should */
5436  if (rtli != 1 && !existsTimeLineHistory(rtli))
5437  ereport(FATAL,
5438  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
5439  errmsg("recovery target timeline %u does not exist",
5440  rtli)));
5441  recoveryTargetTLI = rtli;
5442  }
5443  else if (recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_LATEST)
5444  {
5445  /* We start the "latest" search from pg_control's timeline */
5446  recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
5447  }
5448  else
5449  {
5450  /*
5451  * else we just use the recoveryTargetTLI as already read from
5452  * ControlFile
5453  */
5454  Assert(recoveryTargetTimeLineGoal == RECOVERY_TARGET_TIMELINE_CONTROLFILE);
5455  }
5456 }
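
Summarizing the timeline selection above (derived from the code; recovery_target_timeline is the user-facing GUC behind recoveryTargetTimeLineGoal):

	recovery_target_timeline = <numeric>  -> verify a history file exists, then use that TLI
	recovery_target_timeline = 'latest'   -> findNewestTimeLine() starting from pg_control's TLI
	recovery_target_timeline = 'current'  -> keep the TLI already read from pg_control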
5457 
5458 /*
5459  * Exit archive-recovery state
5460  */
5461 static void
5462 exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
5463 {
5464  char xlogfname[MAXFNAMELEN];
5465  XLogSegNo endLogSegNo;
5466  XLogSegNo startLogSegNo;
5467 
5468  /* we always switch to a new timeline after archive recovery */
5469  Assert(endTLI != ThisTimeLineID);
5470 
5471  /*
5472  * We are no longer in archive recovery state.
5473  */
5474  InArchiveRecovery = false;
5475 
5476  /*
5477  * Update min recovery point one last time.
5478  */
5479  UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
5480 
5481  /*
5482  * If the ending log segment is still open, close it (to avoid problems on
5483  * Windows with trying to rename or delete an open file).
5484  */
5485  if (readFile >= 0)
5486  {
5487  close(readFile);
5488  readFile = -1;
5489  }
5490 
5491  /*
5492  * Calculate the last segment on the old timeline, and the first segment
5493  * on the new timeline. If the switch happens in the middle of a segment,
5494  * they are the same, but if the switch happens exactly at a segment
5495  * boundary, startLogSegNo will be endLogSegNo + 1.
5496  */
5497  XLByteToPrevSeg(endOfLog, endLogSegNo, wal_segment_size);
5498  XLByteToSeg(endOfLog, startLogSegNo, wal_segment_size);
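 	/*
 	 * Worked example, assuming wal_segment_size = 16MB (0x1000000):
 	 * endOfLog = 0x2F00000 (mid-segment) gives endLogSegNo = 2 and
 	 * startLogSegNo = 2, while endOfLog = 0x3000000 (an exact boundary)
 	 * gives endLogSegNo = 2 but startLogSegNo = 3, because XLByteToPrevSeg
 	 * divides (endOfLog - 1) by the segment size where XLByteToSeg divides
 	 * endOfLog itself.
 	 */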
5499 
5500  /*
5501  * Initialize the starting WAL segment for the new timeline. If the switch
5502  * happens in the middle of a segment, copy data from the last WAL segment
5503  * of the old timeline up to the switch point, to the starting WAL segment
5504  * on the new timeline.
5505  */
5506  if (endLogSegNo == startLogSegNo)
5507  {
5508  /*
5509  * Make a copy of the file on the new timeline.
5510  *
5511  * Writing WAL isn't allowed yet, so there are no locking
5512  * considerations. But we should be just as tense as XLogFileInit to
5513  * avoid emplacing a bogus file.
5514  */
5515  XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
5516  XLogSegmentOffset(endOfLog, wal_segment_size));
5517  }
5518  else
5519  {
5520  /*
5521  * The switch happened at a segment boundary, so just create the next
5522  * segment on the new timeline.
5523  */
5524  bool use_existent = true;
5525  int fd;
5526 
5527  fd = XLogFileInit(startLogSegNo, &use_existent, true);
5528 
5529  if (close(fd) != 0)
5530  ereport(ERROR,
5531  (errcode_for_file_access(),
5532  errmsg("could not close file \"%s\": %m",
5533  XLogFileNameP(ThisTimeLineID, startLogSegNo))));
5534  }
5535 
5536  /*
5537  * Let's just make real sure there are not .ready or .done flags posted
5538  * for the new segment.
5539  */
5540  XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo, wal_segment_size);
5541  XLogArchiveCleanup(xlogfname);
5542 
5543  /*
5544  * Remove the signal files out of the way, so that we don't accidentally
5545  * re-enter archive recovery mode in a subsequent crash.
5546  */
5547  if (standby_signal_file_found)
5548  durable_unlink(STANDBY_SIGNAL_FILE, FATAL);
5549 
5550  if (recovery_signal_file_found)
5551  durable_unlink(RECOVERY_SIGNAL_FILE, FATAL);
5552 
5553  ereport(LOG,
5554  (errmsg("archive recovery complete")));
5555 }
5556 
5557 /*
5558  * Extract timestamp from WAL record.
5559  *
5560  * If the record contains a timestamp, returns true, and saves the timestamp
5561  * in *recordXtime. If the record type has no timestamp, returns false.
5562  * Currently, only transaction commit/abort records and restore points contain
5563  * timestamps.
5564  */
5565 static bool
5566 getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
5567 {
5568  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
5569  uint8 xact_info = info & XLOG_XACT_OPMASK;
5570  uint8 rmid = XLogRecGetRmid(record);
5571 
5572  if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
5573  {
5574  *recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
5575  return true;
5576  }
5577  if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
5578  xact_info == XLOG_XACT_COMMIT_PREPARED))
5579  {
5580  *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
5581  return true;
5582  }
5583  if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
5584  xact_info == XLOG_XACT_ABORT_PREPARED))
5585  {
5586  *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
5587  return true;
5588  }
5589  return false;
5590 }
5591 
5592 /*
5593  * For point-in-time recovery, this function decides whether we want to
5594  * stop applying the XLOG before the current record.
5595  *
5596  * Returns true if we are stopping, false otherwise. If stopping, some
5597  * information is saved in recoveryStopXid et al for use in annotating the
5598  * new timeline's history file.
5599  */
5600 static bool
5601 recoveryStopsBefore(XLogReaderState *record)
5602 {
5603  bool stopsHere = false;
5604  uint8 xact_info;
5605  bool isCommit;
5606  TimestampTz recordXtime = 0;
5607  TransactionId recordXid;
5608 
5609  /*
5610  * Ignore recovery target settings when not in archive recovery (meaning
5611  * we are in crash recovery).
5612  */
5613  if (!ArchiveRecoveryRequested)
5614  return false;
5615 
5616  /* Check if we should stop as soon as reaching consistency */
5617  if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
5618  {
5619  ereport(LOG,
5620  (errmsg("recovery stopping after reaching consistency")));
5621 
5622  recoveryStopAfter = false;
5623  recoveryStopXid = InvalidTransactionId;
5624  recoveryStopLSN = InvalidXLogRecPtr;
5625  recoveryStopTime = 0;
5626  recoveryStopName[0] = '\0';
5627  return true;
5628  }
5629 
5630  /* Check if target LSN has been reached */
5631  if (recoveryTarget == RECOVERY_TARGET_LSN &&
5632  !recoveryTargetInclusive &&
5633  record->ReadRecPtr >= recoveryTargetLSN)
5634  {
5635  recoveryStopAfter = false;
5636  recoveryStopXid = InvalidTransactionId;
5637  recoveryStopLSN = record->ReadRecPtr;
5638  recoveryStopTime = 0;
5639  recoveryStopName[0] = '\0';
5640  ereport(LOG,
5641  (errmsg("recovery stopping before WAL location (LSN) \"%X/%X\"",
5642  (uint32) (recoveryStopLSN >> 32),
5643  (uint32) recoveryStopLSN)));
5644  return true;
5645  }
5646 
5647  /* Otherwise we only consider stopping before COMMIT or ABORT records. */
5648  if (XLogRecGetRmid(record) != RM_XACT_ID)
5649  return false;
5650 
5651  xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
5652 
5653  if (xact_info == XLOG_XACT_COMMIT)
5654  {
5655  isCommit = true;
5656  recordXid = XLogRecGetXid(record);
5657  }
5658  else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
5659  {
5660  xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
5661  xl_xact_parsed_commit parsed;
5662 
5663  isCommit = true;
5664  ParseCommitRecord(XLogRecGetInfo(record),
5665  xlrec,
5666  &parsed);
5667  recordXid = parsed.twophase_xid;
5668  }
5669  else if (xact_info == XLOG_XACT_ABORT)
5670  {
5671  isCommit = false;
5672  recordXid = XLogRecGetXid(record);
5673  }
5674  else if (xact_info == XLOG_XACT_ABORT_PREPARED)
5675  {
5676  xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
5677  xl_xact_parsed_abort parsed;
5678 
5679  isCommit = false;
5680  ParseAbortRecord(XLogRecGetInfo(record),
5681  xlrec,
5682  &parsed);
5683  recordXid = parsed.twophase_xid;
5684  }
5685  else
5686  return false;
5687 
5688  if (recoveryTarget == RECOVERY_TARGET_XID)
5689  {
5690  /*
5691  * There can be only one transaction end record with this exact
5692  * transactionid
5693  *
5694  * when testing for an xid, we MUST test for equality only, since
5695  * transactions are numbered in the order they start, not the order
5696  * they complete. A higher numbered xid will complete before you about
5697  * 50% of the time...
5698  */
5699  stopsHere = (recordXid == recoveryTargetXid);
5700  }
5701 
5702  if (recoveryTarget == RECOVERY_TARGET_TIME &&
5703  getRecordTimestamp(record, &recordXtime))
5704  {
5705  /*
5706  * There can be many transactions that share the same commit time, so
5707  * we stop after the last one, if we are inclusive, or stop at the
5708  * first one if we are exclusive
5709  */
5710  if (recoveryTargetInclusive)
5711  stopsHere = (recordXtime > recoveryTargetTime);
5712  else
5713  stopsHere = (recordXtime >= recoveryTargetTime);
5714  }
5715 
5716  if (stopsHere)
5717  {
5718  recoveryStopAfter = false;
5719  recoveryStopXid = recordXid;
5720  recoveryStopTime = recordXtime;
5721  recoveryStopLSN = InvalidXLogRecPtr;
5722  recoveryStopName[0] = '\0';
5723 
5724  if (isCommit)
5725  {
5726  ereport(LOG,
5727  (errmsg("recovery stopping before commit of transaction %u, time %s",
5728  recoveryStopXid,
5729  timestamptz_to_str(recoveryStopTime))));
5730  }
5731  else
5732  {
5733  ereport(LOG,
5734  (errmsg("recovery stopping before abort of transaction %u, time %s",
5735  recoveryStopXid,
5736  timestamptz_to_str(recoveryStopTime))));
5737  }
5738  }
5739 
5740  return stopsHere;
5741 }
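
To see what the recoveryTargetInclusive branch above buys: with commit records stamped t=100, t=100 and t=200 and recovery_target_time = 100, the inclusive test (stop once recordXtime > 100) replays both t=100 commits and stops before the t=200 record, while the exclusive test (stop once recordXtime >= 100) stops before the first t=100 commit. A standalone restatement of just that comparison (stops_before is a hypothetical helper, not part of xlog.c):

#include <stdbool.h>

static bool
stops_before(long long recordXtime, long long targetTime, bool inclusive)
{
	/* mirrors the recoveryTargetInclusive branch in recoveryStopsBefore() */
	return inclusive ? (recordXtime > targetTime)
		: (recordXtime >= targetTime);
}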
5742 
5743 /*
5744  * Same as recoveryStopsBefore, but called after applying the record.
5745  *
5746  * We also track the timestamp of the latest applied COMMIT/ABORT
5747  * record in XLogCtl->recoveryLastXTime.
5748  */
5749 static bool
5750 recoveryStopsAfter(XLogReaderState *record)
5751 {
5752  uint8 info;
5753  uint8 xact_info;
5754  uint8 rmid;
5755  TimestampTz recordXtime;
5756 
5757  /*
5758  * Ignore recovery target settings when not in archive recovery (meaning
5759  * we are in crash recovery).
5760  */
5761  if (!ArchiveRecoveryRequested)
5762  return false;
5763 
5764  info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;