PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "compress_io.h"
29 #include "parallel.h"
30 #include "pg_backup_utils.h"
31 #include "common/file_utils.h"
32 
33 
34 /*--------
35  * Routines in the format interface
36  *--------
37  */
38 
39 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
40 static void _StartData(ArchiveHandle *AH, TocEntry *te);
41 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
42 static void _EndData(ArchiveHandle *AH, TocEntry *te);
43 static int _WriteByte(ArchiveHandle *AH, const int i);
44 static int _ReadByte(ArchiveHandle *);
45 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
46 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
47 static void _CloseArchive(ArchiveHandle *AH);
48 static void _ReopenArchive(ArchiveHandle *AH);
49 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
50 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
52 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
53 
54 static void _PrintData(ArchiveHandle *AH);
55 static void _skipData(ArchiveHandle *AH);
56 static void _skipBlobs(ArchiveHandle *AH);
57 
58 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
59 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
61 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
62 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
63 
64 static void _PrepParallelRestore(ArchiveHandle *AH);
65 static void _Clone(ArchiveHandle *AH);
66 static void _DeClone(ArchiveHandle *AH);
67 
69 
/*
 * lclContext: per-archive private state of the custom format, stored in
 * AH->formatData (allocated in InitArchiveFmt_Custom, copied by _Clone,
 * freed by _DeClone).
 *
 * NOTE(review): listing lines 72, 74 and 75 are missing from this copy.
 * The code in this file also reads ctx->cs, ctx->filePos and ctx->dataStart,
 * which the symbol index lists as CompressorState *cs, pgoff_t filePos and
 * pgoff_t dataStart.
 */
70 typedef struct
71 {
73  int hasSeek; /* result of checkSeek() on AH->FH */
76 } lclContext;
77 
/*
 * lclTocEntry: per-TOC-entry private state, stored in te->formatData.
 * dataState holds a K_OFFSET_* code; this file assigns or tests
 * K_OFFSET_POS_SET, K_OFFSET_POS_NOT_SET and K_OFFSET_NO_DATA.
 *
 * NOTE(review): listing line 81 is missing. The code also uses ctx->dataPos,
 * passed to ReadOffset(AH, pgoff_t *) and WriteOffset(AH, pgoff_t, int),
 * so it is a pgoff_t data offset.
 */
78 typedef struct
79 {
80  int dataState;
82 } lclTocEntry;
83 
84 
85 /*------
86  * Static declarations
87  *------
88  */
89 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
91 
92 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
93 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
94 
95 
96 /*
97  * Init routine required by ALL formats. This is a global routine
98  * and should be declared in pg_backup_archiver.h
99  *
100  * Its task is to create any extra archive context (using AH->formatData),
101  * and to initialize the supported function pointers.
102  *
103  * It should also prepare whatever its input source is for reading/writing,
104  * and in the case of a read mode connection, it should load the Header & TOC.
105  */
/*
 * NOTE(review): this listing is missing lines 107, 112, 121-125, 127, 132
 * and 138. The symbol index gives the signature as
 *     void InitArchiveFmt_Custom(ArchiveHandle *AH)
 * The other gaps presumably hold the remaining callback assignments. The
 * index also lists ArchiveEntryPtr, ReopenPtr, PrintTocDataPtr,
 * Read/Write/PrintExtraTocPtr, StartBlobsPtr, PrepParallelRestorePtr and
 * WorkerJobRestorePtr, none of which are assigned below. Confirm against
 * the real file.
 */
