PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56 
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 
68 
69 typedef struct
70 {
72  int hasSeek;
73  /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74  pgoff_t lastFilePos; /* position after last data block we've read */
75 } lclContext;
76 
/*
 * Private per-TOC-entry state for the custom format; a pointer to it is
 * kept in te->formatData.
 */
77 typedef struct
78 {
79  int dataState; /* compared against the K_OFFSET_* constants */
80  pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 } lclTocEntry;
82 
83 
84 /*------
85  * Static declarations
86  *------
87  */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  * Init routine required by ALL formats. This is a global routine
97  * and should be declared in pg_backup_archiver.h
98  *
99  * Its task is to create any extra archive context (using AH->formatData),
100  * and to initialize the supported function pointers.
101  *
102  * It should also prepare whatever its input source is for reading/writing,
103  * and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
107 {
108  lclContext *ctx;
109 
110  /* Assuming static functions, this can be copied for each format. */
112  AH->StartDataPtr = _StartData;
113  AH->WriteDataPtr = _WriteData;
114  AH->EndDataPtr = _EndData;
115  AH->WriteBytePtr = _WriteByte;
116  AH->ReadBytePtr = _ReadByte;
117  AH->WriteBufPtr = _WriteBuf;
118  AH->ReadBufPtr = _ReadBuf;
119  AH->ClosePtr = _CloseArchive;
125 
127  AH->StartBlobPtr = _StartBlob;
128  AH->EndBlobPtr = _EndBlob;
129  AH->EndBlobsPtr = _EndBlobs;
130 
132  AH->ClonePtr = _Clone;
133  AH->DeClonePtr = _DeClone;
134 
135  /* no parallel dump in the custom archive, only parallel restore */
136  AH->WorkerJobDumpPtr = NULL;
138 
139  /* Set up a private area. */
140  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141  AH->formatData = (void *) ctx;
142 
143  /* Initialize LO buffering */
144  AH->lo_buf_size = LOBBUFSIZE;
145  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147  /*
148  * Now open the file
149  */
150  if (AH->mode == archModeWrite)
151  {
152  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153  {
154  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
155  if (!AH->FH)
156  pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
157  }
158  else
159  {
160  AH->FH = stdout;
161  if (!AH->FH)
162  pg_fatal("could not open output file: %m");
163  }
164 
165  ctx->hasSeek = checkSeek(AH->FH);
166  }
167  else
168  {
169  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170  {
171  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
172  if (!AH->FH)
173  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
174  }
175  else
176  {
177  AH->FH = stdin;
178  if (!AH->FH)
179  pg_fatal("could not open input file: %m");
180  }
181 
182  ctx->hasSeek = checkSeek(AH->FH);
183 
184  ReadHead(AH);
185  ReadToc(AH);
186 
187  /*
188  * Remember location of first data block (i.e., the point after TOC)
189  * in case we have to search for desired data blocks.
190  */
191  ctx->lastFilePos = _getFilePos(AH, ctx);
192  }
193 }
194 
195 /*
196  * Called by the Archiver when the dumper creates a new TOC entry.
197  *
198  * Optional.
199  *
200  * Set up extra format-related TOC data.
201 */
202 static void
204 {
205  lclTocEntry *ctx;
206 
207  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
208  if (te->dataDumper)
210  else
212 
213  te->formatData = (void *) ctx;
214 }
215 
216 /*
217  * Called by the Archiver to save any extra format-related TOC entry
218  * data.
219  *
220  * Optional.
221  *
222  * Use the Archiver routines to write data - they are non-endian, and
223  * maintain other important file information.
224  */
225 static void
227 {
228  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229 
230  WriteOffset(AH, ctx->dataPos, ctx->dataState);
231 }
232 
233 /*
234  * Called by the Archiver to read any extra format-related TOC data.
235  *
236  * Optional.
237  *
238  * Needs to match the order defined in _WriteExtraToc, and should also
239  * use the Archiver input routines.
240  */
241 static void
243 {
244  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245 
246  if (ctx == NULL)
247  {
248  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
249  te->formatData = (void *) ctx;
250  }
251 
252  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253 
254  /*
255  * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256  * dump it at all.
257  */
258  if (AH->version < K_VERS_1_7)
259  ReadInt(AH);
260 }
261 
262 /*
263  * Called by the Archiver when restoring an archive to output a comment
264  * that includes useful information about the TOC entry.
265  *
266  * Optional.
267  */
268 static void
270 {
271  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272 
273  if (AH->public.verbose)
274  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275  (int64) ctx->dataPos);
276 }
277 
278 /*
279  * Called by the archiver when saving TABLE DATA (not schema). This routine
280  * should save whatever format-specific information is needed to read
281  * the archive back.
282  *
283  * It is called just prior to the dumper's 'DataDumper' routine being called.
284  *
285  * Optional, but strongly recommended.
286  *
287  */
288 static void
290 {
291  lclContext *ctx = (lclContext *) AH->formatData;
292  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 
294  tctx->dataPos = _getFilePos(AH, ctx);
295  if (tctx->dataPos >= 0)
296  tctx->dataState = K_OFFSET_POS_SET;
297 
298  _WriteByte(AH, BLK_DATA); /* Block type */
299  WriteInt(AH, te->dumpId); /* For sanity check */
300 
302 }
303 
304 /*
305  * Called by archiver when dumper calls WriteData. This routine is
306  * called for both BLOB and TABLE data; it is the responsibility of
307  * the format to manage each kind of data using StartBlob/StartData.
308  *
309  * It should only be called from within a DataDumper routine.
310  *
311  * Mandatory.
312  */
313 static void
314 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
315 {
316  lclContext *ctx = (lclContext *) AH->formatData;
317  CompressorState *cs = ctx->cs;
318 
319  if (dLen > 0)
320  /* WriteDataToArchive() internally throws write errors */
321  WriteDataToArchive(AH, cs, data, dLen);
322 }
323 
324 /*
325  * Called by the archiver when a dumper's 'DataDumper' routine has
326  * finished.
327  *
328  * Optional.
329  */
330 static void
332 {
333  lclContext *ctx = (lclContext *) AH->formatData;
334 
335  EndCompressor(AH, ctx->cs);
336  /* Send the end marker */
337  WriteInt(AH, 0);
338 }
339 
340 /*
341  * Called by the archiver when starting to save all BLOB DATA (not schema).
342  * This routine should save whatever format-specific information is needed
343  * to read the BLOBs back into memory.
344  *
345  * It is called just prior to the dumper's DataDumper routine.
346  *
347  * Optional, but strongly recommended.
348  */
349 static void
351 {
352  lclContext *ctx = (lclContext *) AH->formatData;
353  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
354 
355  tctx->dataPos = _getFilePos(AH, ctx);
356  if (tctx->dataPos >= 0)
357  tctx->dataState = K_OFFSET_POS_SET;
358 
359  _WriteByte(AH, BLK_BLOBS); /* Block type */
360  WriteInt(AH, te->dumpId); /* For sanity check */
361 }
362 
363 /*
364  * Called by the archiver when the dumper calls StartBlob.
365  *
366  * Mandatory.
367  *
368  * Must save the passed OID for retrieval at restore-time.
369  */
370 static void
372 {
373  lclContext *ctx = (lclContext *) AH->formatData;
374 
375  if (oid == 0)
376  pg_fatal("invalid OID for large object");
377 
378  WriteInt(AH, oid);
379 
381 }
382 
383 /*
384  * Called by the archiver when the dumper calls EndBlob.
385  *
386  * Optional.
387  */
388 static void
390 {
391  lclContext *ctx = (lclContext *) AH->formatData;
392 
393  EndCompressor(AH, ctx->cs);
394  /* Send the end marker */
395  WriteInt(AH, 0);
396 }
397 
398 /*
399  * Called by the archiver when finishing saving all BLOB DATA.
400  *
401  * Optional.
402  */
403 static void
405 {
406  /* Write out a fake zero OID to mark end-of-blobs. */
407  WriteInt(AH, 0);
408 }
409 
410 /*
411  * Print data for a given TOC entry
412  */
413 static void
415 {
416  lclContext *ctx = (lclContext *) AH->formatData;
417  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
418  int blkType;
419  int id;
420 
421  if (tctx->dataState == K_OFFSET_NO_DATA)
422  return;
423 
424  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
425  {
426  /*
427  * We cannot seek directly to the desired block. Instead, skip over
428  * block headers until we find the one we want. Remember the
429  * positions of skipped-over blocks, so that if we later decide we
430  * need to read one, we'll be able to seek to it.
431  *
432  * When our input file is seekable, we can do the search starting from
433  * the point after the last data block we scanned in previous
434  * iterations of this function.
435  */
436  if (ctx->hasSeek)
437  {
438  if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
439  pg_fatal("error during file seek: %m");
440  }
441 
442  for (;;)
443  {
444  pgoff_t thisBlkPos = _getFilePos(AH, ctx);
445 
446  _readBlockHeader(AH, &blkType, &id);
447 
448  if (blkType == EOF || id == te->dumpId)
449  break;
450 
451  /* Remember the block position, if we got one */
452  if (thisBlkPos >= 0)
453  {
454  TocEntry *otherte = getTocEntryByDumpId(AH, id);
455 
456  if (otherte && otherte->formatData)
457  {
458  lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
459 
460  /*
461  * Note: on Windows, multiple threads might access/update
462  * the same lclTocEntry concurrently, but that should be
463  * safe as long as we update dataPos before dataState.
464  * Ideally, we'd use pg_write_barrier() to enforce that,
465  * but the needed infrastructure doesn't exist in frontend
466  * code. But Windows only runs on machines with strong
467  * store ordering, so it should be okay for now.
468  */
469  if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
470  {
471  othertctx->dataPos = thisBlkPos;
472  othertctx->dataState = K_OFFSET_POS_SET;
473  }
474  else if (othertctx->dataPos != thisBlkPos ||
475  othertctx->dataState != K_OFFSET_POS_SET)
476  {
477  /* sanity check */
478  pg_log_warning("data block %d has wrong seek position",
479  id);
480  }
481  }
482  }
483 
484  switch (blkType)
485  {
486  case BLK_DATA:
487  _skipData(AH);
488  break;
489 
490  case BLK_BLOBS:
491  _skipBlobs(AH);
492  break;
493 
494  default: /* Always have a default */
495  pg_fatal("unrecognized data block type (%d) while searching archive",
496  blkType);
497  break;
498  }
499  }
500  }
501  else
502  {
503  /* We can just seek to the place we need to be. */
504  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
505  pg_fatal("error during file seek: %m");
506 
507  _readBlockHeader(AH, &blkType, &id);
508  }
509 
510  /*
511  * If we reached EOF without finding the block we want, then either it
512  * doesn't exist, or it does but we lack the ability to seek back to it.
513  */
514  if (blkType == EOF)
515  {
516  if (!ctx->hasSeek)
517  pg_fatal("could not find block ID %d in archive -- "
518  "possibly due to out-of-order restore request, "
519  "which cannot be handled due to non-seekable input file",
520  te->dumpId);
521  else
522  pg_fatal("could not find block ID %d in archive -- "
523  "possibly corrupt archive",
524  te->dumpId);
525  }
526 
527  /* Are we sane? */
528  if (id != te->dumpId)
529  pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
530  id, te->dumpId);
531 
532  switch (blkType)
533  {
534  case BLK_DATA:
535  _PrintData(AH);
536  break;
537 
538  case BLK_BLOBS:
539  _LoadBlobs(AH, AH->public.ropt->dropSchema);
540  break;
541 
542  default: /* Always have a default */
543  pg_fatal("unrecognized data block type %d while restoring archive",
544  blkType);
545  break;
546  }
547 
548  /*
549  * If our input file is seekable but lacks data offsets, update our
550  * knowledge of where to start future searches from. (Note that we did
551  * not update the current TE's dataState/dataPos. We could have, but
552  * there is no point since it will not be visited again.)
553  */
554  if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
555  {
556  pgoff_t curPos = _getFilePos(AH, ctx);
557 
558  if (curPos > ctx->lastFilePos)
559  ctx->lastFilePos = curPos;
560  }
561 }
562 
563 /*
564  * Print data from current file position.
565 */
566 static void
568 {
570 }
571 
572 static void
573 _LoadBlobs(ArchiveHandle *AH, bool drop)
574 {
575  Oid oid;
576 
577  StartRestoreBlobs(AH);
578 
579  oid = ReadInt(AH);
580  while (oid != 0)
581  {
582  StartRestoreBlob(AH, oid, drop);
583  _PrintData(AH);
584  EndRestoreBlob(AH, oid);
585  oid = ReadInt(AH);
586  }
587 
588  EndRestoreBlobs(AH);
589 }
590 
591 /*
592  * Skip the BLOBs from the current file position.
593  * BLOBS are written sequentially as data blocks (see below).
594  * Each BLOB is preceded by its original OID.
595  * A zero OID indicates the end of the BLOBS.
596  */
597 static void
599 {
600  Oid oid;
601 
602  oid = ReadInt(AH);
603  while (oid != 0)
604  {
605  _skipData(AH);
606  oid = ReadInt(AH);
607  }
608 }
609 
610 /*
611  * Skip data from current file position.
612  * Data blocks are formatted as an integer length, followed by data.
613  * A zero length indicates the end of the block.
614 */
615 static void
617 {
618  lclContext *ctx = (lclContext *) AH->formatData;
619  size_t blkLen;
620  char *buf = NULL;
621  int buflen = 0;
622 
623  blkLen = ReadInt(AH);
624  while (blkLen != 0)
625  {
626  if (ctx->hasSeek)
627  {
628  if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
629  pg_fatal("error during file seek: %m");
630  }
631  else
632  {
633  if (blkLen > buflen)
634  {
635  if (buf)
636  free(buf);
637  buf = (char *) pg_malloc(blkLen);
638  buflen = blkLen;
639  }
640  if (fread(buf, 1, blkLen, AH->FH) != blkLen)
641  {
642  if (feof(AH->FH))
643  pg_fatal("could not read from input file: end of file");
644  else
645  pg_fatal("could not read from input file: %m");
646  }
647  }
648 
649  blkLen = ReadInt(AH);
650  }
651 
652  if (buf)
653  free(buf);
654 }
655 
656 /*
657  * Write a byte of data to the archive.
658  *
659  * Mandatory.
660  *
661  * Called by the archiver to do integer & byte output to the archive.
662  */
663 static int
664 _WriteByte(ArchiveHandle *AH, const int i)
665 {
666  if (fputc(i, AH->FH) == EOF)
668 
669  return 1;
670 }
671 
672 /*
673  * Read a byte of data from the archive.
674  *
675  * Mandatory
676  *
677  * Called by the archiver to read bytes & integers from the archive.
678  * EOF should be treated as a fatal error.
679  */
680 static int
682 {
683  int res;
684 
685  res = getc(AH->FH);
686  if (res == EOF)
687  READ_ERROR_EXIT(AH->FH);
688  return res;
689 }
690 
691 /*
692  * Write a buffer of data to the archive.
693  *
694  * Mandatory.
695  *
696  * Called by the archiver to write a block of bytes to the archive.
697  */
698 static void
699 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
700 {
701  if (fwrite(buf, 1, len, AH->FH) != len)
703 }
704 
705 /*
706  * Read a block of bytes from the archive.
707  *
708  * Mandatory.
709  *
710  * Called by the archiver to read a block of bytes from the archive
711  */
712 static void
713 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
714 {
715  if (fread(buf, 1, len, AH->FH) != len)
716  READ_ERROR_EXIT(AH->FH);
717 }
718 
719 /*
720  * Close the archive.
721  *
722  * Mandatory.
723  *
724  * When writing the archive, this is the routine that actually starts
725  * the process of saving it to files. No data should be written prior
726  * to this point, since the user could sort the TOC after creating it.
727  *
728  * If an archive is to be written, this routine must call:
729  * WriteHead to save the archive header
730  * WriteToc to save the TOC entries
731  * WriteDataChunks to save all DATA & BLOBs.
732  *
733  */
734 static void
736 {
737  lclContext *ctx = (lclContext *) AH->formatData;
738  pgoff_t tpos;
739 
740  if (AH->mode == archModeWrite)
741  {
742  WriteHead(AH);
743  /* Remember TOC's seek position for use below */
744  tpos = ftello(AH->FH);
745  if (tpos < 0 && ctx->hasSeek)
746  pg_fatal("could not determine seek position in archive file: %m");
747  WriteToc(AH);
748  WriteDataChunks(AH, NULL);
749 
750  /*
751  * If possible, re-write the TOC in order to update the data offset
752  * information. This is not essential, as pg_restore can cope in most
753  * cases without it; but it can make pg_restore significantly faster
754  * in some situations (especially parallel restore).
755  */
756  if (ctx->hasSeek &&
757  fseeko(AH->FH, tpos, SEEK_SET) == 0)
758  WriteToc(AH);
759  }
760 
761  if (fclose(AH->FH) != 0)
762  pg_fatal("could not close archive file: %m");
763 
764  /* Sync the output file if one is defined */
765  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
766  (void) fsync_fname(AH->fSpec, false);
767 
768  AH->FH = NULL;
769 }
770 
771 /*
772  * Reopen the archive's file handle.
773  *
774  * We close the original file handle, except on Windows. (The difference
775  * is because on Windows, this is used within a multithreading context,
776  * and we don't want a thread closing the parent file handle.)
777  */
778 static void
780 {
781  lclContext *ctx = (lclContext *) AH->formatData;
782  pgoff_t tpos;
783 
784  if (AH->mode == archModeWrite)
785  pg_fatal("can only reopen input archives");
786 
787  /*
788  * These two cases are user-facing errors since they represent unsupported
789  * (but not invalid) use-cases. Word the error messages appropriately.
790  */
791  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
792  pg_fatal("parallel restore from standard input is not supported");
793  if (!ctx->hasSeek)
794  pg_fatal("parallel restore from non-seekable file is not supported");
795 
796  tpos = ftello(AH->FH);
797  if (tpos < 0)
798  pg_fatal("could not determine seek position in archive file: %m");
799 
800 #ifndef WIN32
801  if (fclose(AH->FH) != 0)
802  pg_fatal("could not close archive file: %m");
803 #endif
804 
805  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
806  if (!AH->FH)
807  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
808 
809  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
810  pg_fatal("could not set seek position in archive file: %m");
811 }
812 
813 /*
814  * Prepare for parallel restore.
815  *
816  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
817  * TOC entries' dataLength fields with appropriate values to guide the
818  * ordering of restore jobs. The source of said data is format-dependent,
819  * as is the exact meaning of the values.
820  *
821  * A format module might also choose to do other setup here.
822  */
823 static void
825 {
826  lclContext *ctx = (lclContext *) AH->formatData;
827  TocEntry *prev_te = NULL;
828  lclTocEntry *prev_tctx = NULL;
829  TocEntry *te;
830 
831  /*
832  * Knowing that the data items were dumped out in TOC order, we can
833  * reconstruct the length of each item as the delta to the start offset of
834  * the next data item.
835  */
836  for (te = AH->toc->next; te != AH->toc; te = te->next)
837  {
838  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
839 
840  /*
841  * Ignore entries without a known data offset; if we were unable to
842  * seek to rewrite the TOC when creating the archive, this'll be all
843  * of them, and we'll end up with no size estimates.
844  */
845  if (tctx->dataState != K_OFFSET_POS_SET)
846  continue;
847 
848  /* Compute previous data item's length */
849  if (prev_te)
850  {
851  if (tctx->dataPos > prev_tctx->dataPos)
852  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
853  }
854 
855  prev_te = te;
856  prev_tctx = tctx;
857  }
858 
859  /* If OK to seek, we can determine the length of the last item */
860  if (prev_te && ctx->hasSeek)
861  {
862  pgoff_t endpos;
863 
864  if (fseeko(AH->FH, 0, SEEK_END) != 0)
865  pg_fatal("error during file seek: %m");
866  endpos = ftello(AH->FH);
867  if (endpos > prev_tctx->dataPos)
868  prev_te->dataLength = endpos - prev_tctx->dataPos;
869  }
870 }
871 
872 /*
873  * Clone format-specific fields during parallel restoration.
874  */
875 static void
877 {
878  lclContext *ctx = (lclContext *) AH->formatData;
879 
880  /*
881  * Each thread must have private lclContext working state.
882  */
883  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
884  memcpy(AH->formatData, ctx, sizeof(lclContext));
885  ctx = (lclContext *) AH->formatData;
886 
887  /* sanity check, shouldn't happen */
888  if (ctx->cs != NULL)
889  pg_fatal("compressor active");
890 
891  /*
892  * We intentionally do not clone TOC-entry-local state: it's useful to
893  * share knowledge about where the data blocks are across threads.
894  * _PrintTocData has to be careful about the order of operations on that
895  * state, though.
896  *
897  * Note: we do not make a local lo_buf because we expect at most one BLOBS
898  * entry per archive, so no parallelism is possible.
899  */
900 }
901 
902 static void
904 {
905  lclContext *ctx = (lclContext *) AH->formatData;
906 
907  free(ctx);
908 }
909 
910 /*
911  * This function is executed in the child of a parallel restore from a
912  * custom-format archive and restores the actual data for one TOC entry.
913  */
914 static int
916 {
917  return parallel_restore(AH, te);
918 }
919 
920 /*--------------------------------------------------
921  * END OF FORMAT CALLBACKS
922  *--------------------------------------------------
923  */
924 
925 /*
926  * Get the current position in the archive file.
927  *
928  * With a non-seekable archive file, we may not be able to obtain the
929  * file position. If so, just return -1. It's not too important in
930  * that case because we won't be able to rewrite the TOC to fill in
931  * data block offsets anyway.
932  */
933 static pgoff_t
935 {
936  pgoff_t pos;
937 
938  pos = ftello(AH->FH);
939  if (pos < 0)
940  {
941  /* Not expected if we found we can seek. */
942  if (ctx->hasSeek)
943  pg_fatal("could not determine seek position in archive file: %m");
944  }
945  return pos;
946 }
947 
948 /*
949  * Read a data block header. The format changed in V1.3, so we
950  * centralize the code here for simplicity. Returns *type = EOF
951  * if at EOF.
952  */
953 static void
955 {
956  int byt;
957 
958  /*
959  * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
960  * inside ReadInt rather than returning EOF. It doesn't seem worth
961  * jumping through hoops to deal with that case better, because no such
962  * files are likely to exist in the wild: only some 7.1 development
963  * versions of pg_dump ever generated such files.
964  */
965  if (AH->version < K_VERS_1_3)
966  *type = BLK_DATA;
967  else
968  {
969  byt = getc(AH->FH);
970  *type = byt;
971  if (byt == EOF)
972  {
973  *id = 0; /* don't return an uninitialized value */
974  return;
975  }
976  }
977 
978  *id = ReadInt(AH);
979 }
980 
981 /*
982  * Callback function for WriteDataToArchive. Writes one block of (compressed)
983  * data to the archive.
984  */
985 static void
986 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
987 {
988  /* never write 0-byte blocks (this should not happen) */
989  if (len > 0)
990  {
991  WriteInt(AH, len);
992  _WriteBuf(AH, buf, len);
993  }
994 }
995 
996 /*
997  * Callback function for ReadDataFromArchive. To keep things simple, we
998  * always read one compressed block at a time.
999  */
1000 static size_t
1001 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1002 {
1003  size_t blkLen;
1004 
1005  /* Read length */
1006  blkLen = ReadInt(AH);
1007  if (blkLen == 0)
1008  return 0;
1009 
1010  /* If the caller's buffer is not large enough, allocate a bigger one */
1011  if (blkLen > *buflen)
1012  {
1013  free(*buf);
1014  *buf = (char *) pg_malloc(blkLen);
1015  *buflen = blkLen;
1016  }
1017 
1018  /* exits app on read errors */
1019  _ReadBuf(AH, *buf, blkLen);
1020 
1021  return blkLen;
1022 }
#define PG_BINARY_R
Definition: c.h:1270
#define INT64_FORMAT
Definition: c.h:483
#define PG_BINARY_W
Definition: c.h:1271
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:201
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:673
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define free(a)
Definition: header.h:65
int i
Definition: isn.c:73
@ archModeWrite
Definition: pg_backup.h:49
bool checkSeek(FILE *fp)
void WriteHead(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
size_t WriteInt(ArchiveHandle *AH, int i)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
int ReadInt(ArchiveHandle *AH)
void StartRestoreBlobs(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void ReadToc(ArchiveHandle *AH)
void WriteToc(ArchiveHandle *AH)
void EndRestoreBlob(ArchiveHandle *AH, Oid oid)
void EndRestoreBlobs(ArchiveHandle *AH)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
#define LOBBUFSIZE
#define K_OFFSET_NO_DATA
#define WRITE_ERROR_EXIT
#define BLK_DATA
#define BLK_BLOBS
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define READ_ERROR_EXIT(fd)
#define K_VERS_1_3
#define K_VERS_1_7
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static void _skipData(ArchiveHandle *AH)
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
static void _skipBlobs(ArchiveHandle *AH)
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _DeClone(ArchiveHandle *AH)
static void _ReopenArchive(ArchiveHandle *AH)
void InitArchiveFmt_Custom(ArchiveHandle *AH)
static void _LoadBlobs(ArchiveHandle *AH, bool drop)
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
static int _WriteByte(ArchiveHandle *AH, const int i)
static void _PrepParallelRestore(ArchiveHandle *AH)
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
static void _PrintData(ArchiveHandle *AH)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
static int _ReadByte(ArchiveHandle *)
static void _Clone(ArchiveHandle *AH)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
#define pg_fatal(...)
const void size_t len
const void * data
static XLogRecPtr endpos
Definition: pg_receivewal.c:56
static char * buf
Definition: pg_test_fsync.c:67
#define pg_log_warning(...)
Definition: pgfnames.c:24
unsigned int Oid
Definition: postgres_ext.h:31
int verbose
Definition: pg_backup.h:212
RestoreOptions * ropt
Definition: pg_backup.h:210
struct _tocEntry * toc
DeClonePtrType DeClonePtr
ReadExtraTocPtrType ReadExtraTocPtr
WorkerJobDumpPtrType WorkerJobDumpPtr
EndBlobPtrType EndBlobPtr
ArchiveEntryPtrType ArchiveEntryPtr
WriteDataPtrType WriteDataPtr
ClonePtrType ClonePtr
StartBlobsPtrType StartBlobsPtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
StartBlobPtrType StartBlobPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
WorkerJobRestorePtrType WorkerJobRestorePtr
PrintTocDataPtrType PrintTocDataPtr
WriteBytePtrType WriteBytePtr
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
EndBlobsPtrType EndBlobsPtr
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
EndDataPtrType EndDataPtr
ClosePtrType ClosePtr
DataDumperPtr dataDumper
struct _tocEntry * next
pgoff_t lastFilePos
CompressorState * cs
#define ftello(stream)
Definition: win32_port.h:218
#define fseeko(stream, offset, origin)
Definition: win32_port.h:215
#define pgoff_t
Definition: win32_port.h:208