PostgreSQL Source Code  git master
bulk_write.c File Reference
#include "postgres.h"
#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"
Include dependency graph for bulk_write.c:

Go to the source code of this file.

Data Structures

struct  PendingWrite
 
struct  BulkWriteState
 

Macros

#define MAX_PENDING_WRITES   XLR_MAX_BLOCK_ID
 

Typedefs

typedef struct PendingWrite PendingWrite
 

Functions

static void smgr_bulk_flush (BulkWriteState *bulkstate)
 
BulkWriteState * smgr_bulk_start_rel (Relation rel, ForkNumber forknum)
 
BulkWriteState * smgr_bulk_start_smgr (SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
void smgr_bulk_finish (BulkWriteState *bulkstate)
 
static int buffer_cmp (const void *a, const void *b)
 
void smgr_bulk_write (BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
 
BulkWriteBuffer smgr_bulk_get_buf (BulkWriteState *bulkstate)
 

Variables

static const PGIOAlignedBlock zero_buffer = {{0}}
 

Macro Definition Documentation

◆ MAX_PENDING_WRITES

#define MAX_PENDING_WRITES   XLR_MAX_BLOCK_ID

Definition at line 45 of file bulk_write.c.

Typedef Documentation

◆ PendingWrite

typedef struct PendingWrite PendingWrite

Function Documentation

◆ buffer_cmp()

static int buffer_cmp ( const void *  a,
const void *  b 
)
static

Definition at line 223 of file bulk_write.c.

224 {
225  const PendingWrite *bufa = (const PendingWrite *) a;
226  const PendingWrite *bufb = (const PendingWrite *) b;
227 
228  /* We should not see duplicated writes for the same block */
229  Assert(bufa->blkno != bufb->blkno);
230  if (bufa->blkno > bufb->blkno)
231  return 1;
232  else
233  return -1;
234 }
#define Assert(condition)
Definition: c.h:861
int b
Definition: isn.c:69
int a
Definition: isn.c:68
BlockNumber blkno
Definition: bulk_write.c:52

References a, Assert, b, and PendingWrite::blkno.

Referenced by smgr_bulk_flush().

◆ smgr_bulk_finish()

void smgr_bulk_finish ( BulkWriteState *  bulkstate)

Definition at line 128 of file bulk_write.c.

129 {
130  /* WAL-log and flush any remaining pages */
131  smgr_bulk_flush(bulkstate);
132 
133  /*
134  * Fsync the relation, or register it for the next checkpoint, if
135  * necessary.
136  */
137  if (SmgrIsTemp(bulkstate->smgr))
138  {
139  /* Temporary relations don't need to be fsync'd, ever */
140  }
141  else if (!bulkstate->use_wal)
142  {
143  /*----------
144  * This is either an unlogged relation, or a permanent relation but we
145  * skipped WAL-logging because wal_level=minimal:
146  *
147  * A) Unlogged relation
148  *
149  * Unlogged relations will go away on crash, but they need to be
150  * fsync'd on a clean shutdown. It's sufficient to call
151  * smgrregistersync(), that ensures that the checkpointer will
152  * flush it at the shutdown checkpoint. (It will flush it on the
153  * next online checkpoint too, which is not strictly necessary.)
154  *
155  * Note that the init-fork of an unlogged relation is not
156  * considered unlogged for our purposes. It's treated like a
157  * regular permanent relation. The callers will pass use_wal=true
158  * for the init fork.
159  *
160  * B) Permanent relation, WAL-logging skipped because wal_level=minimal
161  *
162  * This is a new relation, and we didn't WAL-log the pages as we
163  * wrote, but they need to be fsync'd before commit.
164  *
165  * We don't need to do that here, however. The fsync() is done at
166  * commit, by smgrDoPendingSyncs() (*).
167  *
168  * (*) smgrDoPendingSyncs() might decide to WAL-log the whole
169  * relation at commit instead of fsyncing it, if the relation was
170  * very small, but it's smgrDoPendingSyncs() responsibility in any
171  * case.
172  *
173  * We cannot distinguish the two here, so conservatively assume it's
174  * an unlogged relation. A permanent relation with wal_level=minimal
175  * would require no actions, see above.
176  */
177  smgrregistersync(bulkstate->smgr, bulkstate->forknum);
178  }
179  else
180  {
181  /*
182  * Permanent relation, WAL-logged normally.
183  *
184  * We already WAL-logged all the pages, so they will be replayed from
185  * WAL on crash. However, when we wrote out the pages, we passed
186  * skipFsync=true to avoid the overhead of registering all the writes
187  * with the checkpointer. Register the whole relation now.
188  *
189  * There is one hole in that idea: If a checkpoint occurred while we
190  * were writing the pages, it already missed fsyncing the pages we had
191  * written before the checkpoint started. A crash later on would
192  * replay the WAL starting from the checkpoint, therefore it wouldn't
193  * replay our earlier WAL records. So if a checkpoint started after
194  * the bulk write, fsync the files now.
195  */
196 
197  /*
198  * Prevent a checkpoint from starting between the GetRedoRecPtr() and
199  * smgrregistersync() calls.
200  */
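201  Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
202  MyProc->delayChkptFlags |= DELAY_CHKPT_START;
203 
204  if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
205  {
206  /*
207  * A checkpoint occurred and it didn't know about our writes, so
208  * fsync() the relation ourselves.
209  */
210  MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
211  smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
212  elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
213  }
214  else
215  {
216  smgrregistersync(bulkstate->smgr, bulkstate->forknum);
217  MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
218  }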
219  }
220 }
static void smgr_bulk_flush(BulkWriteState *bulkstate)
Definition: bulk_write.c:240
#define DEBUG1
Definition: elog.h:30
#define elog(elevel,...)
Definition: elog.h:225
#define DELAY_CHKPT_START
Definition: proc.h:119
void smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:811
void smgrregistersync(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:779
#define SmgrIsTemp(smgr)
Definition: smgr.h:73
PGPROC * MyProc
Definition: proc.c:66
SMgrRelation smgr
Definition: bulk_write.c:62
XLogRecPtr start_RedoRecPtr
Definition: bulk_write.c:74
ForkNumber forknum
Definition: bulk_write.c:63
int delayChkptFlags
Definition: proc.h:240
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6437

References Assert, DEBUG1, DELAY_CHKPT_START, PGPROC::delayChkptFlags, elog, BulkWriteState::forknum, GetRedoRecPtr(), MyProc, BulkWriteState::smgr, smgr_bulk_flush(), smgrimmedsync(), SmgrIsTemp, smgrregistersync(), BulkWriteState::start_RedoRecPtr, and BulkWriteState::use_wal.

Referenced by _bt_load(), btbuildempty(), end_heap_rewrite(), gist_indexsortbuild(), RelationCopyStorage(), and spgbuildempty().

◆ smgr_bulk_flush()

static void smgr_bulk_flush ( BulkWriteState *  bulkstate)
static

Definition at line 240 of file bulk_write.c.

241 {
242  int npending = bulkstate->npending;
243  PendingWrite *pending_writes = bulkstate->pending_writes;
244 
245  if (npending == 0)
246  return;
247 
248  if (npending > 1)
249  qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
250 
251  if (bulkstate->use_wal)
252  {
253  BlockNumber blknos[MAX_PENDING_WRITES];
254  Page pages[MAX_PENDING_WRITES];
255  bool page_std = true;
256 
257  for (int i = 0; i < npending; i++)
258  {
259  blknos[i] = pending_writes[i].blkno;
260  pages[i] = pending_writes[i].buf->data;
261 
262  /*
263  * If any of the pages use !page_std, we log them all as such.
264  * That's a bit wasteful, but in practice, a mix of standard and
265  * non-standard page layout is rare. None of the built-in AMs do
266  * that.
267  */
268  if (!pending_writes[i].page_std)
269  page_std = false;
270  }
271  log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
272  npending, blknos, pages, page_std);
273  }
274 
275  for (int i = 0; i < npending; i++)
276  {
277  BlockNumber blkno = pending_writes[i].blkno;
278  Page page = pending_writes[i].buf->data;
279 
280  PageSetChecksumInplace(page, blkno);
281 
282  if (blkno >= bulkstate->pages_written)
283  {
284  /*
285  * If we have to write pages nonsequentially, fill in the space
286  * with zeroes until we come back and overwrite. This is not
287  * logically necessary on standard Unix filesystems (unwritten
288  * space will read as zeroes anyway), but it should help to avoid
289  * fragmentation. The dummy pages aren't WAL-logged though.
290  */
291  while (blkno > bulkstate->pages_written)
292  {
293  /* don't set checksum for all-zero page */
294  smgrextend(bulkstate->smgr, bulkstate->forknum,
295  bulkstate->pages_written++,
296  &zero_buffer,
297  true);
298  }
299 
300  smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
301  bulkstate->pages_written = pending_writes[i].blkno + 1;
302  }
303  else
304  smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
305  pfree(page);
306  }
307 
308  bulkstate->npending = 0;
309 }
uint32 BlockNumber
Definition: block.h:31
void PageSetChecksumInplace(Page page, BlockNumber blkno)
Definition: bufpage.c:1542
Pointer Page
Definition: bufpage.h:81
static const PGIOAlignedBlock zero_buffer
Definition: bulk_write.c:47
#define MAX_PENDING_WRITES
Definition: bulk_write.c:45
static int buffer_cmp(const void *a, const void *b)
Definition: bulk_write.c:223
int i
Definition: isn.c:72
void pfree(void *pointer)
Definition: mcxt.c:1521
#define qsort(a, b, c, d)
Definition: port.h:447
void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
Definition: smgr.c:538
static void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const void *buffer, bool skipFsync)
Definition: smgr.h:123
BlockNumber pages_written
Definition: bulk_write.c:71
PendingWrite pending_writes[MAX_PENDING_WRITES]
Definition: bulk_write.c:68
BulkWriteBuffer buf
Definition: bulk_write.c:51
RelFileLocator locator
RelFileLocatorBackend smgr_rlocator
Definition: smgr.h:37
char data[BLCKSZ]
Definition: c.h:1140
void log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages, BlockNumber *blknos, Page *pages, bool page_std)
Definition: xloginsert.c:1175

References PendingWrite::blkno, PendingWrite::buf, buffer_cmp(), PGIOAlignedBlock::data, BulkWriteState::forknum, i, RelFileLocatorBackend::locator, log_newpages(), MAX_PENDING_WRITES, BulkWriteState::npending, BulkWriteState::pages_written, PageSetChecksumInplace(), BulkWriteState::pending_writes, pfree(), qsort, BulkWriteState::smgr, SMgrRelationData::smgr_rlocator, smgrextend(), smgrwrite(), BulkWriteState::use_wal, and zero_buffer.

Referenced by smgr_bulk_finish(), and smgr_bulk_write().

◆ smgr_bulk_get_buf()

BulkWriteBuffer smgr_bulk_get_buf ( BulkWriteState *  bulkstate)

Definition at line 344 of file bulk_write.c.

345 {
346  return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
347 }
void * MemoryContextAllocAligned(MemoryContext context, Size size, Size alignto, int flags)
Definition: mcxt.c:1409
#define PG_IO_ALIGN_SIZE
MemoryContext memcxt
Definition: bulk_write.c:76

References BulkWriteState::memcxt, MemoryContextAllocAligned(), and PG_IO_ALIGN_SIZE.

Referenced by _bt_blnewpage(), _bt_uppershutdown(), btbuildempty(), gist_indexsortbuild(), gist_indexsortbuild_levelstate_flush(), raw_heap_insert(), RelationCopyStorage(), and spgbuildempty().

◆ smgr_bulk_start_rel()

BulkWriteState* smgr_bulk_start_rel ( Relation  rel,
ForkNumber  forknum 
)

Definition at line 85 of file bulk_write.c.

86 {
87  return smgr_bulk_start_smgr(RelationGetSmgr(rel),
88  forknum,
89  RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
90 }
BulkWriteState * smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
Definition: bulk_write.c:98
static SMgrRelation RelationGetSmgr(Relation rel)
Definition: rel.h:567
#define RelationNeedsWAL(relation)
Definition: rel.h:628
@ INIT_FORKNUM
Definition: relpath.h:61

References INIT_FORKNUM, RelationGetSmgr(), RelationNeedsWAL, and smgr_bulk_start_smgr().

Referenced by _bt_load(), begin_heap_rewrite(), btbuildempty(), gist_indexsortbuild(), and spgbuildempty().

◆ smgr_bulk_start_smgr()

BulkWriteState* smgr_bulk_start_smgr ( SMgrRelation  smgr,
ForkNumber  forknum,
bool  use_wal 
)

Definition at line 98 of file bulk_write.c.

99 {
100  BulkWriteState *state;
101 
102  state = palloc(sizeof(BulkWriteState));
103  state->smgr = smgr;
104  state->forknum = forknum;
105  state->use_wal = use_wal;
106 
107  state->npending = 0;
108  state->pages_written = 0;
109 
110  state->start_RedoRecPtr = GetRedoRecPtr();
111 
112  /*
113  * Remember the memory context. We will use it to allocate all the
114  * buffers later.
115  */
116  state->memcxt = CurrentMemoryContext;
117 
118  return state;
119 }
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1317
Definition: regguts.h:323

References CurrentMemoryContext, GetRedoRecPtr(), and palloc().

Referenced by RelationCopyStorage(), and smgr_bulk_start_rel().

◆ smgr_bulk_write()

void smgr_bulk_write ( BulkWriteState *  bulkstate,
BlockNumber  blocknum,
BulkWriteBuffer  buf,
bool  page_std 
)

Definition at line 320 of file bulk_write.c.

321 {
322  PendingWrite *w;
323 
324  w = &bulkstate->pending_writes[bulkstate->npending++];
325  w->buf = buf;
326  w->blkno = blocknum;
327  w->page_std = page_std;
328 
329  if (bulkstate->npending == MAX_PENDING_WRITES)
330  smgr_bulk_flush(bulkstate);
331 }
static char * buf
Definition: pg_test_fsync.c:72
bool page_std
Definition: bulk_write.c:53

References PendingWrite::blkno, PendingWrite::buf, buf, MAX_PENDING_WRITES, BulkWriteState::npending, PendingWrite::page_std, BulkWriteState::pending_writes, and smgr_bulk_flush().

Referenced by _bt_blwritepage(), btbuildempty(), end_heap_rewrite(), gist_indexsortbuild(), gist_indexsortbuild_levelstate_flush(), raw_heap_insert(), RelationCopyStorage(), and spgbuildempty().

Variable Documentation

◆ zero_buffer

const PGIOAlignedBlock zero_buffer = {{0}}
static

Definition at line 47 of file bulk_write.c.

Referenced by smgr_bulk_flush().