106 void
108 {
109  lclContext *ctx;
110 
111  /* Assuming static functions, this can be copied for each format. */
113  AH->StartDataPtr = _StartData;
114  AH->WriteDataPtr = _WriteData;
115  AH->EndDataPtr = _EndData;
116  AH->WriteBytePtr = _WriteByte;
117  AH->ReadBytePtr = _ReadByte;
118  AH->WriteBufPtr = _WriteBuf;
119  AH->ReadBufPtr = _ReadBuf;
120  AH->ClosePtr = _CloseArchive;
126 
128  AH->StartBlobPtr = _StartBlob;
129  AH->EndBlobPtr = _EndBlob;
130  AH->EndBlobsPtr = _EndBlobs;
131 
133  AH->ClonePtr = _Clone;
134  AH->DeClonePtr = _DeClone;
135 
136  /* no parallel dump in the custom archive, only parallel restore */
137  AH->WorkerJobDumpPtr = NULL;
139 
140  /* Set up a private area. */
141  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
142  AH->formatData = (void *) ctx;
143 
144  /* Initialize LO buffering */
145  AH->lo_buf_size = LOBBUFSIZE;
146  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
147 
148  ctx->filePos = 0;
149 
150  /*
151  * Now open the file
152  */
 /* Write mode: a NULL or empty fSpec means "write to stdout". */
153  if (AH->mode == archModeWrite)
154  {
155  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
156  {
157  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
158  if (!AH->FH)
159  fatal("could not open output file \"%s\": %m", AH->fSpec);
160  }
161  else
162  {
163  AH->FH = stdout;
164  if (!AH->FH)
165  fatal("could not open output file: %m");
166  }
167 
168  ctx->hasSeek = checkSeek(AH->FH);
169  }
 /*
  * Read mode: a NULL or empty fSpec means "read from stdin". After opening,
  * load the header and TOC and remember where the data area begins.
  */
170  else
171  {
172  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
173  {
174  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
175  if (!AH->FH)
176  fatal("could not open input file \"%s\": %m", AH->fSpec);
177  }
178  else
179  {
180  AH->FH = stdin;
181  if (!AH->FH)
182  fatal("could not open input file: %m");
183  }
184 
185  ctx->hasSeek = checkSeek(AH->FH);
186 
187  ReadHead(AH);
188  ReadToc(AH);
189  ctx->dataStart = _getFilePos(AH, ctx);
190  }
191 
192 }
193 
194 /*
195  * Called by the Archiver when the dumper creates a new TOC entry.
196  *
197  * Optional.
198  *
199  * Set up extra format-related TOC data.
200 */
/*
 * NOTE(review): this listing is missing lines 202, 208 and 210. The symbol
 * index gives the signature as
 *     static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
 * Lines 208/210 are the bodies of the if/else below. They presumably set
 * ctx->dataState depending on whether the entry has a data dumper; confirm
 * against the real file.
 */
201 static void
203 {
204  lclTocEntry *ctx;
205 
 /* Allocate the per-entry state that is attached to te->formatData below. */
206  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
207  if (te->dataDumper)
209  else
211 
212  te->formatData = (void *) ctx;
213 }
214 
215 /*
216  * Called by the Archiver to save any extra format-related TOC entry
217  * data.
218  *
219  * Optional.
220  *
221  * Use the Archiver routines to write data - they are non-endian, and
222  * maintain other important file information.
223  */
/*
 * Persist this entry's data offset and its K_OFFSET_* state. _ReadExtraToc
 * reads them back in the same order with ReadOffset().
 *
 * NOTE(review): listing line 225 (the declarator) is missing; the index gives
 *     static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
 */
224 static void
226 {
227  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
228 
229  WriteOffset(AH, ctx->dataPos, ctx->dataState);
230 }
231 
232 /*
233  * Called by the Archiver to read any extra format-related TOC data.
234  *
235  * Optional.
236  *
237  * Needs to match the order defined in _WriteExtraToc, and should also
238  * use the Archiver input routines.
239  */
/*
 * Read back what _WriteExtraToc wrote, allocating te->formatData on first
 * use.
 *
 * NOTE(review): listing line 241 (the declarator) is missing; the index gives
 *     static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
 */
240 static void
242 {
243  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
244 
245  if (ctx == NULL)
246  {
247  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
248  te->formatData = (void *) ctx;
249  }
250 
251  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
252 
253  /*
254  * Prior to V1.7 (pg7.3), we dumped the data size as an int; now we don't
255  * dump it at all, so older archives just have it read and discarded.
256  */
257  if (AH->version < K_VERS_1_7)
258  ReadInt(AH);
259 }
260 
261 /*
262  * Called by the Archiver when restoring an archive to output a comment
263  * that includes useful information about the TOC entry.
264  *
265  * Optional.
266  *
267  */
/*
 * In verbose mode, emit the entry's recorded data offset as an SQL comment.
 *
 * NOTE(review): listing line 269 (the declarator) is missing; the index gives
 *     static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
 */
268 static void
270 {
271  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272 
273  if (AH->public.verbose)
274  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275  (int64) ctx->dataPos);
276 }
277 
278 /*
279  * Called by the archiver when saving TABLE DATA (not schema). This routine
280  * should save whatever format-specific information is needed to read
281  * the archive back.
282  *
283  * It is called just prior to the dumper's 'DataDumper' routine being called.
284  *
285  * Optional, but strongly recommended.
286  *
287  */
/*
 * Record where this entry's data block starts, then write the block header
 * (BLK_DATA type byte followed by the dump ID).
 *
 * NOTE(review): this listing is missing lines 289 and 300. The index gives
 * the signature as
 *     static void _StartData(ArchiveHandle *AH, TocEntry *te)
 * _WriteData and _EndData both use ctx->cs, and the index lists
 * AllocateCompressor(int compression, WriteFunc writeF) together with this
 * file's _CustomWriteFunc. Line 300 therefore presumably allocates ctx->cs;
 * confirm against the real file.
 */
