PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *AH);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipLOs(ArchiveHandle *AH);
56 
57 static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadLOs(ArchiveHandle *AH, bool drop);
62 
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 
68 
69 typedef struct
70 {
72  int hasSeek;
73  /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74  pgoff_t lastFilePos; /* position after last data block we've read */
75 } lclContext;
76 
/*
 * Format-private state attached to each TOC entry; the routines in this
 * file reach it by casting te->formatData.
 */
77 typedef struct
78 {
79  int dataState; /* compared against K_OFFSET_POS_NOT_SET, K_OFFSET_POS_SET and K_OFFSET_NO_DATA */
80  pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 } lclTocEntry;
82 
83 
84 /*------
85  * Static declarations
86  *------
87  */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  * Init routine required by ALL formats. This is a global routine
97  * and should be declared in pg_backup_archiver.h
98  *
99  * Its task is to create any extra archive context (using AH->formatData),
100  * and to initialize the supported function pointers.
101  *
102  * It should also prepare whatever its input source is for reading/writing,
103  * and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
107 {
108  lclContext *ctx;
109 
110  /* Assuming static functions, this can be copied for each format. */
112  AH->StartDataPtr = _StartData;
113  AH->WriteDataPtr = _WriteData;
114  AH->EndDataPtr = _EndData;
115  AH->WriteBytePtr = _WriteByte;
116  AH->ReadBytePtr = _ReadByte;
117  AH->WriteBufPtr = _WriteBuf;
118  AH->ReadBufPtr = _ReadBuf;
119  AH->ClosePtr = _CloseArchive;
125 
126  AH->StartLOsPtr = _StartLOs;
127  AH->StartLOPtr = _StartLO;
128  AH->EndLOPtr = _EndLO;
129  AH->EndLOsPtr = _EndLOs;
130 
132  AH->ClonePtr = _Clone;
133  AH->DeClonePtr = _DeClone;
134 
135  /* no parallel dump in the custom archive, only parallel restore */
136  AH->WorkerJobDumpPtr = NULL;
138 
139  /* Set up a private area. */
140  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141  AH->formatData = (void *) ctx;
142 
143  /* Initialize LO buffering */
144  AH->lo_buf_size = LOBBUFSIZE;
145  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147  /*
148  * Now open the file
149  */
150  if (AH->mode == archModeWrite)
151  {
152  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153  {
154  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
155  if (!AH->FH)
156  pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
157  }
158  else
159  {
160  AH->FH = stdout;
161  if (!AH->FH)
162  pg_fatal("could not open output file: %m");
163  }
164 
165  ctx->hasSeek = checkSeek(AH->FH);
166  }
167  else
168  {
169  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170  {
171  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
172  if (!AH->FH)
173  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
174  }
175  else
176  {
177  AH->FH = stdin;
178  if (!AH->FH)
179  pg_fatal("could not open input file: %m");
180  }
181 
182  ctx->hasSeek = checkSeek(AH->FH);
183 
184  ReadHead(AH);
185  ReadToc(AH);
186 
187  /*
188  * Remember location of first data block (i.e., the point after TOC)
189  * in case we have to search for desired data blocks.
190  */
191  ctx->lastFilePos = _getFilePos(AH, ctx);
192  }
193 }
194 
195 /*
196  * Called by the Archiver when the dumper creates a new TOC entry.
197  *
198  * Optional.
199  *
200  * Set up extra format-related TOC data.
201 */
202 static void
204 {
205  lclTocEntry *ctx;
206 
207  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
208  if (te->dataDumper)
210  else
212 
213  te->formatData = (void *) ctx;
214 }
215 
216 /*
217  * Called by the Archiver to save any extra format-related TOC entry
218  * data.
219  *
220  * Optional.
221  *
222  * Use the Archiver routines to write data - they are endian-independent,
223  * and maintain other important file information.
224  */
225 static void
227 {
228  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229 
230  WriteOffset(AH, ctx->dataPos, ctx->dataState);
231 }
232 
233 /*
234  * Called by the Archiver to read any extra format-related TOC data.
235  *
236  * Optional.
237  *
238  * Needs to match the order defined in _WriteExtraToc, and should also
239  * use the Archiver input routines.
240  */
241 static void
243 {
244  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245 
246  if (ctx == NULL)
247  {
248  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
249  te->formatData = (void *) ctx;
250  }
251 
252  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253 
254  /*
255  * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256  * dump it at all.
257  */
258  if (AH->version < K_VERS_1_7)
259  ReadInt(AH);
260 }
261 
262 /*
263  * Called by the Archiver when restoring an archive to output a comment
264  * that includes useful information about the TOC entry.
265  *
266  * Optional.
267  */
268 static void
270 {
271  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272 
273  if (AH->public.verbose)
274  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275  (int64) ctx->dataPos);
276 }
277 
278 /*
279  * Called by the archiver when saving TABLE DATA (not schema). This routine
280  * should save whatever format-specific information is needed to read
281  * the archive back.
282  *
283  * It is called just prior to the dumper's 'DataDumper' routine being called.
284  *
285  * Optional, but strongly recommended.
286  *
287  */
288 static void
290 {
291  lclContext *ctx = (lclContext *) AH->formatData;
292  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 
294  tctx->dataPos = _getFilePos(AH, ctx);
295  if (tctx->dataPos >= 0)
296  tctx->dataState = K_OFFSET_POS_SET;
297 
298  _WriteByte(AH, BLK_DATA); /* Block type */
299  WriteInt(AH, te->dumpId); /* For sanity check */
300 
302  NULL,
304 }
305 
306 /*
307  * Called by archiver when dumper calls WriteData. This routine is
308  * called for both LO and table data; it is the responsibility of
309  * the format to manage each kind of data using StartLO/StartData.
310  *
311  * It should only be called from within a DataDumper routine.
312  *
313  * Mandatory.
314  */
315 static void
316 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 {
318  lclContext *ctx = (lclContext *) AH->formatData;
319  CompressorState *cs = ctx->cs;
320 
321  if (dLen > 0)
322  /* writeData() internally throws write errors */
323  cs->writeData(AH, cs, data, dLen);
324 }
325 
326 /*
327  * Called by the archiver when a dumper's 'DataDumper' routine has
328  * finished.
329  *
330  * Mandatory.
331  */
332 static void
334 {
335  lclContext *ctx = (lclContext *) AH->formatData;
336 
337  EndCompressor(AH, ctx->cs);
338  ctx->cs = NULL;
339 
340  /* Send the end marker */
341  WriteInt(AH, 0);
342 }
343 
344 /*
345  * Called by the archiver when starting to save all BLOB DATA (not schema).
346  * This routine should save whatever format-specific information is needed
347  * to read the LOs back into memory.
348  *
349  * It is called just prior to the dumper's DataDumper routine.
350  *
351  * Optional, but strongly recommended.
352  */
353 static void
355 {
356  lclContext *ctx = (lclContext *) AH->formatData;
357  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
358 
359  tctx->dataPos = _getFilePos(AH, ctx);
360  if (tctx->dataPos >= 0)
361  tctx->dataState = K_OFFSET_POS_SET;
362 
363  _WriteByte(AH, BLK_BLOBS); /* Block type */
364  WriteInt(AH, te->dumpId); /* For sanity check */
365 }
366 
367 /*
368  * Called by the archiver when the dumper calls StartLO.
369  *
370  * Mandatory.
371  *
372  * Must save the passed OID for retrieval at restore-time.
373  */
374 static void
376 {
377  lclContext *ctx = (lclContext *) AH->formatData;
378 
379  if (oid == 0)
380  pg_fatal("invalid OID for large object");
381 
382  WriteInt(AH, oid);
383 
385  NULL,
387 }
388 
389 /*
390  * Called by the archiver when the dumper calls EndLO.
391  *
392  * Optional.
393  */
394 static void
396 {
397  lclContext *ctx = (lclContext *) AH->formatData;
398 
399  EndCompressor(AH, ctx->cs);
400  /* Send the end marker */
401  WriteInt(AH, 0);
402 }
403 
404 /*
405  * Called by the archiver when finishing saving all BLOB DATA.
406  *
407  * Optional.
408  */
409 static void
411 {
412  /* Write out a fake zero OID to mark end-of-LOs. */
413  WriteInt(AH, 0);
414 }
415 
416 /*
417  * Print data for a given TOC entry
418  */
419 static void
421 {
422  lclContext *ctx = (lclContext *) AH->formatData;
423  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
424  int blkType;
425  int id;
426 
427  if (tctx->dataState == K_OFFSET_NO_DATA)
428  return;
429 
430  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
431  {
432  /*
433  * We cannot seek directly to the desired block. Instead, skip over
434  * block headers until we find the one we want. Remember the
435  * positions of skipped-over blocks, so that if we later decide we
436  * need to read one, we'll be able to seek to it.
437  *
438  * When our input file is seekable, we can do the search starting from
439  * the point after the last data block we scanned in previous
440  * iterations of this function.
441  */
442  if (ctx->hasSeek)
443  {
444  if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
445  pg_fatal("error during file seek: %m");
446  }
447 
448  for (;;)
449  {
450  pgoff_t thisBlkPos = _getFilePos(AH, ctx);
451 
452  _readBlockHeader(AH, &blkType, &id);
453 
454  if (blkType == EOF || id == te->dumpId)
455  break;
456 
457  /* Remember the block position, if we got one */
458  if (thisBlkPos >= 0)
459  {
460  TocEntry *otherte = getTocEntryByDumpId(AH, id);
461 
462  if (otherte && otherte->formatData)
463  {
464  lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
465 
466  /*
467  * Note: on Windows, multiple threads might access/update
468  * the same lclTocEntry concurrently, but that should be
469  * safe as long as we update dataPos before dataState.
470  * Ideally, we'd use pg_write_barrier() to enforce that,
471  * but the needed infrastructure doesn't exist in frontend
472  * code. But Windows only runs on machines with strong
473  * store ordering, so it should be okay for now.
474  */
475  if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
476  {
477  othertctx->dataPos = thisBlkPos;
478  othertctx->dataState = K_OFFSET_POS_SET;
479  }
480  else if (othertctx->dataPos != thisBlkPos ||
481  othertctx->dataState != K_OFFSET_POS_SET)
482  {
483  /* sanity check */
484  pg_log_warning("data block %d has wrong seek position",
485  id);
486  }
487  }
488  }
489 
490  switch (blkType)
491  {
492  case BLK_DATA:
493  _skipData(AH);
494  break;
495 
496  case BLK_BLOBS:
497  _skipLOs(AH);
498  break;
499 
500  default: /* Always have a default */
501  pg_fatal("unrecognized data block type (%d) while searching archive",
502  blkType);
503  break;
504  }
505  }
506  }
507  else
508  {
509  /* We can just seek to the place we need to be. */
510  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
511  pg_fatal("error during file seek: %m");
512 
513  _readBlockHeader(AH, &blkType, &id);
514  }
515 
516  /*
517  * If we reached EOF without finding the block we want, then either it
518  * doesn't exist, or it does but we lack the ability to seek back to it.
519  */
520  if (blkType == EOF)
521  {
522  if (!ctx->hasSeek)
523  pg_fatal("could not find block ID %d in archive -- "
524  "possibly due to out-of-order restore request, "
525  "which cannot be handled due to non-seekable input file",
526  te->dumpId);
527  else
528  pg_fatal("could not find block ID %d in archive -- "
529  "possibly corrupt archive",
530  te->dumpId);
531  }
532 
533  /* Are we sane? */
534  if (id != te->dumpId)
535  pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
536  id, te->dumpId);
537 
538  switch (blkType)
539  {
540  case BLK_DATA:
541  _PrintData(AH);
542  break;
543 
544  case BLK_BLOBS:
545  _LoadLOs(AH, AH->public.ropt->dropSchema);
546  break;
547 
548  default: /* Always have a default */
549  pg_fatal("unrecognized data block type %d while restoring archive",
550  blkType);
551  break;
552  }
553 
554  /*
555  * If our input file is seekable but lacks data offsets, update our
556  * knowledge of where to start future searches from. (Note that we did
557  * not update the current TE's dataState/dataPos. We could have, but
558  * there is no point since it will not be visited again.)
559  */
560  if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
561  {
562  pgoff_t curPos = _getFilePos(AH, ctx);
563 
564  if (curPos > ctx->lastFilePos)
565  ctx->lastFilePos = curPos;
566  }
567 }
568 
569 /*
570  * Print data from current file position.
571 */
572 static void
574 {
575  CompressorState *cs;
576 
578  _CustomReadFunc, NULL);
579  cs->readData(AH, cs);
580  EndCompressor(AH, cs);
581 }
582 
583 static void
584 _LoadLOs(ArchiveHandle *AH, bool drop)
585 {
586  Oid oid;
587 
588  StartRestoreLOs(AH);
589 
590  oid = ReadInt(AH);
591  while (oid != 0)
592  {
593  StartRestoreLO(AH, oid, drop);
594  _PrintData(AH);
595  EndRestoreLO(AH, oid);
596  oid = ReadInt(AH);
597  }
598 
599  EndRestoreLOs(AH);
600 }
601 
602 /*
603  * Skip the LOs from the current file position.
604  * LOs are written sequentially as data blocks (see below).
605  * Each LO is preceded by its original OID.
606  * A zero OID indicates the end of the LOs.
607  */
608 static void
610 {
611  Oid oid;
612 
613  oid = ReadInt(AH);
614  while (oid != 0)
615  {
616  _skipData(AH);
617  oid = ReadInt(AH);
618  }
619 }
620 
621 /*
622  * Skip data from current file position.
623  * Data blocks are formatted as an integer length, followed by data.
624  * A zero length indicates the end of the block.
625 */
626 static void
628 {
629  lclContext *ctx = (lclContext *) AH->formatData;
630  size_t blkLen;
631  char *buf = NULL;
632  int buflen = 0;
633 
634  blkLen = ReadInt(AH);
635  while (blkLen != 0)
636  {
637  if (ctx->hasSeek)
638  {
639  if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
640  pg_fatal("error during file seek: %m");
641  }
642  else
643  {
644  if (blkLen > buflen)
645  {
646  free(buf);
647  buf = (char *) pg_malloc(blkLen);
648  buflen = blkLen;
649  }
650  if (fread(buf, 1, blkLen, AH->FH) != blkLen)
651  {
652  if (feof(AH->FH))
653  pg_fatal("could not read from input file: end of file");
654  else
655  pg_fatal("could not read from input file: %m");
656  }
657  }
658 
659  blkLen = ReadInt(AH);
660  }
661 
662  free(buf);
663 }
664 
665 /*
666  * Write a byte of data to the archive.
667  *
668  * Mandatory.
669  *
670  * Called by the archiver to do integer & byte output to the archive.
671  */
672 static int
673 _WriteByte(ArchiveHandle *AH, const int i)
674 {
675  if (fputc(i, AH->FH) == EOF)
677 
678  return 1;
679 }
680 
681 /*
682  * Read a byte of data from the archive.
683  *
684  * Mandatory
685  *
686  * Called by the archiver to read bytes & integers from the archive.
687  * EOF should be treated as a fatal error.
688  */
689 static int
691 {
692  int res;
693 
694  res = getc(AH->FH);
695  if (res == EOF)
696  READ_ERROR_EXIT(AH->FH);
697  return res;
698 }
699 
700 /*
701  * Write a buffer of data to the archive.
702  *
703  * Mandatory.
704  *
705  * Called by the archiver to write a block of bytes to the archive.
706  */
707 static void
708 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
709 {
710  if (fwrite(buf, 1, len, AH->FH) != len)
712 }
713 
714 /*
715  * Read a block of bytes from the archive.
716  *
717  * Mandatory.
718  *
719  * Called by the archiver to read a block of bytes from the archive
720  */
721 static void
722 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
723 {
724  if (fread(buf, 1, len, AH->FH) != len)
725  READ_ERROR_EXIT(AH->FH);
726 }
727 
728 /*
729  * Close the archive.
730  *
731  * Mandatory.
732  *
733  * When writing the archive, this is the routine that actually starts
734  * the process of saving it to files. No data should be written prior
735  * to this point, since the user could sort the TOC after creating it.
736  *
737  * If an archive is to be written, this routine must call:
738  * WriteHead to save the archive header
739  * WriteToc to save the TOC entries
740  * WriteDataChunks to save all data & LOs.
741  *
742  */
743 static void
745 {
746  lclContext *ctx = (lclContext *) AH->formatData;
747  pgoff_t tpos;
748 
749  if (AH->mode == archModeWrite)
750  {
751  WriteHead(AH);
752  /* Remember TOC's seek position for use below */
753  tpos = ftello(AH->FH);
754  if (tpos < 0 && ctx->hasSeek)
755  pg_fatal("could not determine seek position in archive file: %m");
756  WriteToc(AH);
757  WriteDataChunks(AH, NULL);
758 
759  /*
760  * If possible, re-write the TOC in order to update the data offset
761  * information. This is not essential, as pg_restore can cope in most
762  * cases without it; but it can make pg_restore significantly faster
763  * in some situations (especially parallel restore).
764  */
765  if (ctx->hasSeek &&
766  fseeko(AH->FH, tpos, SEEK_SET) == 0)
767  WriteToc(AH);
768  }
769 
770  if (fclose(AH->FH) != 0)
771  pg_fatal("could not close archive file: %m");
772 
773  /* Sync the output file if one is defined */
774  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
775  (void) fsync_fname(AH->fSpec, false);
776 
777  AH->FH = NULL;
778 }
779 
780 /*
781  * Reopen the archive's file handle.
782  *
783  * We close the original file handle, except on Windows. (The difference
784  * is because on Windows, this is used within a multithreading context,
785  * and we don't want a thread closing the parent file handle.)
786  */
787 static void
789 {
790  lclContext *ctx = (lclContext *) AH->formatData;
791  pgoff_t tpos;
792 
793  if (AH->mode == archModeWrite)
794  pg_fatal("can only reopen input archives");
795 
796  /*
797  * These two cases are user-facing errors since they represent unsupported
798  * (but not invalid) use-cases. Word the error messages appropriately.
799  */
800  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
801  pg_fatal("parallel restore from standard input is not supported");
802  if (!ctx->hasSeek)
803  pg_fatal("parallel restore from non-seekable file is not supported");
804 
805  tpos = ftello(AH->FH);
806  if (tpos < 0)
807  pg_fatal("could not determine seek position in archive file: %m");
808 
809 #ifndef WIN32
810  if (fclose(AH->FH) != 0)
811  pg_fatal("could not close archive file: %m");
812 #endif
813 
814  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
815  if (!AH->FH)
816  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
817 
818  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
819  pg_fatal("could not set seek position in archive file: %m");
820 }
821 
822 /*
823  * Prepare for parallel restore.
824  *
825  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
826  * TOC entries' dataLength fields with appropriate values to guide the
827  * ordering of restore jobs. The source of said data is format-dependent,
828  * as is the exact meaning of the values.
829  *
830  * A format module might also choose to do other setup here.
831  */
832 static void
834 {
835  lclContext *ctx = (lclContext *) AH->formatData;
836  TocEntry *prev_te = NULL;
837  lclTocEntry *prev_tctx = NULL;
838  TocEntry *te;
839 
840  /*
841  * Knowing that the data items were dumped out in TOC order, we can
842  * reconstruct the length of each item as the delta to the start offset of
843  * the next data item.
844  */
845  for (te = AH->toc->next; te != AH->toc; te = te->next)
846  {
847  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
848 
849  /*
850  * Ignore entries without a known data offset; if we were unable to
851  * seek to rewrite the TOC when creating the archive, this'll be all
852  * of them, and we'll end up with no size estimates.
853  */
854  if (tctx->dataState != K_OFFSET_POS_SET)
855  continue;
856 
857  /* Compute previous data item's length */
858  if (prev_te)
859  {
860  if (tctx->dataPos > prev_tctx->dataPos)
861  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
862  }
863 
864  prev_te = te;
865  prev_tctx = tctx;
866  }
867 
868  /* If OK to seek, we can determine the length of the last item */
869  if (prev_te && ctx->hasSeek)
870  {
871  pgoff_t endpos;
872 
873  if (fseeko(AH->FH, 0, SEEK_END) != 0)
874  pg_fatal("error during file seek: %m");
875  endpos = ftello(AH->FH);
876  if (endpos > prev_tctx->dataPos)
877  prev_te->dataLength = endpos - prev_tctx->dataPos;
878  }
879 }
880 
881 /*
882  * Clone format-specific fields during parallel restoration.
883  */
884 static void
886 {
887  lclContext *ctx = (lclContext *) AH->formatData;
888 
889  /*
890  * Each thread must have private lclContext working state.
891  */
892  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
893  memcpy(AH->formatData, ctx, sizeof(lclContext));
894  ctx = (lclContext *) AH->formatData;
895 
896  /* sanity check, shouldn't happen */
897  if (ctx->cs != NULL)
898  pg_fatal("compressor active");
899 
900  /*
901  * We intentionally do not clone TOC-entry-local state: it's useful to
902  * share knowledge about where the data blocks are across threads.
903  * _PrintTocData has to be careful about the order of operations on that
904  * state, though.
905  *
906  * Note: we do not make a local lo_buf because we expect at most one BLOBS
907  * entry per archive, so no parallelism is possible.
908  */
909 }
910 
911 static void
913 {
914  lclContext *ctx = (lclContext *) AH->formatData;
915 
916  free(ctx);
917 }
918 
919 /*
920  * This function is executed in the child of a parallel restore from a
921  * custom-format archive and restores the actual data for one TOC entry.
922  */
923 static int
925 {
926  return parallel_restore(AH, te);
927 }
928 
929 /*--------------------------------------------------
930  * END OF FORMAT CALLBACKS
931  *--------------------------------------------------
932  */
933 
934 /*
935  * Get the current position in the archive file.
936  *
937  * With a non-seekable archive file, we may not be able to obtain the
938  * file position. If so, just return -1. It's not too important in
939  * that case because we won't be able to rewrite the TOC to fill in
940  * data block offsets anyway.
941  */
942 static pgoff_t
944 {
945  pgoff_t pos;
946 
947  pos = ftello(AH->FH);
948  if (pos < 0)
949  {
950  /* Not expected if we found we can seek. */
951  if (ctx->hasSeek)
952  pg_fatal("could not determine seek position in archive file: %m");
953  }
954  return pos;
955 }
956 
957 /*
958  * Read a data block header. The format changed in V1.3, so we
959  * centralize the code here for simplicity. Returns *type = EOF
960  * if at EOF.
961  */
962 static void
964 {
965  int byt;
966 
967  /*
968  * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
969  * inside ReadInt rather than returning EOF. It doesn't seem worth
970  * jumping through hoops to deal with that case better, because no such
971  * files are likely to exist in the wild: only some 7.1 development
972  * versions of pg_dump ever generated such files.
973  */
974  if (AH->version < K_VERS_1_3)
975  *type = BLK_DATA;
976  else
977  {
978  byt = getc(AH->FH);
979  *type = byt;
980  if (byt == EOF)
981  {
982  *id = 0; /* don't return an uninitialized value */
983  return;
984  }
985  }
986 
987  *id = ReadInt(AH);
988 }
989 
990 /*
991  * Callback function for writeData. Writes one block of (compressed)
992  * data to the archive.
993  */
994 static void
995 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
996 {
997  /* never write 0-byte blocks (this should not happen) */
998  if (len > 0)
999  {
1000  WriteInt(AH, len);
1001  _WriteBuf(AH, buf, len);
1002  }
1003 }
1004 
1005 /*
1006  * Callback function for readData. To keep things simple, we
1007  * always read one compressed block at a time.
1008  */
1009 static size_t
1010 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1011 {
1012  size_t blkLen;
1013 
1014  /* Read length */
1015  blkLen = ReadInt(AH);
1016  if (blkLen == 0)
1017  return 0;
1018 
1019  /* If the caller's buffer is not large enough, allocate a bigger one */
1020  if (blkLen > *buflen)
1021  {
1022  free(*buf);
1023  *buf = (char *) pg_malloc(blkLen);
1024  *buflen = blkLen;
1025  }
1026 
1027  /* exits app on read errors */
1028  _ReadBuf(AH, *buf, blkLen);
1029 
1030  return blkLen;
1031 }
#define PG_BINARY_R
Definition: c.h:1262
#define INT64_FORMAT
Definition: c.h:535
#define PG_BINARY_W
Definition: c.h:1263
CompressorState * AllocateCompressor(const pg_compress_specification compression_spec, ReadFunc readF, WriteFunc writeF)
Definition: compress_io.c:124
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:149
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define free(a)
Definition: header.h:65
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ archModeWrite
Definition: pg_backup.h:51
bool checkSeek(FILE *fp)
void WriteHead(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
size_t WriteInt(ArchiveHandle *AH, int i)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
void StartRestoreLOs(ArchiveHandle *AH)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
int ReadInt(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void ReadToc(ArchiveHandle *AH)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
void WriteToc(ArchiveHandle *AH)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
#define LOBBUFSIZE
#define K_OFFSET_NO_DATA
#define WRITE_ERROR_EXIT
#define BLK_DATA
#define BLK_BLOBS
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define READ_ERROR_EXIT(fd)
#define K_VERS_1_3
#define K_VERS_1_7
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _skipData(ArchiveHandle *AH)
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
static void _skipLOs(ArchiveHandle *AH)
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _DeClone(ArchiveHandle *AH)
static void _ReopenArchive(ArchiveHandle *AH)
static void _StartLOs(ArchiveHandle *AH, TocEntry *te)
void InitArchiveFmt_Custom(ArchiveHandle *AH)
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
static void _EndLOs(ArchiveHandle *AH, TocEntry *te)
static int _WriteByte(ArchiveHandle *AH, const int i)
static void _PrepParallelRestore(ArchiveHandle *AH)
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static int _ReadByte(ArchiveHandle *AH)
static void _PrintData(ArchiveHandle *AH)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
static void _Clone(ArchiveHandle *AH)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
static void _LoadLOs(ArchiveHandle *AH, bool drop)
#define pg_fatal(...)
const void size_t len
const void * data
static XLogRecPtr endpos
Definition: pg_receivewal.c:56
static char * buf
Definition: pg_test_fsync.c:73
#define pg_log_warning(...)
Definition: pgfnames.c:24
unsigned int Oid
Definition: postgres_ext.h:31
int verbose
Definition: pg_backup.h:215
RestoreOptions * ropt
Definition: pg_backup.h:213
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
ReadExtraTocPtrType ReadExtraTocPtr
WorkerJobDumpPtrType WorkerJobDumpPtr
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
WorkerJobRestorePtrType WorkerJobRestorePtr
PrintTocDataPtrType PrintTocDataPtr
WriteBytePtrType WriteBytePtr
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
EndDataPtrType EndDataPtr
ClosePtrType ClosePtr
DataDumperPtr dataDumper
struct _tocEntry * next
pgoff_t lastFilePos
CompressorState * cs
const char * type
#define ftello(stream)
Definition: win32_port.h:219
#define fseeko(stream, offset, origin)
Definition: win32_port.h:216
#define pgoff_t
Definition: win32_port.h:207