bulk_write.c
/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *	  Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts. Do not mix operations through
 * the regular buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly. A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes. That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed. To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
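/*
 * Typical usage, as a sketch (illustrative only; "rel" stands for a relation
 * the caller has opened and locked, and "nblocks" for the number of pages to
 * build):
 *
 *		BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *		for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *		{
 *			BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *			PageInit((Page) buf->data, BLCKSZ, 0);
 *			... fill the page contents ...
 *			smgr_bulk_write(bulkstate, blkno, buf, true);
 *		}
 *		smgr_bulk_finish(bulkstate);
 */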
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

#define MAX_PENDING_WRITES	XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
typedef struct PendingWrite
{
	BulkWriteBuffer buf;
	BlockNumber blkno;
	bool		page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation */
	BlockNumber pages_written;

	/* The RedoRecPtr at the time that the bulk operation started */
	XLogRecPtr	start_RedoRecPtr;

	MemoryContext memcxt;
};
static void smgr_bulk_flush(BulkWriteState *bulkstate);
/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
	return smgr_bulk_start_smgr(RelationGetSmgr(rel),
								forknum,
								RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}
/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
	BulkWriteState *state;

	state = palloc(sizeof(BulkWriteState));
	state->smgr = smgr;
	state->forknum = forknum;
	state->use_wal = use_wal;

	state->npending = 0;
	state->pages_written = 0;

	state->start_RedoRecPtr = GetRedoRecPtr();

	/*
	 * Remember the memory context. We will use it to allocate all the
	 * buffers later.
	 */
	state->memcxt = CurrentMemoryContext;

	return state;
}
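/*
 * For example, a caller with no relcache entry at hand might do something
 * like the following (a sketch; "rlocator" is assumed to identify storage
 * that has already been created with smgrcreate()):
 *
 *		SMgrRelation reln = smgropen(rlocator, INVALID_PROC_NUMBER);
 *		BulkWriteState *bulkstate = smgr_bulk_start_smgr(reln, INIT_FORKNUM, true);
 */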
/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * Fsync the relation, or register it for the next checkpoint, if
	 * necessary.
	 */
	if (SmgrIsTemp(bulkstate->smgr))
	{
		/* Temporary relations don't need to be fsync'd, ever */
	}
	else if (!bulkstate->use_wal)
	{
		/*----------
		 * This is either an unlogged relation, or a permanent relation but
		 * we skipped WAL-logging because wal_level=minimal:
		 *
		 * A) Unlogged relation
		 *
		 *    Unlogged relations will go away on crash, but they need to be
		 *    fsync'd on a clean shutdown. It's sufficient to call
		 *    smgrregistersync(), which ensures that the checkpointer will
		 *    flush the relation at the shutdown checkpoint. (It will flush
		 *    it on the next online checkpoint too, which is not strictly
		 *    necessary.)
		 *
		 *    Note that the init-fork of an unlogged relation is not
		 *    considered unlogged for our purposes. It's treated like a
		 *    regular permanent relation. The callers will pass use_wal=true
		 *    for the init fork.
		 *
		 * B) Permanent relation, WAL-logging skipped because
		 *    wal_level=minimal
		 *
		 *    This is a new relation, and we didn't WAL-log the pages as we
		 *    wrote them, but they need to be fsync'd before commit.
		 *
		 *    We don't need to do that here, however. The fsync() is done at
		 *    commit, by smgrDoPendingSyncs() (*).
		 *
		 *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
		 *    relation at commit instead of fsyncing it, if the relation was
		 *    very small, but that is smgrDoPendingSyncs()'s responsibility
		 *    in any case.
		 *
		 * We cannot distinguish the two cases here, so conservatively assume
		 * it's an unlogged relation. A permanent relation with
		 * wal_level=minimal would require no action at all, see above.
		 */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
	}
	else
	{
		/*
		 * Permanent relation, WAL-logged normally.
		 *
		 * We already WAL-logged all the pages, so they will be replayed from
		 * WAL on crash. However, when we wrote out the pages, we passed
		 * skipFsync=true to avoid the overhead of registering all the writes
		 * with the checkpointer. Register the whole relation now.
		 *
		 * There is one hole in that idea: if a checkpoint occurred while we
		 * were writing the pages, it already missed fsyncing the pages we
		 * had written before the checkpoint started. A crash later on would
		 * replay the WAL starting from that checkpoint, and therefore
		 * wouldn't replay our earlier WAL records. So if a checkpoint
		 * started after the bulk write began, fsync the files now.
		 */

		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}
static int
buffer_cmp(const void *a, const void *b)
{
	const PendingWrite *bufa = (const PendingWrite *) a;
	const PendingWrite *bufb = (const PendingWrite *) b;

	/* We should not see duplicate writes of the same block */
	Assert(bufa->blkno != bufb->blkno);
	if (bufa->blkno > bufb->blkno)
		return 1;
	else
		return -1;
}
/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layouts is rare. None of the built-in AMs
			 * does that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->pages_written)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to
			 * avoid fragmentation. The dummy pages aren't WAL-logged,
			 * though.
			 */
			while (blkno > bulkstate->pages_written)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->pages_written++,
						   &zero_buffer,
						   true);
			}

			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->pages_written = pending_writes[i].blkno + 1;
		}
		else
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		pfree(page);
	}

	bulkstate->npending = 0;
}
/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
	PendingWrite *w;

	w = &bulkstate->pending_writes[bulkstate->npending++];
	w->buf = buf;
	w->blkno = blocknum;
	w->page_std = page_std;

	if (bulkstate->npending == MAX_PENDING_WRITES)
		smgr_bulk_flush(bulkstate);
}
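/*
 * Note on batching: MAX_PENDING_WRITES is XLR_MAX_BLOCK_ID (32 as of this
 * writing), so with WAL-logging enabled, each batch hands at most that many
 * pages to log_newpages() at once.
 */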
/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer. When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but could be implemented
 * using a ring buffer or larger chunks in the future, so don't rely on it.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}