288 static void
290 {
291  lclContext *ctx = (lclContext *) AH->formatData;
292  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 
294  tctx->dataPos = _getFilePos(AH, ctx);
295  tctx->dataState = K_OFFSET_POS_SET;
296 
297  _WriteByte(AH, BLK_DATA); /* Block type */
298  WriteInt(AH, te->dumpId); /* For sanity check */
299 
301 }
302 
303 /*
304  * Called by archiver when dumper calls WriteData. This routine is
305  * called for both BLOB and TABLE data; it is the responsibility of
306  * the format to manage each kind of data using StartBlob/StartData.
307  *
308  * It should only be called from within a DataDumper routine.
309  *
310  * Mandatory.
311  */
312 static void
313 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
314 {
315  lclContext *ctx = (lclContext *) AH->formatData;
316  CompressorState *cs = ctx->cs;
317 
318  if (dLen > 0)
319  /* WriteDataToArchive() internally throws write errors */
320  WriteDataToArchive(AH, cs, data, dLen);
321 
322  return;
323 }
324 
325 /*
326  * Called by the archiver when a dumper's 'DataDumper' routine has
327  * finished.
328  *
329  * Optional.
330  *
331  */
/*
 * Flush and close the entry's compressor, then write a zero length. Readers
 * (_skipData, _CustomReadFunc) stop at the first zero length.
 *
 * NOTE(review): listing line 333 (the declarator) is missing; the index gives
 *     static void _EndData(ArchiveHandle *AH, TocEntry *te)
 */
332 static void
334 {
335  lclContext *ctx = (lclContext *) AH->formatData;
336 
337  EndCompressor(AH, ctx->cs);
338  /* Send the end marker */
339  WriteInt(AH, 0);
340 }
341 
342 /*
343  * Called by the archiver when starting to save all BLOB DATA (not schema).
344  * This routine should save whatever format-specific information is needed
345  * to read the BLOBs back into memory.
346  *
347  * It is called just prior to the dumper's DataDumper routine.
348  *
349  * Optional, but strongly recommended.
350  */
/*
 * Record where the BLOBS block starts, then write its header (BLK_BLOBS type
 * byte followed by the dump ID).
 *
 * NOTE(review): listing line 352 (the declarator) is missing; the index gives
 *     static void _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 */
351 static void
353 {
354  lclContext *ctx = (lclContext *) AH->formatData;
355  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
356 
357  tctx->dataPos = _getFilePos(AH, ctx);
358  tctx->dataState = K_OFFSET_POS_SET;
359 
360  _WriteByte(AH, BLK_BLOBS); /* Block type */
361  WriteInt(AH, te->dumpId); /* For sanity check */
362 }
363 
364 /*
365  * Called by the archiver when the dumper calls StartBlob.
366  *
367  * Mandatory.
368  *
369  * Must save the passed OID for retrieval at restore-time.
370  */
/*
 * Write the large object's OID ahead of its data.
 *
 * OID 0 is rejected because _EndBlobs writes a zero OID as the end-of-blobs
 * marker, and _LoadBlobs/_skipBlobs stop reading at it.
 *
 * NOTE(review): this listing is missing lines 372 and 381. The index gives
 * the signature as
 *     static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 * ctx is otherwise unused here and _EndBlob closes ctx->cs, so line 381
 * presumably allocates the compressor for this object's data. Confirm
 * against the real file.
 */
371 static void
373 {
374  lclContext *ctx = (lclContext *) AH->formatData;
375 
376  if (oid == 0)
377  fatal("invalid OID for large object");
378 
379  WriteInt(AH, oid);
380 
382 }
383 
384 /*
385  * Called by the archiver when the dumper calls EndBlob.
386  *
387  * Optional.
388  */
/*
 * Close the current large object's compressor and terminate its data with a
 * zero length.
 *
 * NOTE(review): listing line 390 (the declarator) is missing; the index gives
 *     static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 */
389 static void
391 {
392  lclContext *ctx = (lclContext *) AH->formatData;
393 
394  EndCompressor(AH, ctx->cs);
395  /* Send the end marker */
396  WriteInt(AH, 0);
397 }
398 
399 /*
400  * Called by the archiver when finishing saving all BLOB DATA.
401  *
402  * Optional.
403  */
/*
 * NOTE(review): listing line 405 (the declarator) is missing; the index gives
 *     static void _EndBlobs(ArchiveHandle *AH, TocEntry *te)
 */
