PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56 
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 
68 
69 typedef struct
70 {
72  int hasSeek;
73  /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74  pgoff_t lastFilePos; /* position after last data block we've read */
75 } lclContext;
76 
77 typedef struct
78 {
79  int dataState;
80  pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 } lclTocEntry;
82 
83 
84 /*------
85  * Static declarations
86  *------
87  */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  * Init routine required by ALL formats. This is a global routine
97  * and should be declared in pg_backup_archiver.h
98  *
99  * It's task is to create any extra archive context (using AH->formatData),
100  * and to initialize the supported function pointers.
101  *
102  * It should also prepare whatever it's input source is for reading/writing,
103  * and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
107 {
108  lclContext *ctx;
109 
110  /* Assuming static functions, this can be copied for each format. */
112  AH->StartDataPtr = _StartData;
113  AH->WriteDataPtr = _WriteData;
114  AH->EndDataPtr = _EndData;
115  AH->WriteBytePtr = _WriteByte;
116  AH->ReadBytePtr = _ReadByte;
117  AH->WriteBufPtr = _WriteBuf;
118  AH->ReadBufPtr = _ReadBuf;
119  AH->ClosePtr = _CloseArchive;
125 
127  AH->StartBlobPtr = _StartBlob;
128  AH->EndBlobPtr = _EndBlob;
129  AH->EndBlobsPtr = _EndBlobs;
130 
132  AH->ClonePtr = _Clone;
133  AH->DeClonePtr = _DeClone;
134 
135  /* no parallel dump in the custom archive, only parallel restore */
136  AH->WorkerJobDumpPtr = NULL;
138 
139  /* Set up a private area. */
140  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141  AH->formatData = (void *) ctx;
142 
143  /* Initialize LO buffering */
144  AH->lo_buf_size = LOBBUFSIZE;
145  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147  /*
148  * Now open the file
149  */
150  if (AH->mode == archModeWrite)
151  {
152  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153  {
154  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
155  if (!AH->FH)
156  fatal("could not open output file \"%s\": %m", AH->fSpec);
157  }
158  else
159  {
160  AH->FH = stdout;
161  if (!AH->FH)
162  fatal("could not open output file: %m");
163  }
164 
165  ctx->hasSeek = checkSeek(AH->FH);
166  }
167  else
168  {
169  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170  {
171  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
172  if (!AH->FH)
173  fatal("could not open input file \"%s\": %m", AH->fSpec);
174  }
175  else
176  {
177  AH->FH = stdin;
178  if (!AH->FH)
179  fatal("could not open input file: %m");
180  }
181 
182  ctx->hasSeek = checkSeek(AH->FH);
183 
184  ReadHead(AH);
185  ReadToc(AH);
186 
187  /*
188  * Remember location of first data block (i.e., the point after TOC)
189  * in case we have to search for desired data blocks.
190  */
191  ctx->lastFilePos = _getFilePos(AH, ctx);
192  }
193 }
194 
195 /*
196  * Called by the Archiver when the dumper creates a new TOC entry.
197  *
198  * Optional.
199  *
200  * Set up extract format-related TOC data.
201 */
202 static void
204 {
205  lclTocEntry *ctx;
206 
207  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
208  if (te->dataDumper)
210  else
212 
213  te->formatData = (void *) ctx;
214 }
215 
216 /*
217  * Called by the Archiver to save any extra format-related TOC entry
218  * data.
219  *
220  * Optional.
221  *
222  * Use the Archiver routines to write data - they are non-endian, and
223  * maintain other important file information.
224  */
225 static void
227 {
228  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229 
230  WriteOffset(AH, ctx->dataPos, ctx->dataState);
231 }
232 
233 /*
234  * Called by the Archiver to read any extra format-related TOC data.
235  *
236  * Optional.
237  *
238  * Needs to match the order defined in _WriteExtraToc, and should also
239  * use the Archiver input routines.
240  */
241 static void
243 {
244  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245 
246  if (ctx == NULL)
247  {
248  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
249  te->formatData = (void *) ctx;
250  }
251 
252  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253 
254  /*
255  * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256  * dump it at all.
257  */
258  if (AH->version < K_VERS_1_7)
259  ReadInt(AH);
260 }
261 
262 /*
263  * Called by the Archiver when restoring an archive to output a comment
264  * that includes useful information about the TOC entry.
265  *
266  * Optional.
267  */
268 static void
270 {
271  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272 
273  if (AH->public.verbose)
274  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275  (int64) ctx->dataPos);
276 }
277 
278 /*
279  * Called by the archiver when saving TABLE DATA (not schema). This routine
280  * should save whatever format-specific information is needed to read
281  * the archive back.
282  *
283  * It is called just prior to the dumper's 'DataDumper' routine being called.
284  *
285  * Optional, but strongly recommended.
286  *
287  */
288 static void
290 {
291  lclContext *ctx = (lclContext *) AH->formatData;
292  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 
294  tctx->dataPos = _getFilePos(AH, ctx);
295  if (tctx->dataPos >= 0)
296  tctx->dataState = K_OFFSET_POS_SET;
297 
298  _WriteByte(AH, BLK_DATA); /* Block type */
299  WriteInt(AH, te->dumpId); /* For sanity check */
300 
302 }
303 
304 /*
305  * Called by archiver when dumper calls WriteData. This routine is
306  * called for both BLOB and TABLE data; it is the responsibility of
307  * the format to manage each kind of data using StartBlob/StartData.
308  *
309  * It should only be called from within a DataDumper routine.
310  *
311  * Mandatory.
312  */
313 static void
314 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
315 {
316  lclContext *ctx = (lclContext *) AH->formatData;
317  CompressorState *cs = ctx->cs;
318 
319  if (dLen > 0)
320  /* WriteDataToArchive() internally throws write errors */
321  WriteDataToArchive(AH, cs, data, dLen);
322 }
323 
324 /*
325  * Called by the archiver when a dumper's 'DataDumper' routine has
326  * finished.
327  *
328  * Optional.
329  */
330 static void
332 {
333  lclContext *ctx = (lclContext *) AH->formatData;
334 
335  EndCompressor(AH, ctx->cs);
336  /* Send the end marker */
337  WriteInt(AH, 0);
338 }
339 
340 /*
341  * Called by the archiver when starting to save all BLOB DATA (not schema).
342  * This routine should save whatever format-specific information is needed
343  * to read the BLOBs back into memory.
344  *
345  * It is called just prior to the dumper's DataDumper routine.
346  *
347  * Optional, but strongly recommended.
348  */
349 static void
351 {
352  lclContext *ctx = (lclContext *) AH->formatData;
353  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
354 
355  tctx->dataPos = _getFilePos(AH, ctx);
356  if (tctx->dataPos >= 0)
357  tctx->dataState = K_OFFSET_POS_SET;
358 
359  _WriteByte(AH, BLK_BLOBS); /* Block type */
360  WriteInt(AH, te->dumpId); /* For sanity check */
361 }
362 
363 /*
364  * Called by the archiver when the dumper calls StartBlob.
365  *
366  * Mandatory.
367  *
368  * Must save the passed OID for retrieval at restore-time.
369  */
370 static void
372 {
373  lclContext *ctx = (lclContext *) AH->formatData;
374 
375  if (oid == 0)
376  fatal("invalid OID for large object");
377 
378  WriteInt(AH, oid);
379 
381 }
382 
383 /*
384  * Called by the archiver when the dumper calls EndBlob.
385  *
386  * Optional.
387  */
388 static void
390 {
391  lclContext *ctx = (lclContext *) AH->formatData;
392 
393  EndCompressor(AH, ctx->cs);
394  /* Send the end marker */
395  WriteInt(AH, 0);
396 }
397 
398 /*
399  * Called by the archiver when finishing saving all BLOB DATA.
400  *
401  * Optional.
402  */
403 static void
405 {
406  /* Write out a fake zero OID to mark end-of-blobs. */
407  WriteInt(AH, 0);
408 }
409 
410 /*
411  * Print data for a given TOC entry
412  */
413 static void
415 {
416  lclContext *ctx = (lclContext *) AH->formatData;
417  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
418  int blkType;
419  int id;
420 
421  if (tctx->dataState == K_OFFSET_NO_DATA)
422  return;
423 
424  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
425  {
426  /*
427  * We cannot seek directly to the desired block. Instead, skip over
428  * block headers until we find the one we want. Remember the
429  * positions of skipped-over blocks, so that if we later decide we
430  * need to read one, we'll be able to seek to it.
431  *
432  * When our input file is seekable, we can do the search starting from
433  * the point after the last data block we scanned in previous
434  * iterations of this function.
435  */
436  if (ctx->hasSeek)
437  {
438  if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
439  fatal("error during file seek: %m");
440  }
441 
442  for (;;)
443  {
444  pgoff_t thisBlkPos = _getFilePos(AH, ctx);
445 
446  _readBlockHeader(AH, &blkType, &id);
447 
448  if (blkType == EOF || id == te->dumpId)
449  break;
450 
451  /* Remember the block position, if we got one */
452  if (thisBlkPos >= 0)
453  {
454  TocEntry *otherte = getTocEntryByDumpId(AH, id);
455 
456  if (otherte && otherte->formatData)
457  {
458  lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
459 
460  /*
461  * Note: on Windows, multiple threads might access/update
462  * the same lclTocEntry concurrently, but that should be
463  * safe as long as we update dataPos before dataState.
464  * Ideally, we'd use pg_write_barrier() to enforce that,
465  * but the needed infrastructure doesn't exist in frontend
466  * code. But Windows only runs on machines with strong
467  * store ordering, so it should be okay for now.
468  */
469  if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
470  {
471  othertctx->dataPos = thisBlkPos;
472  othertctx->dataState = K_OFFSET_POS_SET;
473  }
474  else if (othertctx->dataPos != thisBlkPos ||
475  othertctx->dataState != K_OFFSET_POS_SET)
476  {
477  /* sanity check */
478  pg_log_warning("data block %d has wrong seek position",
479  id);
480  }
481  }
482  }
483 
484  switch (blkType)
485  {
486  case BLK_DATA:
487  _skipData(AH);
488  break;
489 
490  case BLK_BLOBS:
491  _skipBlobs(AH);
492  break;
493 
494  default: /* Always have a default */
495  fatal("unrecognized data block type (%d) while searching archive",
496  blkType);
497  break;
498  }
499  }
500  }
501  else
502  {
503  /* We can just seek to the place we need to be. */
504  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
505  fatal("error during file seek: %m");
506 
507  _readBlockHeader(AH, &blkType, &id);
508  }
509 
510  /*
511  * If we reached EOF without finding the block we want, then either it
512  * doesn't exist, or it does but we lack the ability to seek back to it.
513  */
514  if (blkType == EOF)
515  {
516  if (!ctx->hasSeek)
517  fatal("could not find block ID %d in archive -- "
518  "possibly due to out-of-order restore request, "
519  "which cannot be handled due to non-seekable input file",
520  te->dumpId);
521  else
522  fatal("could not find block ID %d in archive -- "
523  "possibly corrupt archive",
524  te->dumpId);
525  }
526 
527  /* Are we sane? */
528  if (id != te->dumpId)
529  fatal("found unexpected block ID (%d) when reading data -- expected %d",
530  id, te->dumpId);
531 
532  switch (blkType)
533  {
534  case BLK_DATA:
535  _PrintData(AH);
536  break;
537 
538  case BLK_BLOBS:
539  _LoadBlobs(AH, AH->public.ropt->dropSchema);
540  break;
541 
542  default: /* Always have a default */
543  fatal("unrecognized data block type %d while restoring archive",
544  blkType);
545  break;
546  }
547 
548  /*
549  * If our input file is seekable but lacks data offsets, update our
550  * knowledge of where to start future searches from. (Note that we did
551  * not update the current TE's dataState/dataPos. We could have, but
552  * there is no point since it will not be visited again.)
553  */
554  if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
555  {
556  pgoff_t curPos = _getFilePos(AH, ctx);
557 
558  if (curPos > ctx->lastFilePos)
559  ctx->lastFilePos = curPos;
560  }
561 }
562 
563 /*
564  * Print data from current file position.
565 */
566 static void
568 {
570 }
571 
572 static void
573 _LoadBlobs(ArchiveHandle *AH, bool drop)
574 {
575  Oid oid;
576 
577  StartRestoreBlobs(AH);
578 
579  oid = ReadInt(AH);
580  while (oid != 0)
581  {
582  StartRestoreBlob(AH, oid, drop);
583  _PrintData(AH);
584  EndRestoreBlob(AH, oid);
585  oid = ReadInt(AH);
586  }
587 
588  EndRestoreBlobs(AH);
589 }
590 
591 /*
592  * Skip the BLOBs from the current file position.
593  * BLOBS are written sequentially as data blocks (see below).
594  * Each BLOB is preceded by its original OID.
595  * A zero OID indicates the end of the BLOBS.
596  */
597 static void
599 {
600  Oid oid;
601 
602  oid = ReadInt(AH);
603  while (oid != 0)
604  {
605  _skipData(AH);
606  oid = ReadInt(AH);
607  }
608 }
609 
610 /*
611  * Skip data from current file position.
612  * Data blocks are formatted as an integer length, followed by data.
613  * A zero length indicates the end of the block.
614 */
615 static void
617 {
618  lclContext *ctx = (lclContext *) AH->formatData;
619  size_t blkLen;
620  char *buf = NULL;
621  int buflen = 0;
622 
623  blkLen = ReadInt(AH);
624  while (blkLen != 0)
625  {
626  if (ctx->hasSeek)
627  {
628  if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
629  fatal("error during file seek: %m");
630  }
631  else
632  {
633  if (blkLen > buflen)
634  {
635  if (buf)
636  free(buf);
637  buf = (char *) pg_malloc(blkLen);
638  buflen = blkLen;
639  }
640  if (fread(buf, 1, blkLen, AH->FH) != blkLen)
641  {
642  if (feof(AH->FH))
643  fatal("could not read from input file: end of file");
644  else
645  fatal("could not read from input file: %m");
646  }
647  }
648 
649  blkLen = ReadInt(AH);
650  }
651 
652  if (buf)
653  free(buf);
654 }
655 
656 /*
657  * Write a byte of data to the archive.
658  *
659  * Mandatory.
660  *
661  * Called by the archiver to do integer & byte output to the archive.
662  */
663 static int
664 _WriteByte(ArchiveHandle *AH, const int i)
665 {
666  if (fputc(i, AH->FH) == EOF)
668 
669  return 1;
670 }
671 
672 /*
673  * Read a byte of data from the archive.
674  *
675  * Mandatory
676  *
677  * Called by the archiver to read bytes & integers from the archive.
678  * EOF should be treated as a fatal error.
679  */
680 static int
682 {
683  int res;
684 
685  res = getc(AH->FH);
686  if (res == EOF)
687  READ_ERROR_EXIT(AH->FH);
688  return res;
689 }
690 
691 /*
692  * Write a buffer of data to the archive.
693  *
694  * Mandatory.
695  *
696  * Called by the archiver to write a block of bytes to the archive.
697  */
698 static void
699 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
700 {
701  if (fwrite(buf, 1, len, AH->FH) != len)
703 }
704 
705 /*
706  * Read a block of bytes from the archive.
707  *
708  * Mandatory.
709  *
710  * Called by the archiver to read a block of bytes from the archive
711  */
712 static void
713 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
714 {
715  if (fread(buf, 1, len, AH->FH) != len)
716  READ_ERROR_EXIT(AH->FH);
717 }
718 
719 /*
720  * Close the archive.
721  *
722  * Mandatory.
723  *
724  * When writing the archive, this is the routine that actually starts
725  * the process of saving it to files. No data should be written prior
726  * to this point, since the user could sort the TOC after creating it.
727  *
728  * If an archive is to be written, this routine must call:
729  * WriteHead to save the archive header
730  * WriteToc to save the TOC entries
731  * WriteDataChunks to save all DATA & BLOBs.
732  *
733  */
734 static void
736 {
737  lclContext *ctx = (lclContext *) AH->formatData;
738  pgoff_t tpos;
739 
740  if (AH->mode == archModeWrite)
741  {
742  WriteHead(AH);
743  /* Remember TOC's seek position for use below */
744  tpos = ftello(AH->FH);
745  if (tpos < 0 && ctx->hasSeek)
746  fatal("could not determine seek position in archive file: %m");
747  WriteToc(AH);
748  WriteDataChunks(AH, NULL);
749 
750  /*
751  * If possible, re-write the TOC in order to update the data offset
752  * information. This is not essential, as pg_restore can cope in most
753  * cases without it; but it can make pg_restore significantly faster
754  * in some situations (especially parallel restore).
755  */
756  if (ctx->hasSeek &&
757  fseeko(AH->FH, tpos, SEEK_SET) == 0)
758  WriteToc(AH);
759  }
760 
761  if (fclose(AH->FH) != 0)
762  fatal("could not close archive file: %m");
763 
764  /* Sync the output file if one is defined */
765  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
766  (void) fsync_fname(AH->fSpec, false);
767 
768  AH->FH = NULL;
769 }
770 
771 /*
772  * Reopen the archive's file handle.
773  *
774  * We close the original file handle, except on Windows. (The difference
775  * is because on Windows, this is used within a multithreading context,
776  * and we don't want a thread closing the parent file handle.)
777  */
778 static void
780 {
781  lclContext *ctx = (lclContext *) AH->formatData;
782  pgoff_t tpos;
783 
784  if (AH->mode == archModeWrite)
785  fatal("can only reopen input archives");
786 
787  /*
788  * These two cases are user-facing errors since they represent unsupported
789  * (but not invalid) use-cases. Word the error messages appropriately.
790  */
791  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
792  fatal("parallel restore from standard input is not supported");
793  if (!ctx->hasSeek)
794  fatal("parallel restore from non-seekable file is not supported");
795 
796  tpos = ftello(AH->FH);
797  if (tpos < 0)
798  fatal("could not determine seek position in archive file: %m");
799 
800 #ifndef WIN32
801  if (fclose(AH->FH) != 0)
802  fatal("could not close archive file: %m");
803 #endif
804 
805  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
806  if (!AH->FH)
807  fatal("could not open input file \"%s\": %m", AH->fSpec);
808 
809  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
810  fatal("could not set seek position in archive file: %m");
811 }
812 
813 /*
814  * Prepare for parallel restore.
815  *
816  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
817  * TOC entries' dataLength fields with appropriate values to guide the
818  * ordering of restore jobs. The source of said data is format-dependent,
819  * as is the exact meaning of the values.
820  *
821  * A format module might also choose to do other setup here.
822  */
823 static void
825 {
826  lclContext *ctx = (lclContext *) AH->formatData;
827  TocEntry *prev_te = NULL;
828  lclTocEntry *prev_tctx = NULL;
829  TocEntry *te;
830 
831  /*
832  * Knowing that the data items were dumped out in TOC order, we can
833  * reconstruct the length of each item as the delta to the start offset of
834  * the next data item.
835  */
836  for (te = AH->toc->next; te != AH->toc; te = te->next)
837  {
838  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
839 
840  /*
841  * Ignore entries without a known data offset; if we were unable to
842  * seek to rewrite the TOC when creating the archive, this'll be all
843  * of them, and we'll end up with no size estimates.
844  */
845  if (tctx->dataState != K_OFFSET_POS_SET)
846  continue;
847 
848  /* Compute previous data item's length */
849  if (prev_te)
850  {
851  if (tctx->dataPos > prev_tctx->dataPos)
852  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
853  }
854 
855  prev_te = te;
856  prev_tctx = tctx;
857  }
858 
859  /* If OK to seek, we can determine the length of the last item */
860  if (prev_te && ctx->hasSeek)
861  {
862  pgoff_t endpos;
863 
864  if (fseeko(AH->FH, 0, SEEK_END) != 0)
865  fatal("error during file seek: %m");
866  endpos = ftello(AH->FH);
867  if (endpos > prev_tctx->dataPos)
868  prev_te->dataLength = endpos - prev_tctx->dataPos;
869  }
870 }
871 
872 /*
873  * Clone format-specific fields during parallel restoration.
874  */
875 static void
877 {
878  lclContext *ctx = (lclContext *) AH->formatData;
879 
880  /*
881  * Each thread must have private lclContext working state.
882  */
883  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
884  memcpy(AH->formatData, ctx, sizeof(lclContext));
885  ctx = (lclContext *) AH->formatData;
886 
887  /* sanity check, shouldn't happen */
888  if (ctx->cs != NULL)
889  fatal("compressor active");
890 
891  /*
892  * We intentionally do not clone TOC-entry-local state: it's useful to
893  * share knowledge about where the data blocks are across threads.
894  * _PrintTocData has to be careful about the order of operations on that
895  * state, though.
896  *
897  * Note: we do not make a local lo_buf because we expect at most one BLOBS
898  * entry per archive, so no parallelism is possible.
899  */
900 }
901 
902 static void
904 {
905  lclContext *ctx = (lclContext *) AH->formatData;
906 
907  free(ctx);
908 }
909 
910 /*
911  * This function is executed in the child of a parallel restore from a
912  * custom-format archive and restores the actual data for one TOC entry.
913  */
914 static int
916 {
917  return parallel_restore(AH, te);
918 }
919 
920 /*--------------------------------------------------
921  * END OF FORMAT CALLBACKS
922  *--------------------------------------------------
923  */
924 
925 /*
926  * Get the current position in the archive file.
927  *
928  * With a non-seekable archive file, we may not be able to obtain the
929  * file position. If so, just return -1. It's not too important in
930  * that case because we won't be able to rewrite the TOC to fill in
931  * data block offsets anyway.
932  */
933 static pgoff_t
935 {
936  pgoff_t pos;
937 
938  pos = ftello(AH->FH);
939  if (pos < 0)
940  {
941  /* Not expected if we found we can seek. */
942  if (ctx->hasSeek)
943  fatal("could not determine seek position in archive file: %m");
944  }
945  return pos;
946 }
947 
948 /*
949  * Read a data block header. The format changed in V1.3, so we
950  * centralize the code here for simplicity. Returns *type = EOF
951  * if at EOF.
952  */
953 static void
955 {
956  int byt;
957 
958  /*
959  * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
960  * ReadInt rather than returning EOF. It doesn't seem worth jumping
961  * through hoops to deal with that case better, because no such files are
962  * likely to exist in the wild: only some 7.1 development versions of
963  * pg_dump ever generated such files.
964  */
965  if (AH->version < K_VERS_1_3)
966  *type = BLK_DATA;
967  else
968  {
969  byt = getc(AH->FH);
970  *type = byt;
971  if (byt == EOF)
972  {
973  *id = 0; /* don't return an uninitialized value */
974  return;
975  }
976  }
977 
978  *id = ReadInt(AH);
979 }
980 
981 /*
982  * Callback function for WriteDataToArchive. Writes one block of (compressed)
983  * data to the archive.
984  */
985 static void
986 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
987 {
988  /* never write 0-byte blocks (this should not happen) */
989  if (len > 0)
990  {
991  WriteInt(AH, len);
992  _WriteBuf(AH, buf, len);
993  }
994 }
995 
996 /*
997  * Callback function for ReadDataFromArchive. To keep things simple, we
998  * always read one compressed block at a time.
999  */
1000 static size_t
1001 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1002 {
1003  size_t blkLen;
1004 
1005  /* Read length */
1006  blkLen = ReadInt(AH);
1007  if (blkLen == 0)
1008  return 0;
1009 
1010  /* If the caller's buffer is not large enough, allocate a bigger one */
1011  if (blkLen > *buflen)
1012  {
1013  free(*buf);
1014  *buf = (char *) pg_malloc(blkLen);
1015  *buflen = blkLen;
1016  }
1017 
1018  /* exits app on read errors */
1019  _ReadBuf(AH, *buf, blkLen);
1020 
1021  return blkLen;
1022 }
struct _tocEntry * next
static void _ReopenArchive(ArchiveHandle *AH)
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
ReopenPtrType ReopenPtr
ReadBufPtrType ReadBufPtr
void ReadToc(ArchiveHandle *AH)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
void ReadHead(ArchiveHandle *AH)
DeClonePtrType DeClonePtr
static void _Clone(ArchiveHandle *AH)
#define BLK_DATA
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
RestoreOptions * ropt
Definition: pg_backup.h:191
WriteBufPtrType WriteBufPtr
#define BLK_BLOBS
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:633
StartBlobPtrType StartBlobPtr
static void _skipBlobs(ArchiveHandle *AH)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
EndDataPtrType EndDataPtr
#define PG_BINARY_W
Definition: c.h:1214
void WriteToc(ArchiveHandle *AH)
DataDumperPtr dataDumper
int ReadInt(ArchiveHandle *AH)
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
StartBlobsPtrType StartBlobsPtr
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY_R
Definition: c.h:1213
void StartRestoreBlobs(ArchiveHandle *AH)
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
ReadBytePtrType ReadBytePtr
static void _PrintData(ArchiveHandle *AH)
CompressorState * cs
#define K_VERS_1_7
#define pgoff_t
Definition: win32_port.h:194
#define ftello(stream)
Definition: win32_port.h:204
EndBlobPtrType EndBlobPtr
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static XLogRecPtr endpos
Definition: pg_receivewal.c:46
void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
struct _tocEntry * toc
static char * buf
Definition: pg_test_fsync.c:67
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
StartDataPtrType StartDataPtr
static void _LoadBlobs(ArchiveHandle *AH, bool drop)
#define K_OFFSET_NO_DATA
WriteBytePtrType WriteBytePtr
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_3
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
static void _skipData(ArchiveHandle *AH)
static void _PrepParallelRestore(ArchiveHandle *AH)
void EndRestoreBlobs(ArchiveHandle *AH)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
static int _ReadByte(ArchiveHandle *)
int verbose
Definition: pg_backup.h:193
PrintTocDataPtrType PrintTocDataPtr
#define fseeko(stream, offset, origin)
Definition: win32_port.h:201
WriteDataPtrType WriteDataPtr
#define free(a)
Definition: header.h:65
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te)
PrepParallelRestorePtrType PrepParallelRestorePtr
EndBlobsPtrType EndBlobsPtr
size_t WriteInt(ArchiveHandle *AH, int i)
ArchiveEntryPtrType ArchiveEntryPtr
#define fatal(...)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
#define INT64_FORMAT
Definition: c.h:416
void WriteHead(ArchiveHandle *AH)
#define WRITE_ERROR_EXIT
ClonePtrType ClonePtr
void EndRestoreBlob(ArchiveHandle *AH, Oid oid)
#define K_OFFSET_POS_SET
int i
WorkerJobRestorePtrType WorkerJobRestorePtr
pgoff_t lastFilePos
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define READ_ERROR_EXIT(fd)
static void _DeClone(ArchiveHandle *AH)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
bool checkSeek(FILE *fp)
ClosePtrType ClosePtr
PrintExtraTocPtrType PrintExtraTocPtr
#define pg_log_warning(...)
Definition: pgfnames.c:24
static int _WriteByte(ArchiveHandle *AH, const int i)
WriteExtraTocPtrType WriteExtraTocPtr
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:201
#define LOBBUFSIZE
ReadExtraTocPtrType ReadExtraTocPtr
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_OFFSET_POS_NOT_SET
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
WorkerJobDumpPtrType WorkerJobDumpPtr