PostgreSQL Source Code  git master
pg_backup_custom.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_custom.c
4  *
5  * Implements the custom output format.
6  *
7  * The comments with the routines in this code are a good place to
8  * understand how to write a new format.
9  *
10  * See the headers to pg_restore for more details.
11  *
12  * Copyright (c) 2000, Philip Warner
13  * Rights are granted to use this software in any way so long
14  * as this notice is not removed.
15  *
16  * The author is not responsible for loss or damages that may
17  * and any liability will be limited to the time taken to fix any
18  * related bug.
19  *
20  *
21  * IDENTIFICATION
22  * src/bin/pg_dump/pg_backup_custom.c
23  *
24  *-------------------------------------------------------------------------
25  */
26 #include "postgres_fe.h"
27 
28 #include "common/file_utils.h"
29 #include "compress_io.h"
30 #include "parallel.h"
31 #include "pg_backup_utils.h"
32 
33 /*--------
34  * Routines in the format interface
35  *--------
36  */
37 
38 static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 static int _WriteByte(ArchiveHandle *AH, const int i);
43 static int _ReadByte(ArchiveHandle *);
44 static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 static void _CloseArchive(ArchiveHandle *AH);
47 static void _ReopenArchive(ArchiveHandle *AH);
48 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 
53 static void _PrintData(ArchiveHandle *AH);
54 static void _skipData(ArchiveHandle *AH);
55 static void _skipBlobs(ArchiveHandle *AH);
56 
57 static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
58 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
61 static void _LoadBlobs(ArchiveHandle *AH, bool drop);
62 
63 static void _PrepParallelRestore(ArchiveHandle *AH);
64 static void _Clone(ArchiveHandle *AH);
65 static void _DeClone(ArchiveHandle *AH);
66 
68 
69 typedef struct
70 {
72  int hasSeek;
75 } lclContext;
76 
77 typedef struct
78 {
79  int dataState;
81 } lclTocEntry;
82 
83 
84 /*------
85  * Static declarations
86  *------
87  */
88 static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
90 
91 static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 
94 
95 /*
96  * Init routine required by ALL formats. This is a global routine
97  * and should be declared in pg_backup_archiver.h
98  *
99  * It's task is to create any extra archive context (using AH->formatData),
100  * and to initialize the supported function pointers.
101  *
102  * It should also prepare whatever it's input source is for reading/writing,
103  * and in the case of a read mode connection, it should load the Header & TOC.
104  */
105 void
107 {
108  lclContext *ctx;
109 
110  /* Assuming static functions, this can be copied for each format. */
112  AH->StartDataPtr = _StartData;
113  AH->WriteDataPtr = _WriteData;
114  AH->EndDataPtr = _EndData;
115  AH->WriteBytePtr = _WriteByte;
116  AH->ReadBytePtr = _ReadByte;
117  AH->WriteBufPtr = _WriteBuf;
118  AH->ReadBufPtr = _ReadBuf;
119  AH->ClosePtr = _CloseArchive;
125 
127  AH->StartBlobPtr = _StartBlob;
128  AH->EndBlobPtr = _EndBlob;
129  AH->EndBlobsPtr = _EndBlobs;
130 
132  AH->ClonePtr = _Clone;
133  AH->DeClonePtr = _DeClone;
134 
135  /* no parallel dump in the custom archive, only parallel restore */
136  AH->WorkerJobDumpPtr = NULL;
138 
139  /* Set up a private area. */
140  ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141  AH->formatData = (void *) ctx;
142 
143  /* Initialize LO buffering */
144  AH->lo_buf_size = LOBBUFSIZE;
145  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 
147  ctx->filePos = 0;
148 
149  /*
150  * Now open the file
151  */
152  if (AH->mode == archModeWrite)
153  {
154  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
155  {
156  AH->FH = fopen(AH->fSpec, PG_BINARY_W);
157  if (!AH->FH)
158  fatal("could not open output file \"%s\": %m", AH->fSpec);
159  }
160  else
161  {
162  AH->FH = stdout;
163  if (!AH->FH)
164  fatal("could not open output file: %m");
165  }
166 
167  ctx->hasSeek = checkSeek(AH->FH);
168  }
169  else
170  {
171  if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
172  {
173  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
174  if (!AH->FH)
175  fatal("could not open input file \"%s\": %m", AH->fSpec);
176  }
177  else
178  {
179  AH->FH = stdin;
180  if (!AH->FH)
181  fatal("could not open input file: %m");
182  }
183 
184  ctx->hasSeek = checkSeek(AH->FH);
185 
186  ReadHead(AH);
187  ReadToc(AH);
188  ctx->dataStart = _getFilePos(AH, ctx);
189  }
190 
191 }
192 
193 /*
194  * Called by the Archiver when the dumper creates a new TOC entry.
195  *
196  * Optional.
197  *
198  * Set up extract format-related TOC data.
199 */
200 static void
202 {
203  lclTocEntry *ctx;
204 
205  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
206  if (te->dataDumper)
208  else
210 
211  te->formatData = (void *) ctx;
212 }
213 
214 /*
215  * Called by the Archiver to save any extra format-related TOC entry
216  * data.
217  *
218  * Optional.
219  *
220  * Use the Archiver routines to write data - they are non-endian, and
221  * maintain other important file information.
222  */
223 static void
225 {
226  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
227 
228  WriteOffset(AH, ctx->dataPos, ctx->dataState);
229 }
230 
231 /*
232  * Called by the Archiver to read any extra format-related TOC data.
233  *
234  * Optional.
235  *
236  * Needs to match the order defined in _WriteExtraToc, and should also
237  * use the Archiver input routines.
238  */
239 static void
241 {
242  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
243 
244  if (ctx == NULL)
245  {
246  ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
247  te->formatData = (void *) ctx;
248  }
249 
250  ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
251 
252  /*
253  * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
254  * dump it at all.
255  */
256  if (AH->version < K_VERS_1_7)
257  ReadInt(AH);
258 }
259 
260 /*
261  * Called by the Archiver when restoring an archive to output a comment
262  * that includes useful information about the TOC entry.
263  *
264  * Optional.
265  *
266  */
267 static void
269 {
270  lclTocEntry *ctx = (lclTocEntry *) te->formatData;
271 
272  if (AH->public.verbose)
273  ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
274  (int64) ctx->dataPos);
275 }
276 
277 /*
278  * Called by the archiver when saving TABLE DATA (not schema). This routine
279  * should save whatever format-specific information is needed to read
280  * the archive back.
281  *
282  * It is called just prior to the dumper's 'DataDumper' routine being called.
283  *
284  * Optional, but strongly recommended.
285  *
286  */
287 static void
289 {
290  lclContext *ctx = (lclContext *) AH->formatData;
291  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
292 
293  tctx->dataPos = _getFilePos(AH, ctx);
294  tctx->dataState = K_OFFSET_POS_SET;
295 
296  _WriteByte(AH, BLK_DATA); /* Block type */
297  WriteInt(AH, te->dumpId); /* For sanity check */
298 
300 }
301 
302 /*
303  * Called by archiver when dumper calls WriteData. This routine is
304  * called for both BLOB and TABLE data; it is the responsibility of
305  * the format to manage each kind of data using StartBlob/StartData.
306  *
307  * It should only be called from within a DataDumper routine.
308  *
309  * Mandatory.
310  */
311 static void
312 _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
313 {
314  lclContext *ctx = (lclContext *) AH->formatData;
315  CompressorState *cs = ctx->cs;
316 
317  if (dLen > 0)
318  /* WriteDataToArchive() internally throws write errors */
319  WriteDataToArchive(AH, cs, data, dLen);
320 }
321 
322 /*
323  * Called by the archiver when a dumper's 'DataDumper' routine has
324  * finished.
325  *
326  * Optional.
327  *
328  */
329 static void
331 {
332  lclContext *ctx = (lclContext *) AH->formatData;
333 
334  EndCompressor(AH, ctx->cs);
335  /* Send the end marker */
336  WriteInt(AH, 0);
337 }
338 
339 /*
340  * Called by the archiver when starting to save all BLOB DATA (not schema).
341  * This routine should save whatever format-specific information is needed
342  * to read the BLOBs back into memory.
343  *
344  * It is called just prior to the dumper's DataDumper routine.
345  *
346  * Optional, but strongly recommended.
347  */
348 static void
350 {
351  lclContext *ctx = (lclContext *) AH->formatData;
352  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
353 
354  tctx->dataPos = _getFilePos(AH, ctx);
355  tctx->dataState = K_OFFSET_POS_SET;
356 
357  _WriteByte(AH, BLK_BLOBS); /* Block type */
358  WriteInt(AH, te->dumpId); /* For sanity check */
359 }
360 
361 /*
362  * Called by the archiver when the dumper calls StartBlob.
363  *
364  * Mandatory.
365  *
366  * Must save the passed OID for retrieval at restore-time.
367  */
368 static void
370 {
371  lclContext *ctx = (lclContext *) AH->formatData;
372 
373  if (oid == 0)
374  fatal("invalid OID for large object");
375 
376  WriteInt(AH, oid);
377 
379 }
380 
381 /*
382  * Called by the archiver when the dumper calls EndBlob.
383  *
384  * Optional.
385  */
386 static void
388 {
389  lclContext *ctx = (lclContext *) AH->formatData;
390 
391  EndCompressor(AH, ctx->cs);
392  /* Send the end marker */
393  WriteInt(AH, 0);
394 }
395 
396 /*
397  * Called by the archiver when finishing saving all BLOB DATA.
398  *
399  * Optional.
400  */
401 static void
403 {
404  /* Write out a fake zero OID to mark end-of-blobs. */
405  WriteInt(AH, 0);
406 }
407 
408 /*
409  * Print data for a given TOC entry
410  */
411 static void
413 {
414  lclContext *ctx = (lclContext *) AH->formatData;
415  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
416  int blkType;
417  int id;
418 
419  if (tctx->dataState == K_OFFSET_NO_DATA)
420  return;
421 
422  if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
423  {
424  /*
425  * We cannot seek directly to the desired block. Instead, skip over
426  * block headers until we find the one we want. This could fail if we
427  * are asked to restore items out-of-order.
428  */
429  _readBlockHeader(AH, &blkType, &id);
430 
431  while (blkType != EOF && id != te->dumpId)
432  {
433  switch (blkType)
434  {
435  case BLK_DATA:
436  _skipData(AH);
437  break;
438 
439  case BLK_BLOBS:
440  _skipBlobs(AH);
441  break;
442 
443  default: /* Always have a default */
444  fatal("unrecognized data block type (%d) while searching archive",
445  blkType);
446  break;
447  }
448  _readBlockHeader(AH, &blkType, &id);
449  }
450  }
451  else
452  {
453  /* We can just seek to the place we need to be. */
454  if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
455  fatal("error during file seek: %m");
456 
457  _readBlockHeader(AH, &blkType, &id);
458  }
459 
460  /* Produce suitable failure message if we fell off end of file */
461  if (blkType == EOF)
462  {
463  if (tctx->dataState == K_OFFSET_POS_NOT_SET)
464  fatal("could not find block ID %d in archive -- "
465  "possibly due to out-of-order restore request, "
466  "which cannot be handled due to lack of data offsets in archive",
467  te->dumpId);
468  else if (!ctx->hasSeek)
469  fatal("could not find block ID %d in archive -- "
470  "possibly due to out-of-order restore request, "
471  "which cannot be handled due to non-seekable input file",
472  te->dumpId);
473  else /* huh, the dataPos led us to EOF? */
474  fatal("could not find block ID %d in archive -- "
475  "possibly corrupt archive",
476  te->dumpId);
477  }
478 
479  /* Are we sane? */
480  if (id != te->dumpId)
481  fatal("found unexpected block ID (%d) when reading data -- expected %d",
482  id, te->dumpId);
483 
484  switch (blkType)
485  {
486  case BLK_DATA:
487  _PrintData(AH);
488  break;
489 
490  case BLK_BLOBS:
491  _LoadBlobs(AH, AH->public.ropt->dropSchema);
492  break;
493 
494  default: /* Always have a default */
495  fatal("unrecognized data block type %d while restoring archive",
496  blkType);
497  break;
498  }
499 }
500 
501 /*
502  * Print data from current file position.
503 */
504 static void
506 {
508 }
509 
510 static void
511 _LoadBlobs(ArchiveHandle *AH, bool drop)
512 {
513  Oid oid;
514 
515  StartRestoreBlobs(AH);
516 
517  oid = ReadInt(AH);
518  while (oid != 0)
519  {
520  StartRestoreBlob(AH, oid, drop);
521  _PrintData(AH);
522  EndRestoreBlob(AH, oid);
523  oid = ReadInt(AH);
524  }
525 
526  EndRestoreBlobs(AH);
527 }
528 
529 /*
530  * Skip the BLOBs from the current file position.
531  * BLOBS are written sequentially as data blocks (see below).
532  * Each BLOB is preceded by it's original OID.
533  * A zero OID indicated the end of the BLOBS
534  */
535 static void
537 {
538  Oid oid;
539 
540  oid = ReadInt(AH);
541  while (oid != 0)
542  {
543  _skipData(AH);
544  oid = ReadInt(AH);
545  }
546 }
547 
548 /*
549  * Skip data from current file position.
550  * Data blocks are formatted as an integer length, followed by data.
551  * A zero length denoted the end of the block.
552 */
553 static void
555 {
556  lclContext *ctx = (lclContext *) AH->formatData;
557  size_t blkLen;
558  char *buf = NULL;
559  int buflen = 0;
560  size_t cnt;
561 
562  blkLen = ReadInt(AH);
563  while (blkLen != 0)
564  {
565  if (blkLen > buflen)
566  {
567  if (buf)
568  free(buf);
569  buf = (char *) pg_malloc(blkLen);
570  buflen = blkLen;
571  }
572  if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
573  {
574  if (feof(AH->FH))
575  fatal("could not read from input file: end of file");
576  else
577  fatal("could not read from input file: %m");
578  }
579 
580  ctx->filePos += blkLen;
581 
582  blkLen = ReadInt(AH);
583  }
584 
585  if (buf)
586  free(buf);
587 }
588 
589 /*
590  * Write a byte of data to the archive.
591  *
592  * Mandatory.
593  *
594  * Called by the archiver to do integer & byte output to the archive.
595  */
596 static int
597 _WriteByte(ArchiveHandle *AH, const int i)
598 {
599  lclContext *ctx = (lclContext *) AH->formatData;
600  int res;
601 
602  if ((res = fputc(i, AH->FH)) == EOF)
604  ctx->filePos += 1;
605 
606  return 1;
607 }
608 
609 /*
610  * Read a byte of data from the archive.
611  *
612  * Mandatory
613  *
614  * Called by the archiver to read bytes & integers from the archive.
615  * EOF should be treated as a fatal error.
616  */
617 static int
619 {
620  lclContext *ctx = (lclContext *) AH->formatData;
621  int res;
622 
623  res = getc(AH->FH);
624  if (res == EOF)
625  READ_ERROR_EXIT(AH->FH);
626  ctx->filePos += 1;
627  return res;
628 }
629 
630 /*
631  * Write a buffer of data to the archive.
632  *
633  * Mandatory.
634  *
635  * Called by the archiver to write a block of bytes to the archive.
636  */
637 static void
638 _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
639 {
640  lclContext *ctx = (lclContext *) AH->formatData;
641 
642  if (fwrite(buf, 1, len, AH->FH) != len)
644  ctx->filePos += len;
645 }
646 
647 /*
648  * Read a block of bytes from the archive.
649  *
650  * Mandatory.
651  *
652  * Called by the archiver to read a block of bytes from the archive
653  */
654 static void
655 _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
656 {
657  lclContext *ctx = (lclContext *) AH->formatData;
658 
659  if (fread(buf, 1, len, AH->FH) != len)
660  READ_ERROR_EXIT(AH->FH);
661  ctx->filePos += len;
662 }
663 
664 /*
665  * Close the archive.
666  *
667  * Mandatory.
668  *
669  * When writing the archive, this is the routine that actually starts
670  * the process of saving it to files. No data should be written prior
671  * to this point, since the user could sort the TOC after creating it.
672  *
673  * If an archive is to be written, this routine must call:
674  * WriteHead to save the archive header
675  * WriteToc to save the TOC entries
676  * WriteDataChunks to save all DATA & BLOBs.
677  *
678  */
679 static void
681 {
682  lclContext *ctx = (lclContext *) AH->formatData;
683  pgoff_t tpos;
684 
685  if (AH->mode == archModeWrite)
686  {
687  WriteHead(AH);
688  /* Remember TOC's seek position for use below */
689  tpos = ftello(AH->FH);
690  if (tpos < 0 && ctx->hasSeek)
691  fatal("could not determine seek position in archive file: %m");
692  WriteToc(AH);
693  ctx->dataStart = _getFilePos(AH, ctx);
694  WriteDataChunks(AH, NULL);
695 
696  /*
697  * If possible, re-write the TOC in order to update the data offset
698  * information. This is not essential, as pg_restore can cope in most
699  * cases without it; but it can make pg_restore significantly faster
700  * in some situations (especially parallel restore).
701  */
702  if (ctx->hasSeek &&
703  fseeko(AH->FH, tpos, SEEK_SET) == 0)
704  WriteToc(AH);
705  }
706 
707  if (fclose(AH->FH) != 0)
708  fatal("could not close archive file: %m");
709 
710  /* Sync the output file if one is defined */
711  if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
712  (void) fsync_fname(AH->fSpec, false);
713 
714  AH->FH = NULL;
715 }
716 
717 /*
718  * Reopen the archive's file handle.
719  *
720  * We close the original file handle, except on Windows. (The difference
721  * is because on Windows, this is used within a multithreading context,
722  * and we don't want a thread closing the parent file handle.)
723  */
724 static void
726 {
727  lclContext *ctx = (lclContext *) AH->formatData;
728  pgoff_t tpos;
729 
730  if (AH->mode == archModeWrite)
731  fatal("can only reopen input archives");
732 
733  /*
734  * These two cases are user-facing errors since they represent unsupported
735  * (but not invalid) use-cases. Word the error messages appropriately.
736  */
737  if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
738  fatal("parallel restore from standard input is not supported");
739  if (!ctx->hasSeek)
740  fatal("parallel restore from non-seekable file is not supported");
741 
742  tpos = ftello(AH->FH);
743  if (tpos < 0)
744  fatal("could not determine seek position in archive file: %m");
745 
746 #ifndef WIN32
747  if (fclose(AH->FH) != 0)
748  fatal("could not close archive file: %m");
749 #endif
750 
751  AH->FH = fopen(AH->fSpec, PG_BINARY_R);
752  if (!AH->FH)
753  fatal("could not open input file \"%s\": %m", AH->fSpec);
754 
755  if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
756  fatal("could not set seek position in archive file: %m");
757 }
758 
759 /*
760  * Prepare for parallel restore.
761  *
762  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
763  * TOC entries' dataLength fields with appropriate values to guide the
764  * ordering of restore jobs. The source of said data is format-dependent,
765  * as is the exact meaning of the values.
766  *
767  * A format module might also choose to do other setup here.
768  */
769 static void
771 {
772  lclContext *ctx = (lclContext *) AH->formatData;
773  TocEntry *prev_te = NULL;
774  lclTocEntry *prev_tctx = NULL;
775  TocEntry *te;
776 
777  /*
778  * Knowing that the data items were dumped out in TOC order, we can
779  * reconstruct the length of each item as the delta to the start offset of
780  * the next data item.
781  */
782  for (te = AH->toc->next; te != AH->toc; te = te->next)
783  {
784  lclTocEntry *tctx = (lclTocEntry *) te->formatData;
785 
786  /*
787  * Ignore entries without a known data offset; if we were unable to
788  * seek to rewrite the TOC when creating the archive, this'll be all
789  * of them, and we'll end up with no size estimates.
790  */
791  if (tctx->dataState != K_OFFSET_POS_SET)
792  continue;
793 
794  /* Compute previous data item's length */
795  if (prev_te)
796  {
797  if (tctx->dataPos > prev_tctx->dataPos)
798  prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
799  }
800 
801  prev_te = te;
802  prev_tctx = tctx;
803  }
804 
805  /* If OK to seek, we can determine the length of the last item */
806  if (prev_te && ctx->hasSeek)
807  {
808  pgoff_t endpos;
809 
810  if (fseeko(AH->FH, 0, SEEK_END) != 0)
811  fatal("error during file seek: %m");
812  endpos = ftello(AH->FH);
813  if (endpos > prev_tctx->dataPos)
814  prev_te->dataLength = endpos - prev_tctx->dataPos;
815  }
816 }
817 
818 /*
819  * Clone format-specific fields during parallel restoration.
820  */
821 static void
823 {
824  lclContext *ctx = (lclContext *) AH->formatData;
825 
826  AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
827  memcpy(AH->formatData, ctx, sizeof(lclContext));
828  ctx = (lclContext *) AH->formatData;
829 
830  /* sanity check, shouldn't happen */
831  if (ctx->cs != NULL)
832  fatal("compressor active");
833 
834  /*
835  * Note: we do not make a local lo_buf because we expect at most one BLOBS
836  * entry per archive, so no parallelism is possible. Likewise,
837  * TOC-entry-local state isn't an issue because any one TOC entry is
838  * touched by just one worker child.
839  */
840 }
841 
842 static void
844 {
845  lclContext *ctx = (lclContext *) AH->formatData;
846 
847  free(ctx);
848 }
849 
850 /*
851  * This function is executed in the child of a parallel restore from a
852  * custom-format archive and restores the actual data for one TOC entry.
853  */
854 static int
856 {
857  return parallel_restore(AH, te);
858 }
859 
860 /*--------------------------------------------------
861  * END OF FORMAT CALLBACKS
862  *--------------------------------------------------
863  */
864 
865 /*
866  * Get the current position in the archive file.
867  */
868 static pgoff_t
870 {
871  pgoff_t pos;
872 
873  if (ctx->hasSeek)
874  {
875  /*
876  * Prior to 1.7 (pg7.3) we relied on the internally maintained
877  * pointer. Now we rely on ftello() always, unless the file has been
878  * found to not support it. For debugging purposes, print a warning
879  * if the internal pointer disagrees, so that we're more likely to
880  * notice if something's broken about the internal position tracking.
881  */
882  pos = ftello(AH->FH);
883  if (pos < 0)
884  fatal("could not determine seek position in archive file: %m");
885 
886  if (pos != ctx->filePos)
887  pg_log_warning("ftell mismatch with expected position -- ftell used");
888  }
889  else
890  pos = ctx->filePos;
891  return pos;
892 }
893 
894 /*
895  * Read a data block header. The format changed in V1.3, so we
896  * centralize the code here for simplicity. Returns *type = EOF
897  * if at EOF.
898  */
899 static void
901 {
902  lclContext *ctx = (lclContext *) AH->formatData;
903  int byt;
904 
905  /*
906  * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
907  * ReadInt rather than returning EOF. It doesn't seem worth jumping
908  * through hoops to deal with that case better, because no such files are
909  * likely to exist in the wild: only some 7.1 development versions of
910  * pg_dump ever generated such files.
911  */
912  if (AH->version < K_VERS_1_3)
913  *type = BLK_DATA;
914  else
915  {
916  byt = getc(AH->FH);
917  *type = byt;
918  if (byt == EOF)
919  {
920  *id = 0; /* don't return an uninitialized value */
921  return;
922  }
923  ctx->filePos += 1;
924  }
925 
926  *id = ReadInt(AH);
927 }
928 
929 /*
930  * Callback function for WriteDataToArchive. Writes one block of (compressed)
931  * data to the archive.
932  */
933 static void
934 _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
935 {
936  /* never write 0-byte blocks (this should not happen) */
937  if (len > 0)
938  {
939  WriteInt(AH, len);
940  _WriteBuf(AH, buf, len);
941  }
942 }
943 
944 /*
945  * Callback function for ReadDataFromArchive. To keep things simple, we
946  * always read one compressed block at a time.
947  */
948 static size_t
949 _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
950 {
951  size_t blkLen;
952 
953  /* Read length */
954  blkLen = ReadInt(AH);
955  if (blkLen == 0)
956  return 0;
957 
958  /* If the caller's buffer is not large enough, allocate a bigger one */
959  if (blkLen > *buflen)
960  {
961  free(*buf);
962  *buf = (char *) pg_malloc(blkLen);
963  *buflen = blkLen;
964  }
965 
966  /* exits app on read errors */
967  _ReadBuf(AH, *buf, blkLen);
968 
969  return blkLen;
970 }
struct _tocEntry * next
static void _ReopenArchive(ArchiveHandle *AH)
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
ReopenPtrType ReopenPtr
ReadBufPtrType ReadBufPtr
void ReadToc(ArchiveHandle *AH)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
void ReadHead(ArchiveHandle *AH)
DeClonePtrType DeClonePtr
static void _Clone(ArchiveHandle *AH)
#define BLK_DATA
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
RestoreOptions * ropt
Definition: pg_backup.h:183
WriteBufPtrType WriteBufPtr
#define BLK_BLOBS
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:617
StartBlobPtrType StartBlobPtr
static void _skipBlobs(ArchiveHandle *AH)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
EndDataPtrType EndDataPtr
#define PG_BINARY_W
Definition: c.h:1225
void WriteToc(ArchiveHandle *AH)
DataDumperPtr dataDumper
int ReadInt(ArchiveHandle *AH)
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
StartBlobsPtrType StartBlobsPtr
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY_R
Definition: c.h:1224
void StartRestoreBlobs(ArchiveHandle *AH)
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
ReadBytePtrType ReadBytePtr
static void _PrintData(ArchiveHandle *AH)
CompressorState * cs
#define K_VERS_1_7
#define pgoff_t
Definition: win32_port.h:195
#define ftello(stream)
Definition: win32_port.h:204
EndBlobPtrType EndBlobPtr
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static XLogRecPtr endpos
Definition: pg_receivewal.c:46
void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
struct _tocEntry * toc
static char * buf
Definition: pg_test_fsync.c:67
StartDataPtrType StartDataPtr
static void _LoadBlobs(ArchiveHandle *AH, bool drop)
#define K_OFFSET_NO_DATA
WriteBytePtrType WriteBytePtr
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_3
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
static void _skipData(ArchiveHandle *AH)
static void _PrepParallelRestore(ArchiveHandle *AH)
void EndRestoreBlobs(ArchiveHandle *AH)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
static int _ReadByte(ArchiveHandle *)
int verbose
Definition: pg_backup.h:185
PrintTocDataPtrType PrintTocDataPtr
#define fseeko(stream, offset, origin)
Definition: win32_port.h:201
WriteDataPtrType WriteDataPtr
#define free(a)
Definition: header.h:65
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te)
PrepParallelRestorePtrType PrepParallelRestorePtr
EndBlobsPtrType EndBlobsPtr
size_t WriteInt(ArchiveHandle *AH, int i)
ArchiveEntryPtrType ArchiveEntryPtr
#define fatal(...)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
#define INT64_FORMAT
Definition: c.h:401
void WriteHead(ArchiveHandle *AH)
#define WRITE_ERROR_EXIT
pgoff_t dataStart
ClonePtrType ClonePtr
void EndRestoreBlob(ArchiveHandle *AH, Oid oid)
#define K_OFFSET_POS_SET
int i
WorkerJobRestorePtrType WorkerJobRestorePtr
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define READ_ERROR_EXIT(fd)
static void _DeClone(ArchiveHandle *AH)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
bool checkSeek(FILE *fp)
ClosePtrType ClosePtr
PrintExtraTocPtrType PrintExtraTocPtr
#define pg_log_warning(...)
Definition: pgfnames.c:24
pgoff_t filePos
static int _WriteByte(ArchiveHandle *AH, const int i)
WriteExtraTocPtrType WriteExtraTocPtr
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:201
#define LOBBUFSIZE
ReadExtraTocPtrType ReadExtraTocPtr
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
#define K_OFFSET_POS_NOT_SET
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
WorkerJobDumpPtrType WorkerJobDumpPtr