404 static void
406 {
407  /* Write out a fake zero OID to mark end-of-blobs. */
408  WriteInt(AH, 0);
409 }
410 
411 /*
412  * Print data for a given TOC entry
413  */
/*
 * Locate this TOC entry's data block, either by seeking to the recorded
 * offset or by scanning forward over other blocks, and restore it.
 * BLK_DATA blocks go to _PrintData; BLK_BLOBS blocks go to _LoadBlobs.
 *
 * NOTE(review): listing line 415 (the declarator) is missing; the index gives
 *     static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
 */
414 static void
416 {
417  lclContext *ctx = (lclContext *) AH->formatData;
418  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
419  int blkType;
420  int id;
421 
 /* Entries recorded as having no data have nothing to restore. */
422  if (tctx->dataState == K_OFFSET_NO_DATA)
423  return;
424 
425  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
426  {
427  /*
428  * We cannot seek directly to the desired block. Instead, skip over
429  * block headers until we find the one we want. This could fail if we
430  * are asked to restore items out-of-order.
431  */
432  _readBlockHeader(AH, &blkType, &id);
433 
434  while (blkType != EOF && id != te->dumpId)
435  {
436  switch (blkType)
437  {
438  case BLK_DATA:
439  _skipData(AH);
440  break;
441 
442  case BLK_BLOBS:
443  _skipBlobs(AH);
444  break;
445 
446  default: /* Always have a default */
447  fatal("unrecognized data block type (%d) while searching archive",
448  blkType);
449  break;
450  }
451  _readBlockHeader(AH, &blkType, &id);
452  }
453  }
454  else
455  {
456  /* We can just seek to the place we need to be. */
457  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
458  fatal("error during file seek: %m");
459 
460  _readBlockHeader(AH, &blkType, &id);
461  }
462 
463  /* Produce suitable failure message if we fell off end of file */
464  if (blkType == EOF)
465  {
466  if (tctx->dataState == K_OFFSET_POS_NOT_SET)
467  fatal("could not find block ID %d in archive -- "
468  "possibly due to out-of-order restore request, "
469  "which cannot be handled due to lack of data offsets in archive",
470  te->dumpId);
471  else if (!ctx->hasSeek)
472  fatal("could not find block ID %d in archive -- "
473  "possibly due to out-of-order restore request, "
474  "which cannot be handled due to non-seekable input file",
475  te->dumpId);
476  else /* huh, the dataPos led us to EOF? */
477  fatal("could not find block ID %d in archive -- "
478  "possibly corrupt archive",
479  te->dumpId);
480  }
481 
482  /* Are we sane? */
483  if (id != te->dumpId)
484  fatal("found unexpected block ID (%d) when reading data -- expected %d",
485  id, te->dumpId);
486 
487  switch (blkType)
488  {
489  case BLK_DATA:
490  _PrintData(AH);
491  break;
492 
493  case BLK_BLOBS:
494  _LoadBlobs(AH, AH->public.ropt->dropSchema);
495  break;
496 
497  default: /* Always have a default */
498  fatal("unrecognized data block type %d while restoring archive",
499  blkType);
500  break;
501  }
502 }
503 
504 /*
505  * Print data from current file position.
506 */
/*
 * NOTE(review): this listing is missing lines 508 and 510, so the body
 * shown here is empty. The index gives the signature as
 *     static void _PrintData(ArchiveHandle *AH)
 * It also lists ReadDataFromArchive(ArchiveHandle *AH, int compression,
 * ReadFunc readF), and this file defines _CustomReadFunc as the matching
 * block reader. Line 510 presumably calls ReadDataFromArchive with it;
 * confirm against the real file.
 */
507 static void
509 {
511 }
512 
513 static void
514 _LoadBlobs(ArchiveHandle *AH, bool drop)
515 {
516  Oid oid;
517 
518  StartRestoreBlobs(AH);
519 
520  oid = ReadInt(AH);
521  while (oid != 0)
522  {
523  StartRestoreBlob(AH, oid, drop);
524  _PrintData(AH);
525  EndRestoreBlob(AH, oid);
526  oid = ReadInt(AH);
527  }
528 
529  EndRestoreBlobs(AH);
530 }
531 
532 /*
533  * Skip the BLOBs from the current file position.
534  * BLOBS are written sequentially as data blocks (see below).
535  * Each BLOB is preceded by its original OID.
536  * A zero OID indicates the end of the BLOBS.
537  */
/*
 * NOTE(review): listing line 539 (the declarator) is missing; the index gives
 *     static void _skipBlobs(ArchiveHandle *AH)
 */
