PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *AH);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipLOs(ArchiveHandle *AH);
56 
57 static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadLOs(ArchiveHandle *AH, bool drop);
62 
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 
68 
69 typedef struct
70 {
72  int hasSeek;
73  /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74  pgoff_t lastFilePos; /* position after last data block we've read */
75 } lclContext;
76 
77 typedef struct
78 {
79  int dataState;
80  pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 } lclTocEntry;
82 
83 
84 /*------
85  * Static declarations
86  *------
87  */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  * Init routine required by ALL formats. This is a global routine
97  * and should be declared in pg_backup_archiver.h
98  *
99  * It's task is to create any extra archive context (using AH->formatData),
100  * and to initialize the supported function pointers.
101  *
102  * It should also prepare whatever its input source is for reading/writing,
103  * and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
107 {
108  lclContext *ctx;
109 
110  /* Assuming static functions, this can be copied for each format. */
112  AH->StartDataPtr = _StartData;
113  AH->WriteDataPtr = _WriteData;
114  AH->EndDataPtr = _EndData;
115  AH->WriteBytePtr = _WriteByte;
116  AH->ReadBytePtr = _ReadByte;
117  AH->WriteBufPtr = _WriteBuf;
118  AH->ReadBufPtr = _ReadBuf;
119  AH->ClosePtr = _CloseArchive;
125 
126  AH->StartLOsPtr = _StartLOs;
127  AH->StartLOPtr = _StartLO;
128  AH->EndLOPtr = _EndLO;
129  AH->EndLOsPtr = _EndLOs;
130 
132  AH->ClonePtr = _Clone;
133  AH->DeClonePtr = _DeClone;
134 
135  /* no parallel dump in the custom archive, only parallel restore */
136  AH->WorkerJobDumpPtr = NULL;
138 
139  /* Set up a private area. */
140  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141  AH->formatData = (void *) ctx;
142 
143  /*
144  * Now open the file
145  */
146  if (AH->mode == archModeWrite)
147  {
148  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
149  {
150  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
151  if (!AH->FH)
152  pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
153  }
154  else
155  {
156  AH->FH = stdout;
157  if (!AH->FH)
158  pg_fatal("could not open output file: %m");
159  }
160 
161  ctx->hasSeek = checkSeek(AH->FH);
162  }
163  else
164  {
165  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
166  {
167  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
168  if (!AH->FH)
169  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
170  }
171  else
172  {
173  AH->FH = stdin;
174  if (!AH->FH)
175  pg_fatal("could not open input file: %m");
176  }
177 
178  ctx->hasSeek = checkSeek(AH->FH);
179 
180  ReadHead(AH);
181  ReadToc(AH);
182 
183  /*
184  * Remember location of first data block (i.e., the point after TOC)
185  * in case we have to search for desired data blocks.
186  */
187  ctx->lastFilePos = _getFilePos(AH, ctx);
188  }
189 }
190 
191 /*
192  * Called by the Archiver when the dumper creates a new TOC entry.
193  *
194  * Optional.
195  *
196  * Set up extract format-related TOC data.
197 */
198 static void
200 {
201  lclTocEntry *ctx;
202 
203  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
204  if (te->dataDumper)
206  else
208 
209  te->formatData = (void *) ctx;
210 }
211 
212 /*
213  * Called by the Archiver to save any extra format-related TOC entry
214  * data.
215  *
216  * Optional.
217  *
218  * Use the Archiver routines to write data - they are non-endian, and
219  * maintain other important file information.
220  */
221 static void
223 {
224  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
225 
226  WriteOffset(AH, ctx->dataPos, ctx->dataState);
227 }
228 
229 /*
230  * Called by the Archiver to read any extra format-related TOC data.
231  *
232  * Optional.
233  *
234  * Needs to match the order defined in _WriteExtraToc, and should also
235  * use the Archiver input routines.
236  */
237 static void
239 {
240  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
241 
242  if (ctx == NULL)
243  {
244  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
245  te->formatData = (void *) ctx;
246  }
247 
248  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
249 
250  /*
251  * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
252  * dump it at all.
253  */
254  if (AH->version < K_VERS_1_7)
255  ReadInt(AH);
256 }
257 
258 /*
259  * Called by the Archiver when restoring an archive to output a comment
260  * that includes useful information about the TOC entry.
261  *
262  * Optional.
263  */
264 static void
266 {
267  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
268 
269  if (AH->public.verbose)
270  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
271  (int64) ctx->dataPos);
272 }
273 
274 /*
275  * Called by the archiver when saving TABLE DATA (not schema). This routine
276  * should save whatever format-specific information is needed to read
277  * the archive back.
278  *
279  * It is called just prior to the dumper's 'DataDumper' routine being called.
280  *
281  * Optional, but strongly recommended.
282  *
283  */
284 static void
286 {
287  lclContext *ctx = (lclContext *) AH->formatData;
288  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
289 
290  tctx->dataPos = _getFilePos(AH, ctx);
291  if (tctx->dataPos >= 0)
292  tctx->dataState = K_OFFSET_POS_SET;
293 
294  _WriteByte(AH, BLK_DATA); /* Block type */
295  WriteInt(AH, te->dumpId); /* For sanity check */
296 
298  NULL,
300 }
301 
302 /*
303  * Called by archiver when dumper calls WriteData. This routine is
304  * called for both LO and table data; it is the responsibility of
305  * the format to manage each kind of data using StartLO/StartData.
306  *
307  * It should only be called from within a DataDumper routine.
308  *
309  * Mandatory.
310  */
311 static void
312 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
313 {
314  lclContext *ctx = (lclContext *) AH->formatData;
315  CompressorState *cs = ctx->cs;
316 
317  if (dLen > 0)
318  /* writeData() internally throws write errors */
319  cs->writeData(AH, cs, data, dLen);
320 }
321 
322 /*
323  * Called by the archiver when a dumper's 'DataDumper' routine has
324  * finished.
325  *
326  * Mandatory.
327  */
328 static void
330 {
331  lclContext *ctx = (lclContext *) AH->formatData;
332 
333  EndCompressor(AH, ctx->cs);
334  ctx->cs = NULL;
335 
336  /* Send the end marker */
337  WriteInt(AH, 0);
338 }
339 
340 /*
341  * Called by the archiver when starting to save BLOB DATA (not schema).
342  * This routine should save whatever format-specific information is needed
343  * to read the LOs back into memory.
344  *
345  * It is called just prior to the dumper's DataDumper routine.
346  *
347  * Optional, but strongly recommended.
348  */
349 static void
351 {
352  lclContext *ctx = (lclContext *) AH->formatData;
353  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
354 
355  tctx->dataPos = _getFilePos(AH, ctx);
356  if (tctx->dataPos >= 0)
357  tctx->dataState = K_OFFSET_POS_SET;
358 
359  _WriteByte(AH, BLK_BLOBS); /* Block type */
360  WriteInt(AH, te->dumpId); /* For sanity check */
361 }
362 
363 /*
364  * Called by the archiver when the dumper calls StartLO.
365  *
366  * Mandatory.
367  *
368  * Must save the passed OID for retrieval at restore-time.
369  */
370 static void
372 {
373  lclContext *ctx = (lclContext *) AH->formatData;
374 
375  if (oid == 0)
376  pg_fatal("invalid OID for large object");
377 
378  WriteInt(AH, oid);
379 
381  NULL,
383 }
384 
385 /*
386  * Called by the archiver when the dumper calls EndLO.
387  *
388  * Optional.
389  */
390 static void
392 {
393  lclContext *ctx = (lclContext *) AH->formatData;
394 
395  EndCompressor(AH, ctx->cs);
396  /* Send the end marker */
397  WriteInt(AH, 0);
398 }
399 
400 /*
401  * Called by the archiver when finishing saving BLOB DATA.
402  *
403  * Optional.
404  */
405 static void
407 {
408  /* Write out a fake zero OID to mark end-of-LOs. */
409  WriteInt(AH, 0);
410 }
411 
412 /*
413  * Print data for a given TOC entry
414  */
415 static void
417 {
418  lclContext *ctx = (lclContext *) AH->formatData;
419  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
420  int blkType;
421  int id;
422 
423  if (tctx->dataState == K_OFFSET_NO_DATA)
424  return;
425 
426  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
427  {
428  /*
429  * We cannot seek directly to the desired block. Instead, skip over
430  * block headers until we find the one we want. Remember the
431  * positions of skipped-over blocks, so that if we later decide we
432  * need to read one, we'll be able to seek to it.
433  *
434  * When our input file is seekable, we can do the search starting from
435  * the point after the last data block we scanned in previous
436  * iterations of this function.
437  */
438  if (ctx->hasSeek)
439  {
440  if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
441  pg_fatal("error during file seek: %m");
442  }
443 
444  for (;;)
445  {
446  pgoff_t thisBlkPos = _getFilePos(AH, ctx);
447 
448  _readBlockHeader(AH, &blkType, &id);
449 
450  if (blkType == EOF || id == te->dumpId)
451  break;
452 
453  /* Remember the block position, if we got one */
454  if (thisBlkPos >= 0)
455  {
456  TocEntry *otherte = getTocEntryByDumpId(AH, id);
457 
458  if (otherte && otherte->formatData)
459  {
460  lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
461 
462  /*
463  * Note: on Windows, multiple threads might access/update
464  * the same lclTocEntry concurrently, but that should be
465  * safe as long as we update dataPos before dataState.
466  * Ideally, we'd use pg_write_barrier() to enforce that,
467  * but the needed infrastructure doesn't exist in frontend
468  * code. But Windows only runs on machines with strong
469  * store ordering, so it should be okay for now.
470  */
471  if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
472  {
473  othertctx->dataPos = thisBlkPos;
474  othertctx->dataState = K_OFFSET_POS_SET;
475  }
476  else if (othertctx->dataPos != thisBlkPos ||
477  othertctx->dataState != K_OFFSET_POS_SET)
478  {
479  /* sanity check */
480  pg_log_warning("data block %d has wrong seek position",
481  id);
482  }
483  }
484  }
485 
486  switch (blkType)
487  {
488  case BLK_DATA:
489  _skipData(AH);
490  break;
491 
492  case BLK_BLOBS:
493  _skipLOs(AH);
494  break;
495 
496  default: /* Always have a default */
497  pg_fatal("unrecognized data block type (%d) while searching archive",
498  blkType);
499  break;
500  }
501  }
502  }
503  else
504  {
505  /* We can just seek to the place we need to be. */
506  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
507  pg_fatal("error during file seek: %m");
508 
509  _readBlockHeader(AH, &blkType, &id);
510  }
511 
512  /*
513  * If we reached EOF without finding the block we want, then either it
514  * doesn't exist, or it does but we lack the ability to seek back to it.
515  */
516  if (blkType == EOF)
517  {
518  if (!ctx->hasSeek)
519  pg_fatal("could not find block ID %d in archive -- "
520  "possibly due to out-of-order restore request, "
521  "which cannot be handled due to non-seekable input file",
522  te->dumpId);
523  else
524  pg_fatal("could not find block ID %d in archive -- "
525  "possibly corrupt archive",
526  te->dumpId);
527  }
528 
529  /* Are we sane? */
530  if (id != te->dumpId)
531  pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
532  id, te->dumpId);
533 
534  switch (blkType)
535  {
536  case BLK_DATA:
537  _PrintData(AH);
538  break;
539 
540  case BLK_BLOBS:
541  _LoadLOs(AH, AH->public.ropt->dropSchema);
542  break;
543 
544  default: /* Always have a default */
545  pg_fatal("unrecognized data block type %d while restoring archive",
546  blkType);
547  break;
548  }
549 
550  /*
551  * If our input file is seekable but lacks data offsets, update our
552  * knowledge of where to start future searches from. (Note that we did
553  * not update the current TE's dataState/dataPos. We could have, but
554  * there is no point since it will not be visited again.)
555  */
556  if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
557  {
558  pgoff_t curPos = _getFilePos(AH, ctx);
559 
560  if (curPos > ctx->lastFilePos)
561  ctx->lastFilePos = curPos;
562  }
563 }
564 
565 /*
566  * Print data from current file position.
567 */
568 static void
570 {
571  CompressorState *cs;
572 
574  _CustomReadFunc, NULL);
575  cs->readData(AH, cs);
576  EndCompressor(AH, cs);
577 }
578 
579 static void
580 _LoadLOs(ArchiveHandle *AH, bool drop)
581 {
582  Oid oid;
583 
584  StartRestoreLOs(AH);
585 
586  oid = ReadInt(AH);
587  while (oid != 0)
588  {
589  StartRestoreLO(AH, oid, drop);
590  _PrintData(AH);
591  EndRestoreLO(AH, oid);
592  oid = ReadInt(AH);
593  }
594 
595  EndRestoreLOs(AH);
596 }
597 
598 /*
599  * Skip the LOs from the current file position.
600  * LOs are written sequentially as data blocks (see below).
601  * Each LO is preceded by its original OID.
602  * A zero OID indicates the end of the LOs.
603  */
604 static void
606 {
607  Oid oid;
608 
609  oid = ReadInt(AH);
610  while (oid != 0)
611  {
612  _skipData(AH);
613  oid = ReadInt(AH);
614  }
615 }
616 
617 /*
618  * Skip data from current file position.
619  * Data blocks are formatted as an integer length, followed by data.
620  * A zero length indicates the end of the block.
621 */
622 static void
624 {
625  lclContext *ctx = (lclContext *) AH->formatData;
626  size_t blkLen;
627  char *buf = NULL;
628  int buflen = 0;
629 
630  blkLen = ReadInt(AH);
631  while (blkLen != 0)
632  {
633  if (ctx->hasSeek)
634  {
635  if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
636  pg_fatal("error during file seek: %m");
637  }
638  else
639  {
640  if (blkLen > buflen)
641  {
642  free(buf);
643  buf = (char *) pg_malloc(blkLen);
644  buflen = blkLen;
645  }
646  if (fread(buf, 1, blkLen, AH->FH) != blkLen)
647  {
648  if (feof(AH->FH))
649  pg_fatal("could not read from input file: end of file");
650  else
651  pg_fatal("could not read from input file: %m");
652  }
653  }
654 
655  blkLen = ReadInt(AH);
656  }
657 
658  free(buf);
659 }
660 
661 /*
662  * Write a byte of data to the archive.
663  *
664  * Mandatory.
665  *
666  * Called by the archiver to do integer & byte output to the archive.
667  */
668 static int
669 _WriteByte(ArchiveHandle *AH, const int i)
670 {
671  if (fputc(i, AH->FH) == EOF)
673 
674  return 1;
675 }
676 
677 /*
678  * Read a byte of data from the archive.
679  *
680  * Mandatory
681  *
682  * Called by the archiver to read bytes & integers from the archive.
683  * EOF should be treated as a fatal error.
684  */
685 static int
687 {
688  int res;
689 
690  res = getc(AH->FH);
691  if (res == EOF)
692  READ_ERROR_EXIT(AH->FH);
693  return res;
694 }
695 
696 /*
697  * Write a buffer of data to the archive.
698  *
699  * Mandatory.
700  *
701  * Called by the archiver to write a block of bytes to the archive.
702  */
703 static void
704 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
705 {
706  if (fwrite(buf, 1, len, AH->FH) != len)
708 }
709 
710 /*
711  * Read a block of bytes from the archive.
712  *
713  * Mandatory.
714  *
715  * Called by the archiver to read a block of bytes from the archive
716  */
717 static void
718 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
719 {
720  if (fread(buf, 1, len, AH->FH) != len)
721  READ_ERROR_EXIT(AH->FH);
722 }
723 
724 /*
725  * Close the archive.
726  *
727  * Mandatory.
728  *
729  * When writing the archive, this is the routine that actually starts
730  * the process of saving it to files. No data should be written prior
731  * to this point, since the user could sort the TOC after creating it.
732  *
733  * If an archive is to be written, this routine must call:
734  * WriteHead to save the archive header
735  * WriteToc to save the TOC entries
736  * WriteDataChunks to save all data & LOs.
737  *
738  */
739 static void
741 {
742  lclContext *ctx = (lclContext *) AH->formatData;
743  pgoff_t tpos;
744 
745  if (AH->mode == archModeWrite)
746  {
747  WriteHead(AH);
748  /* Remember TOC's seek position for use below */
749  tpos = ftello(AH->FH);
750  if (tpos < 0 && ctx->hasSeek)
751  pg_fatal("could not determine seek position in archive file: %m");
752  WriteToc(AH);
753  WriteDataChunks(AH, NULL);
754 
755  /*
756  * If possible, re-write the TOC in order to update the data offset
757  * information. This is not essential, as pg_restore can cope in most
758  * cases without it; but it can make pg_restore significantly faster
759  * in some situations (especially parallel restore).
760  */
761  if (ctx->hasSeek &&
762  fseeko(AH->FH, tpos, SEEK_SET) == 0)
763  WriteToc(AH);
764  }
765 
766  if (fclose(AH->FH) != 0)
767  pg_fatal("could not close archive file: %m");
768 
769  /* Sync the output file if one is defined */
770  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
771  (void) fsync_fname(AH->fSpec, false);
772 
773  AH->FH = NULL;
774 }
775 
776 /*
777  * Reopen the archive's file handle.
778  *
779  * We close the original file handle, except on Windows. (The difference
780  * is because on Windows, this is used within a multithreading context,
781  * and we don't want a thread closing the parent file handle.)
782  */
783 static void
785 {
786  lclContext *ctx = (lclContext *) AH->formatData;
787  pgoff_t tpos;
788 
789  if (AH->mode == archModeWrite)
790  pg_fatal("can only reopen input archives");
791 
792  /*
793  * These two cases are user-facing errors since they represent unsupported
794  * (but not invalid) use-cases. Word the error messages appropriately.
795  */
796  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
797  pg_fatal("parallel restore from standard input is not supported");
798  if (!ctx->hasSeek)
799  pg_fatal("parallel restore from non-seekable file is not supported");
800 
801  tpos = ftello(AH->FH);
802  if (tpos < 0)
803  pg_fatal("could not determine seek position in archive file: %m");
804 
805 #ifndef WIN32
806  if (fclose(AH->FH) != 0)
807  pg_fatal("could not close archive file: %m");
808 #endif
809 
810  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
811  if (!AH->FH)
812  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
813 
814  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
815  pg_fatal("could not set seek position in archive file: %m");
816 }
817 
818 /*
819  * Prepare for parallel restore.
820  *
821  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
822  * TOC entries' dataLength fields with appropriate values to guide the
823  * ordering of restore jobs. The source of said data is format-dependent,
824  * as is the exact meaning of the values.
825  *
826  * A format module might also choose to do other setup here.
827  */
828 static void
830 {
831  lclContext *ctx = (lclContext *) AH->formatData;
832  TocEntry *prev_te = NULL;
833  lclTocEntry *prev_tctx = NULL;
834  TocEntry *te;
835 
836  /*
837  * Knowing that the data items were dumped out in TOC order, we can
838  * reconstruct the length of each item as the delta to the start offset of
839  * the next data item.
840  */
841  for (te = AH->toc->next; te != AH->toc; te = te->next)
842  {
843  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
844 
845  /*
846  * Ignore entries without a known data offset; if we were unable to
847  * seek to rewrite the TOC when creating the archive, this'll be all
848  * of them, and we'll end up with no size estimates.
849  */
850  if (tctx->dataState != K_OFFSET_POS_SET)
851  continue;
852 
853  /* Compute previous data item's length */
854  if (prev_te)
855  {
856  if (tctx->dataPos > prev_tctx->dataPos)
857  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
858  }
859 
860  prev_te = te;
861  prev_tctx = tctx;
862  }
863 
864  /* If OK to seek, we can determine the length of the last item */
865  if (prev_te && ctx->hasSeek)
866  {
867  pgoff_t endpos;
868 
869  if (fseeko(AH->FH, 0, SEEK_END) != 0)
870  pg_fatal("error during file seek: %m");
871  endpos = ftello(AH->FH);
872  if (endpos > prev_tctx->dataPos)
873  prev_te->dataLength = endpos - prev_tctx->dataPos;
874  }
875 }
876 
877 /*
878  * Clone format-specific fields during parallel restoration.
879  */
880 static void
882 {
883  lclContext *ctx = (lclContext *) AH->formatData;
884 
885  /*
886  * Each thread must have private lclContext working state.
887  */
888  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
889  memcpy(AH->formatData, ctx, sizeof(lclContext));
890  ctx = (lclContext *) AH->formatData;
891 
892  /* sanity check, shouldn't happen */
893  if (ctx->cs != NULL)
894  pg_fatal("compressor active");
895 
896  /*
897  * We intentionally do not clone TOC-entry-local state: it's useful to
898  * share knowledge about where the data blocks are across threads.
899  * _PrintTocData has to be careful about the order of operations on that
900  * state, though.
901  */
902 }
903 
904 static void
906 {
907  lclContext *ctx = (lclContext *) AH->formatData;
908 
909  free(ctx);
910 }
911 
912 /*
913  * This function is executed in the child of a parallel restore from a
914  * custom-format archive and restores the actual data for one TOC entry.
915  */
916 static int
918 {
919  return parallel_restore(AH, te);
920 }
921 
922 /*--------------------------------------------------
923  * END OF FORMAT CALLBACKS
924  *--------------------------------------------------
925  */
926 
927 /*
928  * Get the current position in the archive file.
929  *
930  * With a non-seekable archive file, we may not be able to obtain the
931  * file position. If so, just return -1. It's not too important in
932  * that case because we won't be able to rewrite the TOC to fill in
933  * data block offsets anyway.
934  */
935 static pgoff_t
937 {
938  pgoff_t pos;
939 
940  pos = ftello(AH->FH);
941  if (pos < 0)
942  {
943  /* Not expected if we found we can seek. */
944  if (ctx->hasSeek)
945  pg_fatal("could not determine seek position in archive file: %m");
946  }
947  return pos;
948 }
949 
950 /*
951  * Read a data block header. The format changed in V1.3, so we
952  * centralize the code here for simplicity. Returns *type = EOF
953  * if at EOF.
954  */
955 static void
957 {
958  int byt;
959 
960  /*
961  * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
962  * inside ReadInt rather than returning EOF. It doesn't seem worth
963  * jumping through hoops to deal with that case better, because no such
964  * files are likely to exist in the wild: only some 7.1 development
965  * versions of pg_dump ever generated such files.
966  */
967  if (AH->version < K_VERS_1_3)
968  *type = BLK_DATA;
969  else
970  {
971  byt = getc(AH->FH);
972  *type = byt;
973  if (byt == EOF)
974  {
975  *id = 0; /* don't return an uninitialized value */
976  return;
977  }
978  }
979 
980  *id = ReadInt(AH);
981 }
982 
983 /*
984  * Callback function for writeData. Writes one block of (compressed)
985  * data to the archive.
986  */
987 static void
988 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
989 {
990  /* never write 0-byte blocks (this should not happen) */
991  if (len > 0)
992  {
993  WriteInt(AH, len);
994  _WriteBuf(AH, buf, len);
995  }
996 }
997 
998 /*
999  * Callback function for readData. To keep things simple, we
1000  * always read one compressed block at a time.
1001  */
1002 static size_t
1003 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1004 {
1005  size_t blkLen;
1006 
1007  /* Read length */
1008  blkLen = ReadInt(AH);
1009  if (blkLen == 0)
1010  return 0;
1011 
1012  /* If the caller's buffer is not large enough, allocate a bigger one */
1013  if (blkLen > *buflen)
1014  {
1015  free(*buf);
1016  *buf = (char *) pg_malloc(blkLen);
1017  *buflen = blkLen;
1018  }
1019 
1020  /* exits app on read errors */
1021  _ReadBuf(AH, *buf, blkLen);
1022 
1023  return blkLen;
1024 }
#define PG_BINARY_R
Definition: c.h:1275
#define INT64_FORMAT
Definition: c.h:548
#define PG_BINARY_W
Definition: c.h:1276
CompressorState * AllocateCompressor(const pg_compress_specification compression_spec, ReadFunc readF, WriteFunc writeF)
Definition: compress_io.c:124
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:149
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
for(;;)
#define free(a)
Definition: header.h:65
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ archModeWrite
Definition: pg_backup.h:51
bool checkSeek(FILE *fp)
void WriteHead(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
size_t WriteInt(ArchiveHandle *AH, int i)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
void StartRestoreLOs(ArchiveHandle *AH)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
int ReadInt(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void ReadToc(ArchiveHandle *AH)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
void WriteToc(ArchiveHandle *AH)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
#define K_OFFSET_NO_DATA
#define WRITE_ERROR_EXIT
#define BLK_DATA
#define BLK_BLOBS
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define READ_ERROR_EXIT(fd)
#define K_VERS_1_3
#define K_VERS_1_7
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _skipData(ArchiveHandle *AH)
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
static void _skipLOs(ArchiveHandle *AH)
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _DeClone(ArchiveHandle *AH)
static void _ReopenArchive(ArchiveHandle *AH)
static void _StartLOs(ArchiveHandle *AH, TocEntry *te)
void InitArchiveFmt_Custom(ArchiveHandle *AH)
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
static void _EndLOs(ArchiveHandle *AH, TocEntry *te)
static int _WriteByte(ArchiveHandle *AH, const int i)
static void _PrepParallelRestore(ArchiveHandle *AH)
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static int _ReadByte(ArchiveHandle *AH)
static void _PrintData(ArchiveHandle *AH)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
static void _Clone(ArchiveHandle *AH)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
static void _LoadLOs(ArchiveHandle *AH, bool drop)
#define pg_fatal(...)
const void size_t len
const void * data
static XLogRecPtr endpos
Definition: pg_receivewal.c:56
static char * buf
Definition: pg_test_fsync.c:73
#define pg_log_warning(...)
Definition: pgfnames.c:24
unsigned int Oid
Definition: postgres_ext.h:31
int verbose
Definition: pg_backup.h:217
RestoreOptions * ropt
Definition: pg_backup.h:215
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
ReadExtraTocPtrType ReadExtraTocPtr
WorkerJobDumpPtrType WorkerJobDumpPtr
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
WorkerJobRestorePtrType WorkerJobRestorePtr
PrintTocDataPtrType PrintTocDataPtr
WriteBytePtrType WriteBytePtr
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
EndDataPtrType EndDataPtr
ClosePtrType ClosePtr
DataDumperPtr dataDumper
struct _tocEntry * next
pgoff_t lastFilePos
CompressorState * cs
const char * type
#define ftello(stream)
Definition: win32_port.h:219
#define fseeko(stream, offset, origin)
Definition: win32_port.h:216
#define pgoff_t
Definition: win32_port.h:207