bulk_write.c
/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *    Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts. Pages already present in the
 * indicated fork when the bulk write operation is started are not modified
 * unless explicitly written to. Do not mix operations through the regular
 * buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly. A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes. That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed. To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
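
/*
 * Typical caller pattern, sketched here for orientation only ('rel' and
 * 'nblocks' stand in for whatever the caller has; the loop body is
 * illustrative):
 *
 *    BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *    for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *    {
 *        BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *        PageInit((Page) buf->data, BLCKSZ, 0);
 *        (fill the page here)
 *        smgr_bulk_write(bulkstate, blkno, buf, true);
 *    }
 *    smgr_bulk_finish(bulkstate);
 *
 * smgr_bulk_write() takes ownership of each buffer, so the caller must not
 * touch it afterwards.
 */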
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

/*
 * Writes are accumulated and then WAL-logged in batches of up to this many
 * pages, matching the number of block references that log_newpages() packs
 * into a single WAL record.
 */
#define MAX_PENDING_WRITES	XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */

typedef struct PendingWrite
{
	BulkWriteBuffer buf;
	BlockNumber blkno;
	bool		page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation */
	BlockNumber relsize;

	/* The RedoRecPtr at the time that the bulk operation started */
	XLogRecPtr	start_RedoRecPtr;

	MemoryContext memcxt;
};

static void smgr_bulk_flush(BulkWriteState *bulkstate);

/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
	return smgr_bulk_start_smgr(RelationGetSmgr(rel),
								forknum,
								RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}

/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
	BulkWriteState *state;

	state = palloc(sizeof(BulkWriteState));
	state->smgr = smgr;
	state->forknum = forknum;
	state->use_wal = use_wal;

	state->npending = 0;
	state->relsize = smgrnblocks(smgr, forknum);

	state->start_RedoRecPtr = GetRedoRecPtr();

	/*
	 * Remember the memory context. We will use it to allocate all the
	 * buffers later.
	 */
	state->memcxt = CurrentMemoryContext;

	return state;
}
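
/*
 * For illustration (a sketch, not a caller in this file): copying a fork
 * into a destination relation for which no relcache entry exists, given an
 * SMgrRelation 'dst' obtained elsewhere (e.g. from smgropen()):
 *
 *    bulkstate = smgr_bulk_start_smgr(dst, forknum, use_wal);
 *    (queue pages with smgr_bulk_get_buf() / smgr_bulk_write())
 *    smgr_bulk_finish(bulkstate);
 *
 * Without a relcache entry, the caller must decide 'use_wal' itself.
 */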

/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * Fsync the relation, or register it for the next checkpoint, if
	 * necessary.
	 */
	if (SmgrIsTemp(bulkstate->smgr))
	{
		/* Temporary relations don't need to be fsync'd, ever */
	}
	else if (!bulkstate->use_wal)
	{
		/*----------
		 * This is either an unlogged relation, or a permanent relation but we
		 * skipped WAL-logging because wal_level=minimal:
		 *
		 * A) Unlogged relation
		 *
		 * Unlogged relations will go away on crash, but they need to be
		 * fsync'd on a clean shutdown. It's sufficient to call
		 * smgrregistersync(); that ensures that the checkpointer will
		 * flush it at the shutdown checkpoint. (It will flush it on the
		 * next online checkpoint too, which is not strictly necessary.)
		 *
		 * Note that the init-fork of an unlogged relation is not
		 * considered unlogged for our purposes. It's treated like a
		 * regular permanent relation. The callers will pass use_wal=true
		 * for the init fork.
		 *
		 * B) Permanent relation, WAL-logging skipped because wal_level=minimal
		 *
		 * This is a new relation, and we didn't WAL-log the pages as we
		 * wrote, but they need to be fsync'd before commit.
		 *
		 * We don't need to do that here, however. The fsync() is done at
		 * commit, by smgrDoPendingSyncs() (*).
		 *
		 * (*) smgrDoPendingSyncs() might decide to WAL-log the whole
		 * relation at commit instead of fsyncing it, if the relation was
		 * very small, but either way it's smgrDoPendingSyncs()'s
		 * responsibility.
		 *
		 * We cannot distinguish the two cases here, so conservatively assume
		 * it's an unlogged relation. A permanent relation with
		 * wal_level=minimal would require no action at all, as explained
		 * above.
		 */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
	}
	else
	{
		/*
		 * Permanent relation, WAL-logged normally.
		 *
		 * We already WAL-logged all the pages, so they will be replayed from
		 * WAL on crash. However, when we wrote out the pages, we passed
		 * skipFsync=true to avoid the overhead of registering all the writes
		 * with the checkpointer. Register the whole relation now.
		 *
		 * There is one hole in that idea: if a checkpoint occurred while we
		 * were writing the pages, it already missed fsyncing the pages we had
		 * written before the checkpoint started. A crash later on would
		 * replay the WAL starting from the checkpoint, and therefore wouldn't
		 * replay our earlier WAL records. So if a checkpoint started after
		 * the bulk write began, fsync the files now.
		 */
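
		/*
		 * To illustrate the race being closed here:
		 *
		 *    start_RedoRecPtr == A        writes with skipFsync=true
		 *    -----|--------------------------|----------|------------>
		 *                      checkpoint starts; RedoRecPtr becomes B
		 *
		 * If the current RedoRecPtr no longer equals A, a checkpoint began
		 * after the bulk write started and may have missed our pages, so we
		 * fsync the relation ourselves instead of merely registering it.
		 */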

		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}

static int
buffer_cmp(const void *a, const void *b)
{
	const PendingWrite *bufa = (const PendingWrite *) a;
	const PendingWrite *bufb = (const PendingWrite *) b;

	/* We should not see duplicated writes for the same block */
	Assert(bufa->blkno != bufb->blkno);
	if (bufa->blkno > bufb->blkno)
		return 1;
	else
		return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	/*
	 * Sort the writes by block number, so that the loop below can extend
	 * the relation sequentially and zero-fill any gaps only once.
	 */
	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layout is rare. None of the built-in AMs do
			 * that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->relsize)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation. The dummy pages aren't WAL-logged though.
			 */
			while (blkno > bulkstate->relsize)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->relsize,
						   &zero_buffer,
						   true);
				bulkstate->relsize++;
			}

			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->relsize++;
		}
		else
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		pfree(page);
	}

	bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
	PendingWrite *w;

	w = &bulkstate->pending_writes[bulkstate->npending++];
	w->buf = buf;
	w->blkno = blocknum;
	w->page_std = page_std;

	if (bulkstate->npending == MAX_PENDING_WRITES)
		smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer. When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but could be implemented
 * using a ring buffer or larger chunks in the future, so don't rely on it.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}