538 static void
540 {
541  Oid oid;
542 
543  oid = ReadInt(AH);
 /* Skip each object's data until the zero OID terminator. */
544  while (oid != 0)
545  {
546  _skipData(AH);
547  oid = ReadInt(AH);
548  }
549 }
550 
551 /*
552  * Skip data from current file position.
553  * Data blocks are formatted as an integer length, followed by data.
554  * A zero length denotes the end of the block.
555 */
/*
 * NOTE(review): listing line 557 (the declarator) is missing; the index gives
 *     static void _skipData(ArchiveHandle *AH)
 *
 * NOTE(review): blkLen is size_t while ReadInt() returns int and buflen is
 * int. A negative length from a damaged archive therefore becomes a huge
 * size_t. The "blkLen > buflen" comparison is done in size_t, and
 * "buflen = blkLen" narrows back to int. Consider validating the length.
 */
556 static void
558 {
559  lclContext *ctx = (lclContext *) AH->formatData;
560  size_t blkLen;
561  char *buf = NULL;
562  int buflen = 0;
563  size_t cnt;
564 
565  blkLen = ReadInt(AH);
566  while (blkLen != 0)
567  {
 /* Grow the scratch buffer only when a larger block shows up. */
568  if (blkLen > buflen)
569  {
570  if (buf)
571  free(buf);
572  buf = (char *) pg_malloc(blkLen);
573  buflen = blkLen;
574  }
 /* Data is read and discarded, so this also works on non-seekable input. */
575  if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
576  {
577  if (feof(AH->FH))
578  fatal("could not read from input file: end of file");
579  else
580  fatal("could not read from input file: %m");
581  }
582 
583  ctx->filePos += blkLen;
584 
585  blkLen = ReadInt(AH);
586  }
587 
588  if (buf)
589  free(buf);
590 }
591 
592 /*
593  * Write a byte of data to the archive.
594  *
595  * Mandatory.
596  *
597  * Called by the archiver to do integer & byte output to the archive.
598  */
599 static int
600 _WriteByte(ArchiveHandle *AH, const int i)
601 {
602  lclContext *ctx = (lclContext *) AH->formatData;
603  int res;
604 
605  if ((res = fputc(i, AH->FH)) == EOF)
607  ctx->filePos += 1;
608 
609  return 1;
610 }
611 
612 /*
613  * Read a byte of data from the archive.
614  *
615  * Mandatory
616  *
617  * Called by the archiver to read bytes & integers from the archive.
618  * EOF should be treated as a fatal error.
619  */
/*
 * Read one byte and advance the tracked position. Hitting EOF or a read
 * error is fatal via READ_ERROR_EXIT. Because of that, _readBlockHeader
 * calls getc() directly when EOF is an expected outcome.
 *
 * NOTE(review): listing line 621 (the declarator) is missing; the index gives
 *     static int _ReadByte(ArchiveHandle *)
 */
620 static int
622 {
623  lclContext *ctx = (lclContext *) AH->formatData;
624  int res;
625 
626  res = getc(AH->FH);
627  if (res == EOF)
628  READ_ERROR_EXIT(AH->FH);
629  ctx->filePos += 1;
630  return res;
631 }
632 
633 /*
634  * Write a buffer of data to the archive.
635  *
636  * Mandatory.
637  *
638  * Called by the archiver to write a block of bytes to the archive.
639  */
640 static void
641 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
642 {
643  lclContext *ctx = (lclContext *) AH->formatData;
644 
645  if (fwrite(buf, 1, len, AH->FH) != len)
647  ctx->filePos += len;
648 
649  return;
650 }
651 
652 /*
653  * Read a block of bytes from the archive.
654  *
655  * Mandatory.
656  *
657  * Called by the archiver to read a block of bytes from the archive
658  */
659 static void
660 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
661 {
662  lclContext *ctx = (lclContext *) AH->formatData;
663 
664  if (fread(buf, 1, len, AH->FH) != len)
665  READ_ERROR_EXIT(AH->FH);
666  ctx->filePos += len;
667 
668  return;
669 }
670 
671 /*
672  * Close the archive.
673  *
674  * Mandatory.
675  *
676  * When writing the archive, this is the routine that actually starts
677  * the process of saving it to files. No data should be written prior
678  * to this point, since the user could sort the TOC after creating it.
679  *
680  * If an archive is to be written, this routine must call:
681  * WriteHead to save the archive header
682  * WriteToc to save the TOC entries
683  * WriteDataChunks to save all DATA & BLOBs.
684  *
685  */
/*
 * NOTE(review): listing line 687 (the declarator) is missing; the index gives
 *     static void _CloseArchive(ArchiveHandle *AH)
 *
 * NOTE(review): InitArchiveFmt_Custom treats an empty fSpec ("") as stdout,
 * but the fsync condition below only tests AH->fSpec for NULL. A "" spec
 * would therefore reach fsync_fname(""). Verify whether that case can occur.
 */
