1 /*-------------------------------------------------------------------------
2  *
3  * xloginsert.c
4  * Functions for constructing WAL records
5  *
6  * Constructing a WAL record begins with a call to XLogBeginInsert,
7  * followed by a number of XLogRegister* calls. The registered data is
8  * collected in private working memory, and finally assembled into a chain
9  * of XLogRecData structs by a call to XLogRecordAssemble(). See
10  * access/transam/README for details.
11  *
12  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
13  * Portions Copyright (c) 1994, Regents of the University of California
14  *
15  * src/backend/access/transam/xloginsert.c
16  *
17  *-------------------------------------------------------------------------
18  */
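/*
 * Illustrative sketch, not part of this file: roughly how a resource
 * manager drives this API. The record struct, rmgr id and info value are
 * hypothetical; only the call sequence mirrors the description above.
 *
 *		xl_foo_insert xlrec;
 *		XLogRecPtr	recptr;
 *
 *		XLogBeginInsert();
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterBufData(0, (char *) tuple, tuplen);
 *
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_INSERT);
 *
 *		PageSetLSN(BufferGetPage(buffer), recptr);
 */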
19 
20 #include "postgres.h"
21 
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "executor/instrument.h"
29 #include "miscadmin.h"
30 #include "pg_trace.h"
31 #include "replication/origin.h"
32 #include "storage/bufmgr.h"
33 #include "storage/proc.h"
34 #include "utils/memutils.h"
35 
36 /* Buffer size required to store a compressed version of backup block image */
37 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
38 
39 /*
40  * For each block reference registered with XLogRegisterBuffer, we fill in
41  * a registered_buffer struct.
42  */
43 typedef struct
44 {
45  bool in_use; /* is this slot in use? */
46  uint8 flags; /* REGBUF_* flags */
 47  RelFileNode rnode; /* identifies the relation and block */
 48  ForkNumber forkno;
 49  BlockNumber block;
50  Page page; /* page content */
51  uint32 rdata_len; /* total length of data in rdata chain */
52  XLogRecData *rdata_head; /* head of the chain of data registered with
53  * this block */
54  XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
55  * empty */
56 
57  XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
58  * backup block data in XLogRecordAssemble() */
59 
60  /* buffer to store a compressed version of backup block image */
61  char compressed_page[PGLZ_MAX_BLCKSZ];
 62 } registered_buffer;
 63 
 64 static registered_buffer *registered_buffers;
65 static int max_registered_buffers; /* allocated size */
66 static int max_registered_block_id = 0; /* highest block_id + 1 currently
67  * registered */
68 
69 /*
70  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
71  * with XLogRegisterData(...).
72  */
 73 static XLogRecData *mainrdata_head;
 74 static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
75 static uint32 mainrdata_len; /* total # of bytes in chain */
76 
77 /* flags for the in-progress insertion */
 78 static uint8 curinsert_flags = 0;
 79 
80 /*
81  * These are used to hold the record header while constructing a record.
82  * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
83  * because we want it to be MAXALIGNed and padding bytes zeroed.
84  *
85  * For simplicity, it's allocated large enough to hold the headers for any
86  * WAL record.
87  */
 88 static XLogRecData hdr_rdt;
 89 static char *hdr_scratch = NULL;
90 
91 #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
92 
93 #define HEADER_SCRATCH_SIZE \
94  (SizeOfXLogRecord + \
95  MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
96  SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
97 
98 /*
99  * An array of XLogRecData structs, to hold registered data.
100  */
 101 static XLogRecData *rdatas;
 102 static int num_rdatas; /* entries currently used */
103 static int max_rdatas; /* allocated size */
104 
105 static bool begininsert_called = false;
106 
107 /* Memory context to hold the registered buffer and data references. */
 108 static MemoryContext xloginsert_cxt;
 109 
110 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 111  XLogRecPtr RedoRecPtr, bool doPageWrites,
 112  XLogRecPtr *fpw_lsn, int *num_fpi);
113 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
114  uint16 hole_length, char *dest, uint16 *dlen);
115 
116 /*
117  * Begin constructing a WAL record. This must be called before the
118  * XLogRegister* functions and XLogInsert().
119  */
120 void
 121 XLogBeginInsert(void)
 122 {
 123  Assert(max_registered_block_id == 0);
124  Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
125  Assert(mainrdata_len == 0);
126 
127  /* cross-check on whether we should be here or not */
128  if (!XLogInsertAllowed())
129  elog(ERROR, "cannot make new WAL entries during recovery");
130 
131  if (begininsert_called)
132  elog(ERROR, "XLogBeginInsert was already called");
133 
134  begininsert_called = true;
135 }
136 
137 /*
138  * Ensure that there are enough buffer and data slots in the working area,
139  * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
140  * calls.
141  *
142  * There is always space for a small number of buffers and data chunks, enough
143  * for most record types. This function is for the exceptional cases that need
144  * more.
145  */
146 void
147 XLogEnsureRecordSpace(int max_block_id, int ndatas)
148 {
149  int nbuffers;
150 
151  /*
152  * This must be called before entering a critical section, because
153  * allocating memory inside a critical section can fail. repalloc() will
154  * check the same, but better to check it here too so that we fail
155  * consistently even if the arrays happen to be large enough already.
156  */
157  Assert(CritSectionCount == 0);
158 
159  /* the minimum values can't be decreased */
160  if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
161  max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
162  if (ndatas < XLR_NORMAL_RDATAS)
163  ndatas = XLR_NORMAL_RDATAS;
164 
165  if (max_block_id > XLR_MAX_BLOCK_ID)
166  elog(ERROR, "maximum number of WAL record block references exceeded");
167  nbuffers = max_block_id + 1;
168 
169  if (nbuffers > max_registered_buffers)
170  {
171  registered_buffers = (registered_buffer *)
172  repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
173 
174  /*
175  * At least the padding bytes in the structs must be zeroed, because
176  * they are included in WAL data, but initialize it all for tidiness.
177  */
178  MemSet(&registered_buffers[max_registered_buffers], 0,
179  (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
180  max_registered_buffers = nbuffers;
181  }
182 
183  if (ndatas > max_rdatas)
184  {
185  rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
186  max_rdatas = ndatas;
187  }
188 }
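/*
 * Illustrative sketch, not part of this file: a caller that will register
 * more block references than usual reserves the space up front, before
 * entering its critical section (the counts shown are hypothetical).
 *
 *		XLogEnsureRecordSpace(nblocks, 0);
 *		START_CRIT_SECTION();
 *		... register buffers, register data, XLogInsert() ...
 *		END_CRIT_SECTION();
 */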
189 
190 /*
191  * Reset WAL record construction buffers.
192  */
193 void
 194 XLogResetInsertion(void)
 195 {
196  int i;
197 
198  for (i = 0; i < max_registered_block_id; i++)
199  registered_buffers[i].in_use = false;
200 
201  num_rdatas = 0;
202  max_registered_block_id = 0;
203  mainrdata_len = 0;
204  mainrdata_last = (XLogRecData *) &mainrdata_head;
205  curinsert_flags = 0;
206  begininsert_called = false;
207 }
208 
209 /*
210  * Register a reference to a buffer with the WAL record being constructed.
211  * This must be called for every page that the WAL-logged operation modifies.
212  */
213 void
214 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
215 {
216  registered_buffer *regbuf;
217 
218  /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
219  Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
 220  Assert(begininsert_called);
 221 
222  if (block_id >= max_registered_block_id)
223  {
224  if (block_id >= max_registered_buffers)
225  elog(ERROR, "too many registered buffers");
226  max_registered_block_id = block_id + 1;
227  }
228 
229  regbuf = &registered_buffers[block_id];
230 
231  BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
232  regbuf->page = BufferGetPage(buffer);
233  regbuf->flags = flags;
234  regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
235  regbuf->rdata_len = 0;
236 
237  /*
238  * Check that this page hasn't already been registered with some other
239  * block_id.
240  */
241 #ifdef USE_ASSERT_CHECKING
242  {
243  int i;
244 
245  for (i = 0; i < max_registered_block_id; i++)
246  {
247  registered_buffer *regbuf_old = &registered_buffers[i];
248 
249  if (i == block_id || !regbuf_old->in_use)
250  continue;
251 
252  Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
253  regbuf_old->forkno != regbuf->forkno ||
254  regbuf_old->block != regbuf->block);
255  }
256  }
257 #endif
258 
259  regbuf->in_use = true;
260 }
261 
262 /*
263  * Like XLogRegisterBuffer, but for registering a block that's not in the
264  * shared buffer pool (i.e. when you don't have a Buffer for it).
265  */
266 void
267 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
268  BlockNumber blknum, Page page, uint8 flags)
269 {
270  registered_buffer *regbuf;
271 
272  /* This is currently only used to WAL-log a full-page image of a page */
273  Assert(flags & REGBUF_FORCE_IMAGE);
 274  Assert(begininsert_called);
 275 
276  if (block_id >= max_registered_block_id)
277  max_registered_block_id = block_id + 1;
278 
279  if (block_id >= max_registered_buffers)
280  elog(ERROR, "too many registered buffers");
281 
282  regbuf = &registered_buffers[block_id];
283 
284  regbuf->rnode = *rnode;
285  regbuf->forkno = forknum;
286  regbuf->block = blknum;
287  regbuf->page = page;
288  regbuf->flags = flags;
289  regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
290  regbuf->rdata_len = 0;
291 
292  /*
293  * Check that this page hasn't already been registered with some other
294  * block_id.
295  */
296 #ifdef USE_ASSERT_CHECKING
297  {
298  int i;
299 
300  for (i = 0; i < max_registered_block_id; i++)
301  {
302  registered_buffer *regbuf_old = &registered_buffers[i];
303 
304  if (i == block_id || !regbuf_old->in_use)
305  continue;
306 
307  Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
308  regbuf_old->forkno != regbuf->forkno ||
309  regbuf_old->block != regbuf->block);
310  }
311  }
312 #endif
313 
314  regbuf->in_use = true;
315 }
316 
317 /*
318  * Add data to the WAL record that's being constructed.
319  *
320  * The data is appended to the "main chunk", available at replay with
321  * XLogRecGetData().
322  */
323 void
324 XLogRegisterData(char *data, int len)
325 {
326  XLogRecData *rdata;
327 
 328  Assert(begininsert_called);
 329 
330  if (num_rdatas >= max_rdatas)
331  elog(ERROR, "too much WAL data");
332  rdata = &rdatas[num_rdatas++];
333 
334  rdata->data = data;
335  rdata->len = len;
336 
337  /*
338  * we use the mainrdata_last pointer to track the end of the chain, so no
339  * need to clear 'next' here.
340  */
341 
342  mainrdata_last->next = rdata;
343  mainrdata_last = rdata;
344 
345  mainrdata_len += len;
346 }
347 
348 /*
349  * Add buffer-specific data to the WAL record that's being constructed.
350  *
351  * Block_id must reference a block previously registered with
352  * XLogRegisterBuffer(). If this is called more than once for the same
353  * block_id, the data is appended.
354  *
355  * The maximum amount of data that can be registered per block is 65535
356  * bytes. That should be plenty; if you need more than BLCKSZ bytes to
357  * reconstruct the changes to the page, you might as well just log a full
358  * copy of it. (the "main data" that's not associated with a block is not
359  * limited)
360  */
361 void
362 XLogRegisterBufData(uint8 block_id, char *data, int len)
363 {
364  registered_buffer *regbuf;
365  XLogRecData *rdata;
366 
 367  Assert(begininsert_called);
 368 
369  /* find the registered buffer struct */
370  regbuf = &registered_buffers[block_id];
371  if (!regbuf->in_use)
372  elog(ERROR, "no block with id %d registered with WAL insertion",
373  block_id);
374 
375  if (num_rdatas >= max_rdatas)
376  elog(ERROR, "too much WAL data");
377  rdata = &rdatas[num_rdatas++];
378 
379  rdata->data = data;
380  rdata->len = len;
381 
382  regbuf->rdata_tail->next = rdata;
383  regbuf->rdata_tail = rdata;
384  regbuf->rdata_len += len;
385 }
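/*
 * Illustrative sketch, not part of this file: successive calls for the same
 * block_id are concatenated, so a caller can log a fixed-size header
 * followed by a variable-length payload (the names are hypothetical).
 *
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterBufData(0, (char *) &xlhdr, SizeOfFooHdr);
 *		XLogRegisterBufData(0, payload, payloadlen);
 */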
386 
387 /*
388  * Set insert status flags for the upcoming WAL record.
389  *
390  * The flags that can be used here are:
391  * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
392  * included in the record.
393  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
 394  * durability, which avoids triggering WAL archiving and other
395  * background activity.
396  */
397 void
 398 XLogSetRecordFlags(uint8 flags)
 399 {
 400  Assert(begininsert_called);
401  curinsert_flags = flags;
402 }
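/*
 * Illustrative sketch, not part of this file: marking a record as not
 * important for durability. The rmgr id and info value are hypothetical;
 * the flag must be set between XLogBeginInsert() and XLogInsert().
 *
 *		XLogBeginInsert();
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_NOISE);
 */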
403 
404 /*
405  * Insert an XLOG record having the specified RMID and info bytes, with the
406  * body of the record being the data and buffer references registered earlier
407  * with XLogRegister* calls.
408  *
409  * Returns XLOG pointer to end of record (beginning of next record).
410  * This can be used as LSN for data pages affected by the logged action.
411  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
412  * before the data page can be written out. This implements the basic
413  * WAL rule "write the log before the data".)
414  */
 415 XLogRecPtr
 416 XLogInsert(RmgrId rmid, uint8 info)
 417 {
418  XLogRecPtr EndPos;
419 
420  /* XLogBeginInsert() must have been called. */
421  if (!begininsert_called)
422  elog(ERROR, "XLogBeginInsert was not called");
423 
424  /*
425  * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
426  * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
427  */
428  if ((info & ~(XLR_RMGR_INFO_MASK |
 429  XLR_SPECIAL_REL_UPDATE |
 430  XLR_CHECK_CONSISTENCY)) != 0)
431  elog(PANIC, "invalid xlog info mask %02X", info);
432 
433  TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
434 
435  /*
436  * In bootstrap mode, we don't actually log anything but XLOG resources;
437  * return a phony record pointer.
438  */
439  if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
440  {
 441  XLogResetInsertion();
 442  EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
443  return EndPos;
444  }
445 
446  do
447  {
 448  XLogRecPtr RedoRecPtr;
 449  bool doPageWrites;
450  XLogRecPtr fpw_lsn;
451  XLogRecData *rdt;
452  int num_fpi = 0;
453 
454  /*
455  * Get values needed to decide whether to do full-page writes. Since
456  * we don't yet have an insertion lock, these could change under us,
457  * but XLogInsertRecord will recheck them once it has a lock.
458  */
459  GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
460 
461  rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
462  &fpw_lsn, &num_fpi);
463 
464  EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi);
465  } while (EndPos == InvalidXLogRecPtr);
466 
 467  XLogResetInsertion();
 468 
469  return EndPos;
470 }
471 
472 /*
473  * Assemble a WAL record from the registered data and buffers into an
474  * XLogRecData chain, ready for insertion with XLogInsertRecord().
475  *
476  * The record header fields are filled in, except for the xl_prev field. The
477  * calculated CRC does not include the record header yet.
478  *
479  * If there are any registered buffers, and a full-page image was not taken
480  * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
481  * signals that the assembled record is only good for insertion on the
482  * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
483  */
484 static XLogRecData *
 485 XLogRecordAssemble(RmgrId rmid, uint8 info,
 486  XLogRecPtr RedoRecPtr, bool doPageWrites,
 487  XLogRecPtr *fpw_lsn, int *num_fpi)
488 {
489  XLogRecData *rdt;
490  uint32 total_len = 0;
491  int block_id;
492  pg_crc32c rdata_crc;
493  registered_buffer *prev_regbuf = NULL;
494  XLogRecData *rdt_datas_last;
495  XLogRecord *rechdr;
496  char *scratch = hdr_scratch;
497 
498  /*
499  * Note: this function can be called multiple times for the same record.
500  * All the modifications we do to the rdata chains below must handle that.
501  */
502 
503  /* The record begins with the fixed-size header */
504  rechdr = (XLogRecord *) scratch;
505  scratch += SizeOfXLogRecord;
506 
507  hdr_rdt.next = NULL;
508  rdt_datas_last = &hdr_rdt;
509  hdr_rdt.data = hdr_scratch;
510 
511  /*
 512  * Enforce consistency checks for this record if the user is looking for
 513  * them. Do this at the beginning of this routine, so that callers of
 514  * XLogInsert() also have the possibility to pass XLR_CHECK_CONSISTENCY
 515  * directly for a record.
516  */
517  if (wal_consistency_checking[rmid])
518  info |= XLR_CHECK_CONSISTENCY;
519 
520  /*
521  * Make an rdata chain containing all the data portions of all block
522  * references. This includes the data for full-page images. Also append
523  * the headers for the block references in the scratch buffer.
524  */
525  *fpw_lsn = InvalidXLogRecPtr;
526  for (block_id = 0; block_id < max_registered_block_id; block_id++)
527  {
528  registered_buffer *regbuf = &registered_buffers[block_id];
529  bool needs_backup;
530  bool needs_data;
 531  XLogRecordBlockHeader bkpb;
 532  XLogRecordBlockImageHeader bimg;
 533  XLogRecordBlockCompressHeader cbimg = {0};
534  bool samerel;
535  bool is_compressed = false;
536  bool include_image;
537 
538  if (!regbuf->in_use)
539  continue;
540 
541  /* Determine if this block needs to be backed up */
542  if (regbuf->flags & REGBUF_FORCE_IMAGE)
543  needs_backup = true;
544  else if (regbuf->flags & REGBUF_NO_IMAGE)
545  needs_backup = false;
546  else if (!doPageWrites)
547  needs_backup = false;
548  else
549  {
550  /*
551  * We assume page LSN is first data on *every* page that can be
552  * passed to XLogInsert, whether it has the standard page layout
553  * or not.
554  */
555  XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
556 
557  needs_backup = (page_lsn <= RedoRecPtr);
558  if (!needs_backup)
559  {
560  if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
561  *fpw_lsn = page_lsn;
562  }
563  }
564 
 565  /* Determine if the buffer data needs to be included */
566  if (regbuf->rdata_len == 0)
567  needs_data = false;
568  else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
569  needs_data = true;
570  else
571  needs_data = !needs_backup;
572 
573  bkpb.id = block_id;
574  bkpb.fork_flags = regbuf->forkno;
575  bkpb.data_length = 0;
576 
577  if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
 578  bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
 579 
580  /*
 581  * If needs_backup is true or WAL checking is enabled for the current
 582  * resource manager, log a full-page write for the current block.
583  */
584  include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
585 
586  if (include_image)
587  {
588  Page page = regbuf->page;
589  uint16 compressed_len = 0;
590 
591  /*
592  * The page needs to be backed up, so calculate its hole length
593  * and offset.
594  */
595  if (regbuf->flags & REGBUF_STANDARD)
596  {
597  /* Assume we can omit data between pd_lower and pd_upper */
598  uint16 lower = ((PageHeader) page)->pd_lower;
599  uint16 upper = ((PageHeader) page)->pd_upper;
600 
601  if (lower >= SizeOfPageHeaderData &&
602  upper > lower &&
603  upper <= BLCKSZ)
604  {
605  bimg.hole_offset = lower;
606  cbimg.hole_length = upper - lower;
607  }
608  else
609  {
610  /* No "hole" to remove */
611  bimg.hole_offset = 0;
612  cbimg.hole_length = 0;
613  }
614  }
615  else
616  {
617  /* Not a standard page header, don't try to eliminate "hole" */
618  bimg.hole_offset = 0;
619  cbimg.hole_length = 0;
620  }
621 
622  /*
623  * Try to compress a block image if wal_compression is enabled
624  */
625  if (wal_compression)
626  {
627  is_compressed =
 628  XLogCompressBackupBlock(page, bimg.hole_offset,
 629  cbimg.hole_length,
630  regbuf->compressed_page,
631  &compressed_len);
632  }
633 
634  /*
635  * Fill in the remaining fields in the XLogRecordBlockHeader
636  * struct
637  */
 638  bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
 639 
640  /* Report a full page image constructed for the WAL record */
641  *num_fpi += 1;
642 
643  /*
644  * Construct XLogRecData entries for the page content.
645  */
646  rdt_datas_last->next = &regbuf->bkp_rdatas[0];
647  rdt_datas_last = rdt_datas_last->next;
648 
649  bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
650 
651  /*
652  * If WAL consistency checking is enabled for the resource manager
653  * of this WAL record, a full-page image is included in the record
654  * for the block modified. During redo, the full-page is replayed
655  * only if BKPIMAGE_APPLY is set.
656  */
657  if (needs_backup)
658  bimg.bimg_info |= BKPIMAGE_APPLY;
659 
660  if (is_compressed)
661  {
662  bimg.length = compressed_len;
 663  bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
 664 
665  rdt_datas_last->data = regbuf->compressed_page;
666  rdt_datas_last->len = compressed_len;
667  }
668  else
669  {
670  bimg.length = BLCKSZ - cbimg.hole_length;
671 
672  if (cbimg.hole_length == 0)
673  {
674  rdt_datas_last->data = page;
675  rdt_datas_last->len = BLCKSZ;
676  }
677  else
678  {
679  /* must skip the hole */
680  rdt_datas_last->data = page;
681  rdt_datas_last->len = bimg.hole_offset;
682 
683  rdt_datas_last->next = &regbuf->bkp_rdatas[1];
684  rdt_datas_last = rdt_datas_last->next;
685 
686  rdt_datas_last->data =
687  page + (bimg.hole_offset + cbimg.hole_length);
688  rdt_datas_last->len =
689  BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
690  }
691  }
692 
693  total_len += bimg.length;
694  }
695 
696  if (needs_data)
697  {
698  /*
699  * Link the caller-supplied rdata chain for this buffer to the
700  * overall list.
701  */
 702  bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
 703  bkpb.data_length = regbuf->rdata_len;
704  total_len += regbuf->rdata_len;
705 
706  rdt_datas_last->next = regbuf->rdata_head;
707  rdt_datas_last = regbuf->rdata_tail;
708  }
709 
710  if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
711  {
712  samerel = true;
 713  bkpb.fork_flags |= BKPBLOCK_SAME_REL;
 714  }
715  else
716  samerel = false;
717  prev_regbuf = regbuf;
718 
719  /* Ok, copy the header to the scratch buffer */
720  memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
721  scratch += SizeOfXLogRecordBlockHeader;
722  if (include_image)
723  {
724  memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 725  scratch += SizeOfXLogRecordBlockImageHeader;
 726  if (cbimg.hole_length != 0 && is_compressed)
727  {
728  memcpy(scratch, &cbimg,
 729  SizeOfXLogRecordBlockCompressHeader);
 730  scratch += SizeOfXLogRecordBlockCompressHeader;
 731  }
732  }
733  if (!samerel)
734  {
735  memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
736  scratch += sizeof(RelFileNode);
737  }
738  memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
739  scratch += sizeof(BlockNumber);
740  }
741 
742  /* followed by the record's origin, if any */
 743  if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
 744  replorigin_session_origin != InvalidRepOriginId)
 745  {
746  *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
747  memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
748  scratch += sizeof(replorigin_session_origin);
749  }
750 
751  /* followed by main data, if any */
752  if (mainrdata_len > 0)
753  {
754  if (mainrdata_len > 255)
755  {
756  *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
757  memcpy(scratch, &mainrdata_len, sizeof(uint32));
758  scratch += sizeof(uint32);
759  }
760  else
761  {
762  *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
763  *(scratch++) = (uint8) mainrdata_len;
764  }
765  rdt_datas_last->next = mainrdata_head;
766  rdt_datas_last = mainrdata_last;
767  total_len += mainrdata_len;
768  }
769  rdt_datas_last->next = NULL;
770 
771  hdr_rdt.len = (scratch - hdr_scratch);
772  total_len += hdr_rdt.len;
773 
774  /*
775  * Calculate CRC of the data
776  *
777  * Note that the record header isn't added into the CRC initially since we
778  * don't know the prev-link yet. Thus, the CRC will represent the CRC of
779  * the whole record in the order: rdata, then backup blocks, then record
780  * header.
781  */
782  INIT_CRC32C(rdata_crc);
 783  COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
 784  for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
785  COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
786 
787  /*
788  * Fill in the fields in the record header. Prev-link is filled in later,
789  * once we know where in the WAL the record will be inserted. The CRC does
790  * not include the record header yet.
791  */
 792  rechdr->xl_xid = GetCurrentTransactionIdIfAny();
 793  rechdr->xl_tot_len = total_len;
794  rechdr->xl_info = info;
795  rechdr->xl_rmid = rmid;
796  rechdr->xl_prev = InvalidXLogRecPtr;
797  rechdr->xl_crc = rdata_crc;
798 
799  return &hdr_rdt;
800 }
801 
802 /*
803  * Create a compressed version of a backup block image.
804  *
805  * Returns false if compression fails (i.e., compressed result is actually
806  * bigger than original). Otherwise, returns true and sets 'dlen' to
807  * the length of compressed block image.
808  */
809 static bool
810 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
811  char *dest, uint16 *dlen)
812 {
813  int32 orig_len = BLCKSZ - hole_length;
814  int32 len;
815  int32 extra_bytes = 0;
816  char *source;
817  PGAlignedBlock tmp;
818 
819  if (hole_length != 0)
820  {
821  /* must skip the hole */
822  source = tmp.data;
823  memcpy(source, page, hole_offset);
824  memcpy(source + hole_offset,
825  page + (hole_offset + hole_length),
826  BLCKSZ - (hole_length + hole_offset));
827 
828  /*
829  * Extra data needs to be stored in WAL record for the compressed
830  * version of block image if the hole exists.
831  */
 832  extra_bytes = SizeOfXLogRecordBlockCompressHeader;
 833  }
834  else
835  source = page;
836 
837  /*
838  * We recheck the actual size even if pglz_compress() reports success and
839  * see if the number of bytes saved by compression is larger than the
840  * length of extra data needed for the compressed version of block image.
841  */
842  len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
843  if (len >= 0 &&
844  len + extra_bytes < orig_len)
845  {
846  *dlen = (uint16) len; /* successful compression */
847  return true;
848  }
849  return false;
850 }
851 
852 /*
853  * Determine whether the buffer referenced has to be backed up.
854  *
855  * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
856  * could change later, so the result should be used for optimization purposes
857  * only.
858  */
859 bool
 860 XLogCheckBufferNeedsBackup(Buffer buffer)
 861 {
 862  XLogRecPtr RedoRecPtr;
863  bool doPageWrites;
864  Page page;
865 
866  GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
867 
868  page = BufferGetPage(buffer);
869 
870  if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
871  return true; /* buffer requires backup */
872 
873  return false; /* buffer does not need to be backed up */
874 }
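/*
 * Illustrative sketch, not part of this file: because the answer is only a
 * hint, callers use it purely to choose a cheaper WAL strategy, e.g. to
 * skip delta-encoding a tuple when the old page is going to be logged as a
 * full-page image anyway.
 *
 *		if (!XLogCheckBufferNeedsBackup(oldbuf))
 *			... try to log a compact delta instead of the whole tuple ...
 */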
875 
876 /*
877  * Write a backup block if needed when we are setting a hint. Note that
878  * this may be called for a variety of page types, not just heaps.
879  *
880  * Callable while holding just share lock on the buffer content.
881  *
882  * We can't use the plain backup block mechanism since that relies on the
 883  * Buffer being exclusively locked; since some modifications (setting LSN, hint
 884  * bits) are allowed in a share-locked buffer, that can lead to WAL checksum
 885  * failures. So instead we copy the page and insert the copied data as normal
886  * record data.
887  *
 888  * We only need to do something if the page has not yet been full-page written in
 889  * this checkpoint round. The LSN of the inserted WAL record is returned if we
890  * had to write, InvalidXLogRecPtr otherwise.
891  *
892  * It is possible that multiple concurrent backends could attempt to write WAL
893  * records. In that case, multiple copies of the same block would be recorded
894  * in separate WAL records by different backends, though that is still OK from
895  * a correctness perspective.
896  */
 897 XLogRecPtr
 898 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
899 {
900  XLogRecPtr recptr = InvalidXLogRecPtr;
901  XLogRecPtr lsn;
 902  XLogRecPtr RedoRecPtr;
 903 
904  /*
905  * Ensure no checkpoint can change our view of RedoRecPtr.
906  */
 907  Assert(MyProc->delayChkpt);
 908 
909  /*
910  * Update RedoRecPtr so that we can make the right decision
911  */
912  RedoRecPtr = GetRedoRecPtr();
913 
914  /*
915  * We assume page LSN is first data on *every* page that can be passed to
916  * XLogInsert, whether it has the standard page layout or not. Since we're
917  * only holding a share-lock on the page, we must take the buffer header
918  * lock when we look at the LSN.
919  */
920  lsn = BufferGetLSNAtomic(buffer);
921 
922  if (lsn <= RedoRecPtr)
923  {
924  int flags;
925  PGAlignedBlock copied_buffer;
926  char *origdata = (char *) BufferGetBlock(buffer);
927  RelFileNode rnode;
928  ForkNumber forkno;
929  BlockNumber blkno;
930 
931  /*
932  * Copy buffer so we don't have to worry about concurrent hint bit or
933  * lsn updates. We assume pd_lower/upper cannot be changed without an
 934  * exclusive lock, so the contents of the backup image are not racy.
935  */
936  if (buffer_std)
937  {
938  /* Assume we can omit data between pd_lower and pd_upper */
939  Page page = BufferGetPage(buffer);
940  uint16 lower = ((PageHeader) page)->pd_lower;
941  uint16 upper = ((PageHeader) page)->pd_upper;
942 
943  memcpy(copied_buffer.data, origdata, lower);
944  memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
945  }
946  else
947  memcpy(copied_buffer.data, origdata, BLCKSZ);
948 
949  XLogBeginInsert();
950 
951  flags = REGBUF_FORCE_IMAGE;
952  if (buffer_std)
953  flags |= REGBUF_STANDARD;
954 
955  BufferGetTag(buffer, &rnode, &forkno, &blkno);
956  XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags);
957 
958  recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
959  }
960 
961  return recptr;
962 }
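/*
 * Illustrative sketch, not part of this file: roughly the caller-side
 * pattern when setting hint bits. The caller delays checkpoints while the
 * record is inserted, and stamps the page only if a record was written.
 *
 *		MyProc->delayChkpt = true;
 *		lsn = XLogSaveBufferForHint(buffer, buffer_std);
 *		if (!XLogRecPtrIsInvalid(lsn))
 *			PageSetLSN(page, lsn);
 *		MyProc->delayChkpt = false;
 */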
963 
964 /*
965  * Write a WAL record containing a full image of a page. Caller is responsible
966  * for writing the page to disk after calling this routine.
967  *
968  * Note: If you're using this function, you should be building pages in private
969  * memory and writing them directly to smgr. If you're using buffers, call
970  * log_newpage_buffer instead.
971  *
972  * If the page follows the standard page layout, with a PageHeader and unused
973  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
974  * the unused space to be left out from the WAL record, making it smaller.
975  */
 976 XLogRecPtr
 977 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
 978  Page page, bool page_std)
979 {
980  int flags;
981  XLogRecPtr recptr;
982 
983  flags = REGBUF_FORCE_IMAGE;
984  if (page_std)
985  flags |= REGBUF_STANDARD;
986 
987  XLogBeginInsert();
988  XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
989  recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
990 
991  /*
992  * The page may be uninitialized. If so, we can't set the LSN because that
993  * would corrupt the page.
994  */
995  if (!PageIsNew(page))
996  {
997  PageSetLSN(page, recptr);
998  }
999 
1000  return recptr;
1001 }
1002 
1003 /*
1004  * Write a WAL record containing a full image of a page.
1005  *
1006  * Caller should initialize the buffer and mark it dirty before calling this
1007  * function. This function will set the page LSN.
1008  *
1009  * If the page follows the standard page layout, with a PageHeader and unused
1010  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1011  * the unused space to be left out from the WAL record, making it smaller.
1012  */
1013 XLogRecPtr
1014 log_newpage_buffer(Buffer buffer, bool page_std)
1015 {
1016  Page page = BufferGetPage(buffer);
1017  RelFileNode rnode;
1018  ForkNumber forkNum;
1019  BlockNumber blkno;
1020 
1021  /* Shared buffers should be modified in a critical section. */
1022  Assert(CritSectionCount > 0);
1023 
1024  BufferGetTag(buffer, &rnode, &forkNum, &blkno);
1025 
1026  return log_newpage(&rnode, forkNum, blkno, page, page_std);
1027 }
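/*
 * Illustrative sketch, not part of this file: logging a freshly initialized
 * page that lives in shared buffers. The surrounding code is a sketch only;
 * whether WAL-logging is needed depends on the relation's persistence.
 *
 *		START_CRIT_SECTION();
 *		PageInit(BufferGetPage(buffer), BufferGetPageSize(buffer), 0);
 *		MarkBufferDirty(buffer);
 *		if (RelationNeedsWAL(rel))
 *			log_newpage_buffer(buffer, true);
 *		END_CRIT_SECTION();
 */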
1028 
1029 /*
1030  * WAL-log a range of blocks in a relation.
1031  *
1032  * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
1033  * written to the WAL. If the range is large, this is done in multiple WAL
1034  * records.
1035  *
 1036  * If all pages follow the standard page layout, with a PageHeader and unused
1037  * space between pd_lower and pd_upper, set 'page_std' to true. That allows
1038  * the unused space to be left out from the WAL records, making them smaller.
1039  *
1040  * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 1041  * is used on a newly-built relation, and the caller is holding an
1042  * AccessExclusiveLock on it, so no other backend can be accessing it at the
1043  * same time. If that's not the case, you must ensure that this does not
1044  * cause a deadlock through some other means.
1045  */
1046 void
 1047 log_newpage_range(Relation rel, ForkNumber forkNum,
 1048  BlockNumber startblk, BlockNumber endblk,
1049  bool page_std)
1050 {
1051  int flags;
1052  BlockNumber blkno;
1053 
1054  flags = REGBUF_FORCE_IMAGE;
1055  if (page_std)
1056  flags |= REGBUF_STANDARD;
1057 
1058  /*
1059  * Iterate over all the pages in the range. They are collected into
1060  * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
1061  * for each batch.
1062  */
 1063  XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
 1064 
1065  blkno = startblk;
1066  while (blkno < endblk)
1067  {
1068  Buffer bufpack[XLR_MAX_BLOCK_ID];
1069  XLogRecPtr recptr;
1070  int nbufs;
1071  int i;
1072 
 1073  CHECK_FOR_INTERRUPTS();
 1074 
1075  /* Collect a batch of blocks. */
1076  nbufs = 0;
1077  while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
1078  {
1079  Buffer buf = ReadBufferExtended(rel, forkNum, blkno,
1080  RBM_NORMAL, NULL);
1081 
 1082  LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 1083 
1084  /*
1085  * Completely empty pages are not WAL-logged. Writing a WAL record
1086  * would change the LSN, and we don't want that. We want the page
1087  * to stay empty.
1088  */
1089  if (!PageIsNew(BufferGetPage(buf)))
1090  bufpack[nbufs++] = buf;
1091  else
1092  UnlockReleaseBuffer(buf);
1093  blkno++;
1094  }
1095 
1096  /* Write WAL record for this batch. */
1097  XLogBeginInsert();
1098 
 1099  START_CRIT_SECTION();
 1100  for (i = 0; i < nbufs; i++)
1101  {
1102  XLogRegisterBuffer(i, bufpack[i], flags);
1103  MarkBufferDirty(bufpack[i]);
1104  }
1105 
1106  recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
1107 
1108  for (i = 0; i < nbufs; i++)
1109  {
1110  PageSetLSN(BufferGetPage(bufpack[i]), recptr);
1111  UnlockReleaseBuffer(bufpack[i]);
1112  }
1113  END_CRIT_SECTION();
1114  }
1115 }
1116 
1117 /*
1118  * Allocate working buffers needed for WAL record construction.
1119  */
1120 void
 1121 InitXLogInsert(void)
 1122 {
1123  /* Initialize the working areas */
1124  if (xloginsert_cxt == NULL)
1125  {
1126  xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1127  "WAL record construction",
 1128  ALLOCSET_DEFAULT_SIZES);
 1129  }
1130 
1131  if (registered_buffers == NULL)
1132  {
1133  registered_buffers = (registered_buffer *)
1134  MemoryContextAllocZero(xloginsert_cxt,
1135  sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
 1136  max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
 1137  }
1138  if (rdatas == NULL)
1139  {
1140  rdatas = MemoryContextAlloc(xloginsert_cxt,
1141  sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
 1142  max_rdatas = XLR_NORMAL_RDATAS;
 1143  }
1144 
1145  /*
1146  * Allocate a buffer to hold the header information for a WAL record.
1147  */
1148  if (hdr_scratch == NULL)
1149  hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 1150  HEADER_SCRATCH_SIZE);
 1151 }