686 static void
688 {
689  lclContext *ctx = (lclContext *) AH->formatData;
690  pgoff_t tpos;
691 
692  if (AH->mode == archModeWrite)
693  {
694  WriteHead(AH);
695  /* Remember TOC's seek position for use below */
696  tpos = ftello(AH->FH);
 /* On non-seekable output a failed ftello is tolerated: the TOC rewrite is skipped anyway. */
697  if (tpos < 0 && ctx->hasSeek)
698  fatal("could not determine seek position in archive file: %m");
699  WriteToc(AH);
700  ctx->dataStart = _getFilePos(AH, ctx);
701  WriteDataChunks(AH, NULL);
702 
703  /*
704  * If possible, re-write the TOC in order to update the data offset
705  * information. This is not essential, as pg_restore can cope in most
706  * cases without it; but it can make pg_restore significantly faster
707  * in some situations (especially parallel restore).
708  */
709  if (ctx->hasSeek &&
710  fseeko(AH->FH, tpos, SEEK_SET) == 0)
711  WriteToc(AH);
712  }
713 
714  if (fclose(AH->FH) != 0)
715  fatal("could not close archive file: %m");
716 
717  /* Sync the output file if one is defined */
718  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
719  (void) fsync_fname(AH->fSpec, false);
720 
721  AH->FH = NULL;
722 }
723 
724 /*
725  * Reopen the archive's file handle.
726  *
727  * We close the original file handle, except on Windows. (The difference
728  * is because on Windows, this is used within a multithreading context,
729  * and we don't want a thread closing the parent file handle.)
730  */
/*
 * Reopen the input archive by name and restore the current file position.
 * Only named, seekable input archives are supported.
 *
 * NOTE(review): listing line 732 (the declarator) is missing; the index gives
 *     static void _ReopenArchive(ArchiveHandle *AH)
 */
731 static void
733 {
734  lclContext *ctx = (lclContext *) AH->formatData;
735  pgoff_t tpos;
736 
737  if (AH->mode == archModeWrite)
738  fatal("can only reopen input archives");
739 
740  /*
741  * These two cases are user-facing errors since they represent unsupported
742  * (but not invalid) use-cases. Word the error messages appropriately.
743  */
744  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
745  fatal("parallel restore from standard input is not supported");
746  if (!ctx->hasSeek)
747  fatal("parallel restore from non-seekable file is not supported");
748 
 /* Remember where we are so the new handle can be positioned identically. */
749  tpos = ftello(AH->FH);
750  if (tpos < 0)
751  fatal("could not determine seek position in archive file: %m");
752 
753 #ifndef WIN32
754  if (fclose(AH->FH) != 0)
755  fatal("could not close archive file: %m");
756 #endif
757 
758  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
759  if (!AH->FH)
760  fatal("could not open input file \"%s\": %m", AH->fSpec);
761 
762  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
763  fatal("could not set seek position in archive file: %m");
764 }
765 
766 /*
767  * Prepare for parallel restore.
768  *
769  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
770  * TOC entries' dataLength fields with appropriate values to guide the
771  * ordering of restore jobs. The source of said data is format-dependent,
772  * as is the exact meaning of the values.
773  *
774  * A format module might also choose to do other setup here.
775  */
/*
 * NOTE(review): listing line 777 (the declarator) is missing; the index gives
 *     static void _PrepParallelRestore(ArchiveHandle *AH)
 *
 * AH->toc is walked as a circular list whose head node is AH->toc itself
 * (the loop stops when te returns to it).
 */
776 static void
778 {
779  lclContext *ctx = (lclContext *) AH->formatData;
780  TocEntry *prev_te = NULL;
781  lclTocEntry *prev_tctx = NULL;
782  TocEntry *te;
783 
784  /*
785  * Knowing that the data items were dumped out in TOC order, we can
786  * reconstruct the length of each item as the delta to the start offset of
787  * the next data item.
788  */
789  for (te = AH->toc->next; te != AH->toc; te = te->next)
790  {
791  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
792 
793  /*
794  * Ignore entries without a known data offset; if we were unable to
795  * seek to rewrite the TOC when creating the archive, this'll be all
796  * of them, and we'll end up with no size estimates.
797  */
798  if (tctx->dataState != K_OFFSET_POS_SET)
799  continue;
800 
801  /* Compute previous data item's length */
802  if (prev_te)
803  {
804  if (tctx->dataPos > prev_tctx->dataPos)
805  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
806  }
807 
808  prev_te = te;
809  prev_tctx = tctx;
810  }
811 
812  /* If OK to seek, we can determine the length of the last item */
 /* NOTE(review): this leaves AH->FH positioned at end of file. */
813  if (prev_te && ctx->hasSeek)
814  {
815  pgoff_t endpos;
816 
817  if (fseeko(AH->FH, 0, SEEK_END) != 0)
818  fatal("error during file seek: %m");
819  endpos = ftello(AH->FH);
820  if (endpos > prev_tctx->dataPos)
821  prev_te->dataLength = endpos - prev_tctx->dataPos;
822  }
823 }
824 
825 /*
826  * Clone format-specific fields during parallel restoration.
827  */
/*
 * Give this handle its own copy of the format-private state, so that a
 * parallel-restore worker does not share lclContext with its parent.
 *
 * NOTE(review): listing line 829 (the declarator) is missing; the index gives
 *     static void _Clone(ArchiveHandle *AH)
 */
828 static void
830 {
831  lclContext *ctx = (lclContext *) AH->formatData;
832 
833  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
834  memcpy(AH->formatData, ctx, sizeof(lclContext));
835  ctx = (lclContext *) AH->formatData;
836 
837  /* sanity check, shouldn't happen */
838  if (ctx->cs != NULL)
839  fatal("compressor active");
840 
841  /*
842  * Note: we do not make a local lo_buf because we expect at most one BLOBS
843  * entry per archive, so no parallelism is possible. Likewise,
844  * TOC-entry-local state isn't an issue because any one TOC entry is
845  * touched by just one worker child.
846  */
847 }
848 
/*
 * Release the lclContext copy made by _Clone.
 *
 * NOTE(review): AH->formatData is left pointing at freed memory; this
 * assumes the handle is not used for format calls afterwards.
 * Listing line 850 (the declarator) is missing; the index gives
 *     static void _DeClone(ArchiveHandle *AH)
 */
849 static void
851 {
852  lclContext *ctx = (lclContext *) AH->formatData;
853 
854  free(ctx);
855 }
856 
857 /*
858  * This function is executed in the child of a parallel restore from a
859  * custom-format archive and restores the actual data for one TOC entry.
860  */
/*
 * NOTE(review): listing line 862 (the declarator) is missing; the index gives
 *     static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
 * The function simply delegates to parallel_restore() and returns its result.
 */
861 static int
863 {
864  return parallel_restore(AH, te);
865 }
866 
867 /*--------------------------------------------------
868  * END OF FORMAT CALLBACKS
869  *--------------------------------------------------
870  */
871 
872 /*
873  * Get the current position in the archive file.
874  */
/*
 * Return the current archive position: ftello() when the file is seekable,
 * otherwise the internally tracked ctx->filePos.
 *
 * NOTE(review): listing line 876 (the declarator) is missing; the index gives
 *     static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
 */
875 static pgoff_t
877 {
878  pgoff_t pos;
879 
880  if (ctx->hasSeek)
881  {
882  /*
883  * Prior to 1.7 (pg7.3) we relied on the internally maintained
884  * pointer. Now we rely on ftello() always, unless the file has been
885  * found to not support it. For debugging purposes, print a warning
886  * if the internal pointer disagrees, so that we're more likely to
887  * notice if something's broken about the internal position tracking.
888  */
889  pos = ftello(AH->FH);
890  if (pos < 0)
891  fatal("could not determine seek position in archive file: %m");
892 
893  if (pos != ctx->filePos)
894  pg_log_warning("ftell mismatch with expected position -- ftell used");
895  }
896  else
897  pos = ctx->filePos;
898  return pos;
899 }
900 
901 /*
902  * Read a data block header. The format changed in V1.3, so we
903  * centralize the code here for simplicity. Returns *type = EOF
904  * if at EOF.
905  */
/*
 * NOTE(review): listing line 907 (the declarator) is missing; the index gives
 *     static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
 */
906 static void
908 {
909  lclContext *ctx = (lclContext *) AH->formatData;
910  int byt;
911 
912  /*
913  * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
914  * ReadInt rather than returning EOF. It doesn't seem worth jumping
915  * through hoops to deal with that case better, because no such files are
916  * likely to exist in the wild: only some 7.1 development versions of
917  * pg_dump ever generated such files.
918  */
919  if (AH->version < K_VERS_1_3)
920  *type = BLK_DATA;
921  else
922  {
 /*
  * Use getc() rather than _ReadByte(): _ReadByte treats EOF as fatal,
  * whereas here EOF is a normal "no more blocks" result.
  */
923  byt = getc(AH->FH);
924  *type = byt;
925  if (byt == EOF)
926  {
927  *id = 0; /* don't return an uninitialized value */
928  return;
929  }
930  ctx->filePos += 1;
931  }
932 
933  *id = ReadInt(AH);
934 }
935 
936 /*
937  * Callback function for WriteDataToArchive. Writes one block of (compressed)
938  * data to the archive.
939  */
940 static void
941 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
942 {
943  /* never write 0-byte blocks (this should not happen) */
944  if (len > 0)
945  {
946  WriteInt(AH, len);
947  _WriteBuf(AH, buf, len);
948  }
949  return;
950 }
951 
952 /*
953  * Callback function for ReadDataFromArchive. To keep things simple, we
954  * always read one compressed block at a time.
955  */
956 static size_t
957 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
958 {
959  size_t blkLen;
960 
961  /* Read length */
962  blkLen = ReadInt(AH);
963  if (blkLen == 0)
964  return 0;
965 
966  /* If the caller's buffer is not large enough, allocate a bigger one */
967  if (blkLen > *buflen)
968  {
969  free(*buf);
970  *buf = (char *) pg_malloc(blkLen);
971  *buflen = blkLen;
972  }
973 
974  /* exits app on read errors */
975  _ReadBuf(AH, *buf, blkLen);
976 
977  return blkLen;
978 }
struct _tocEntry * next
static void _ReopenArchive(ArchiveHandle *AH)
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
ReopenPtrType ReopenPtr
ReadBufPtrType ReadBufPtr
void ReadToc(ArchiveHandle *AH)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
void ReadHead(ArchiveHandle *AH)
DeClonePtrType DeClonePtr
static void _Clone(ArchiveHandle *AH)
#define BLK_DATA
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
RestoreOptions * ropt
Definition: pg_backup.h:183
WriteBufPtrType WriteBufPtr
#define BLK_BLOBS
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:580
StartBlobPtrType StartBlobPtr
static void _skipBlobs(ArchiveHandle *AH)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
EndDataPtrType EndDataPtr
#define PG_BINARY_W
Definition: c.h:1194
void WriteToc(ArchiveHandle *AH)
DataDumperPtr dataDumper
int ReadInt(ArchiveHandle *AH)
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
StartBlobsPtrType StartBlobsPtr
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY_R
Definition: c.h:1193
void StartRestoreBlobs(ArchiveHandle *AH)
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
ReadBytePtrType ReadBytePtr
static void _PrintData(ArchiveHandle *AH)
CompressorState * cs
#define K_VERS_1_7
#define pgoff_t
Definition: win32_port.h:195
#define ftello(stream)
Definition: win32_port.h:204
EndBlobPtrType EndBlobPtr
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static XLogRecPtr endpos
Definition: pg_receivewal.c:48
void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
struct _tocEntry * toc
static char * buf
Definition: pg_test_fsync.c:68
StartDataPtrType StartDataPtr
static void _LoadBlobs(ArchiveHandle *AH, bool drop)
#define K_OFFSET_NO_DATA
WriteBytePtrType WriteBytePtr
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_3
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
static void _skipData(ArchiveHandle *AH)
static void _PrepParallelRestore(ArchiveHandle *AH)
void EndRestoreBlobs(ArchiveHandle *AH)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
static int _ReadByte(ArchiveHandle *)
int verbose
Definition: pg_backup.h:185
PrintTocDataPtrType PrintTocDataPtr
#define fseeko(stream, offset, origin)
Definition: win32_port.h:201
WriteDataPtrType WriteDataPtr
#define free(a)
Definition: header.h:65
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te)
PrepParallelRestorePtrType PrepParallelRestorePtr
EndBlobsPtrType EndBlobsPtr
size_t WriteInt(ArchiveHandle *AH, int i)
ArchiveEntryPtrType ArchiveEntryPtr
#define fatal(...)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
#define INT64_FORMAT
Definition: c.h:400
void WriteHead(ArchiveHandle *AH)
#define WRITE_ERROR_EXIT
pgoff_t dataStart
ClonePtrType ClonePtr
void EndRestoreBlob(ArchiveHandle *AH, Oid oid)
#define K_OFFSET_POS_SET
int i
WorkerJobRestorePtrType WorkerJobRestorePtr
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define READ_ERROR_EXIT(fd)
static void _DeClone(ArchiveHandle *AH)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
bool checkSeek(FILE *fp)
ClosePtrType ClosePtr
PrintExtraTocPtrType PrintExtraTocPtr
#define pg_log_warning(...)
Definition: pgfnames.c:24
pgoff_t filePos
static int _WriteByte(ArchiveHandle *AH, const int i)
WriteExtraTocPtrType WriteExtraTocPtr
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:202
#define LOBBUFSIZE
ReadExtraTocPtrType ReadExtraTocPtr
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_OFFSET_POS_NOT_SET
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
WorkerJobDumpPtrType WorkerJobDumpPtr