PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pg_backup_custom.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_custom.c
4 *
5 * Implements the custom output format.
6 *
7 * The comments with the routines in this code are a good place to
8 * understand how to write a new format.
9 *
10 * See the headers to pg_restore for more details.
11 *
12 * Copyright (c) 2000, Philip Warner
13 * Rights are granted to use this software in any way so long
14 * as this notice is not removed.
15 *
16 * The author is not responsible for loss or damages that may
17 * and any liability will be limited to the time taken to fix any
18 * related bug.
19 *
20 *
21 * IDENTIFICATION
22 * src/bin/pg_dump/pg_backup_custom.c
23 *
24 *-------------------------------------------------------------------------
25 */
26#include "postgres_fe.h"
27
28#include "common/file_utils.h"
29#include "compress_io.h"
30#include "pg_backup_utils.h"
31
32/*--------
33 * Routines in the format interface
34 *--------
35 */
36
37static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
38static void _StartData(ArchiveHandle *AH, TocEntry *te);
39static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
40static void _EndData(ArchiveHandle *AH, TocEntry *te);
41static int _WriteByte(ArchiveHandle *AH, const int i);
42static int _ReadByte(ArchiveHandle *AH);
43static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
44static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
45static void _CloseArchive(ArchiveHandle *AH);
46static void _ReopenArchive(ArchiveHandle *AH);
47static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
48static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
49static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
50static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
51
52static void _PrintData(ArchiveHandle *AH);
53static void _skipData(ArchiveHandle *AH);
54static void _skipLOs(ArchiveHandle *AH);
55
56static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
57static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
58static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
60static void _LoadLOs(ArchiveHandle *AH, bool drop);
61
62static void _PrepParallelRestore(ArchiveHandle *AH);
63static void _Clone(ArchiveHandle *AH);
64static void _DeClone(ArchiveHandle *AH);
65
67
68typedef struct
69{
72 /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
73 pgoff_t lastFilePos; /* position after last data block we've read */
75
76typedef struct
77{
79 pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81
82
83/*------
84 * Static declarations
85 *------
86 */
87static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
89
90static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
91static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
92
93
94/*
95 * Init routine required by ALL formats. This is a global routine
96 * and should be declared in pg_backup_archiver.h
97 *
98 * Its task is to create any extra archive context (using AH->formatData),
99 * and to initialize the supported function pointers.
100 *
101 * It should also prepare whatever its input source is for reading/writing,
102 * and in the case of a read mode connection, it should load the Header & TOC.
103 */
104void
106{
107 lclContext *ctx;
108
109 /* Assuming static functions, this can be copied for each format. */
113 AH->EndDataPtr = _EndData;
117 AH->ReadBufPtr = _ReadBuf;
124
126 AH->StartLOPtr = _StartLO;
127 AH->EndLOPtr = _EndLO;
128 AH->EndLOsPtr = _EndLOs;
129
131 AH->ClonePtr = _Clone;
132 AH->DeClonePtr = _DeClone;
133
134 /* no parallel dump in the custom archive, only parallel restore */
135 AH->WorkerJobDumpPtr = NULL;
137
138 /* Set up a private area. */
139 ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
140 AH->formatData = ctx;
141
142 /*
143 * Now open the file
144 */
145 if (AH->mode == archModeWrite)
146 {
147 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
148 {
149 AH->FH = fopen(AH->fSpec, PG_BINARY_W);
150 if (!AH->FH)
151 pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
152 }
153 else
154 {
155 AH->FH = stdout;
156 if (!AH->FH)
157 pg_fatal("could not open output file: %m");
158 }
159
160 ctx->hasSeek = checkSeek(AH->FH);
161 }
162 else
163 {
164 if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
165 {
166 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
167 if (!AH->FH)
168 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
169 }
170 else
171 {
172 AH->FH = stdin;
173 if (!AH->FH)
174 pg_fatal("could not open input file: %m");
175 }
176
177 ctx->hasSeek = checkSeek(AH->FH);
178
179 ReadHead(AH);
180 ReadToc(AH);
181
182 /*
183 * Remember location of first data block (i.e., the point after TOC)
184 * in case we have to search for desired data blocks.
185 */
186 ctx->lastFilePos = _getFilePos(AH, ctx);
187 }
188}
189
190/*
191 * Called by the Archiver when the dumper creates a new TOC entry.
192 *
193 * Optional.
194 *
195 * Set up extra format-related TOC data.
196*/
197static void
199{
200 lclTocEntry *ctx;
201
202 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
203 if (te->dataDumper)
205 else
207
208 te->formatData = ctx;
209}
210
211/*
212 * Called by the Archiver to save any extra format-related TOC entry
213 * data.
214 *
215 * Optional.
216 *
217 * Use the Archiver routines to write data - they are non-endian, and
218 * maintain other important file information.
219 */
220static void
222{
223 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
224
225 WriteOffset(AH, ctx->dataPos, ctx->dataState);
226}
227
228/*
229 * Called by the Archiver to read any extra format-related TOC data.
230 *
231 * Optional.
232 *
233 * Needs to match the order defined in _WriteExtraToc, and should also
234 * use the Archiver input routines.
235 */
236static void
238{
239 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
240
241 if (ctx == NULL)
242 {
243 ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
244 te->formatData = ctx;
245 }
246
247 ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
248
249 /*
250 * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
251 * dump it at all.
252 */
253 if (AH->version < K_VERS_1_7)
254 ReadInt(AH);
255}
256
257/*
258 * Called by the Archiver when restoring an archive to output a comment
259 * that includes useful information about the TOC entry.
260 *
261 * Optional.
262 */
263static void
265{
266 lclTocEntry *ctx = (lclTocEntry *) te->formatData;
267
268 if (AH->public.verbose)
269 ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
270 (int64) ctx->dataPos);
271}
272
273/*
274 * Called by the archiver when saving TABLE DATA (not schema). This routine
275 * should save whatever format-specific information is needed to read
276 * the archive back.
277 *
278 * It is called just prior to the dumper's 'DataDumper' routine being called.
279 *
280 * Optional, but strongly recommended.
281 *
282 */
283static void
285{
286 lclContext *ctx = (lclContext *) AH->formatData;
287 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
288
289 tctx->dataPos = _getFilePos(AH, ctx);
290 if (tctx->dataPos >= 0)
291 tctx->dataState = K_OFFSET_POS_SET;
292
293 _WriteByte(AH, BLK_DATA); /* Block type */
294 WriteInt(AH, te->dumpId); /* For sanity check */
295
297 NULL,
299}
300
301/*
302 * Called by archiver when dumper calls WriteData. This routine is
303 * called for both LO and table data; it is the responsibility of
304 * the format to manage each kind of data using StartLO/StartData.
305 *
306 * It should only be called from within a DataDumper routine.
307 *
308 * Mandatory.
309 */
310static void
311_WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
312{
313 lclContext *ctx = (lclContext *) AH->formatData;
314 CompressorState *cs = ctx->cs;
315
316 if (dLen > 0)
317 /* writeData() internally throws write errors */
318 cs->writeData(AH, cs, data, dLen);
319}
320
321/*
322 * Called by the archiver when a dumper's 'DataDumper' routine has
323 * finished.
324 *
325 * Mandatory.
326 */
327static void
329{
330 lclContext *ctx = (lclContext *) AH->formatData;
331
332 EndCompressor(AH, ctx->cs);
333 ctx->cs = NULL;
334
335 /* Send the end marker */
336 WriteInt(AH, 0);
337}
338
339/*
340 * Called by the archiver when starting to save BLOB DATA (not schema).
341 * This routine should save whatever format-specific information is needed
342 * to read the LOs back into memory.
343 *
344 * It is called just prior to the dumper's DataDumper routine.
345 *
346 * Optional, but strongly recommended.
347 */
348static void
350{
351 lclContext *ctx = (lclContext *) AH->formatData;
352 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
353
354 tctx->dataPos = _getFilePos(AH, ctx);
355 if (tctx->dataPos >= 0)
356 tctx->dataState = K_OFFSET_POS_SET;
357
358 _WriteByte(AH, BLK_BLOBS); /* Block type */
359 WriteInt(AH, te->dumpId); /* For sanity check */
360}
361
362/*
363 * Called by the archiver when the dumper calls StartLO.
364 *
365 * Mandatory.
366 *
367 * Must save the passed OID for retrieval at restore-time.
368 */
369static void
371{
372 lclContext *ctx = (lclContext *) AH->formatData;
373
374 if (oid == 0)
375 pg_fatal("invalid OID for large object");
376
377 WriteInt(AH, oid);
378
380 NULL,
382}
383
384/*
385 * Called by the archiver when the dumper calls EndLO.
386 *
387 * Optional.
388 */
389static void
391{
392 lclContext *ctx = (lclContext *) AH->formatData;
393
394 EndCompressor(AH, ctx->cs);
395 /* Send the end marker */
396 WriteInt(AH, 0);
397}
398
399/*
400 * Called by the archiver when finishing saving BLOB DATA.
401 *
402 * Optional.
403 */
404static void
406{
407 /* Write out a fake zero OID to mark end-of-LOs. */
408 WriteInt(AH, 0);
409}
410
411/*
412 * Print data for a given TOC entry
413 */
414static void
416{
417 lclContext *ctx = (lclContext *) AH->formatData;
418 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
419 int blkType;
420 int id;
421
422 if (tctx->dataState == K_OFFSET_NO_DATA)
423 return;
424
425 if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
426 {
427 /*
428 * We cannot seek directly to the desired block. Instead, skip over
429 * block headers until we find the one we want. Remember the
430 * positions of skipped-over blocks, so that if we later decide we
431 * need to read one, we'll be able to seek to it.
432 *
433 * When our input file is seekable, we can do the search starting from
434 * the point after the last data block we scanned in previous
435 * iterations of this function.
436 */
437 if (ctx->hasSeek)
438 {
439 if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
440 pg_fatal("error during file seek: %m");
441 }
442
443 for (;;)
444 {
445 pgoff_t thisBlkPos = _getFilePos(AH, ctx);
446
447 _readBlockHeader(AH, &blkType, &id);
448
449 if (blkType == EOF || id == te->dumpId)
450 break;
451
452 /* Remember the block position, if we got one */
453 if (thisBlkPos >= 0)
454 {
455 TocEntry *otherte = getTocEntryByDumpId(AH, id);
456
457 if (otherte && otherte->formatData)
458 {
459 lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
460
461 /*
462 * Note: on Windows, multiple threads might access/update
463 * the same lclTocEntry concurrently, but that should be
464 * safe as long as we update dataPos before dataState.
465 * Ideally, we'd use pg_write_barrier() to enforce that,
466 * but the needed infrastructure doesn't exist in frontend
467 * code. But Windows only runs on machines with strong
468 * store ordering, so it should be okay for now.
469 */
470 if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
471 {
472 othertctx->dataPos = thisBlkPos;
473 othertctx->dataState = K_OFFSET_POS_SET;
474 }
475 else if (othertctx->dataPos != thisBlkPos ||
476 othertctx->dataState != K_OFFSET_POS_SET)
477 {
478 /* sanity check */
479 pg_log_warning("data block %d has wrong seek position",
480 id);
481 }
482 }
483 }
484
485 switch (blkType)
486 {
487 case BLK_DATA:
488 _skipData(AH);
489 break;
490
491 case BLK_BLOBS:
492 _skipLOs(AH);
493 break;
494
495 default: /* Always have a default */
496 pg_fatal("unrecognized data block type (%d) while searching archive",
497 blkType);
498 break;
499 }
500 }
501 }
502 else
503 {
504 /* We can just seek to the place we need to be. */
505 if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
506 pg_fatal("error during file seek: %m");
507
508 _readBlockHeader(AH, &blkType, &id);
509 }
510
511 /*
512 * If we reached EOF without finding the block we want, then either it
513 * doesn't exist, or it does but we lack the ability to seek back to it.
514 */
515 if (blkType == EOF)
516 {
517 if (!ctx->hasSeek)
518 pg_fatal("could not find block ID %d in archive -- "
519 "possibly due to out-of-order restore request, "
520 "which cannot be handled due to non-seekable input file",
521 te->dumpId);
522 else
523 pg_fatal("could not find block ID %d in archive -- "
524 "possibly corrupt archive",
525 te->dumpId);
526 }
527
528 /* Are we sane? */
529 if (id != te->dumpId)
530 pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
531 id, te->dumpId);
532
533 switch (blkType)
534 {
535 case BLK_DATA:
536 _PrintData(AH);
537 break;
538
539 case BLK_BLOBS:
540 _LoadLOs(AH, AH->public.ropt->dropSchema);
541 break;
542
543 default: /* Always have a default */
544 pg_fatal("unrecognized data block type %d while restoring archive",
545 blkType);
546 break;
547 }
548
549 /*
550 * If our input file is seekable but lacks data offsets, update our
551 * knowledge of where to start future searches from. (Note that we did
552 * not update the current TE's dataState/dataPos. We could have, but
553 * there is no point since it will not be visited again.)
554 */
555 if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
556 {
557 pgoff_t curPos = _getFilePos(AH, ctx);
558
559 if (curPos > ctx->lastFilePos)
560 ctx->lastFilePos = curPos;
561 }
562}
563
564/*
565 * Print data from current file position.
566*/
567static void
569{
570 CompressorState *cs;
571
573 _CustomReadFunc, NULL);
574 cs->readData(AH, cs);
575 EndCompressor(AH, cs);
576}
577
578static void
579_LoadLOs(ArchiveHandle *AH, bool drop)
580{
581 Oid oid;
582
583 StartRestoreLOs(AH);
584
585 oid = ReadInt(AH);
586 while (oid != 0)
587 {
588 StartRestoreLO(AH, oid, drop);
589 _PrintData(AH);
590 EndRestoreLO(AH, oid);
591 oid = ReadInt(AH);
592 }
593
594 EndRestoreLOs(AH);
595}
596
597/*
598 * Skip the LOs from the current file position.
599 * LOs are written sequentially as data blocks (see below).
600 * Each LO is preceded by its original OID.
601 * A zero OID indicates the end of the LOs.
602 */
603static void
605{
606 Oid oid;
607
608 oid = ReadInt(AH);
609 while (oid != 0)
610 {
611 _skipData(AH);
612 oid = ReadInt(AH);
613 }
614}
615
616/*
617 * Skip data from current file position.
618 * Data blocks are formatted as an integer length, followed by data.
619 * A zero length indicates the end of the block.
620*/
621static void
623{
624 lclContext *ctx = (lclContext *) AH->formatData;
625 size_t blkLen;
626 char *buf = NULL;
627 int buflen = 0;
628
629 blkLen = ReadInt(AH);
630 while (blkLen != 0)
631 {
632 if (ctx->hasSeek)
633 {
634 if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
635 pg_fatal("error during file seek: %m");
636 }
637 else
638 {
639 if (blkLen > buflen)
640 {
641 free(buf);
642 buf = (char *) pg_malloc(blkLen);
643 buflen = blkLen;
644 }
645 if (fread(buf, 1, blkLen, AH->FH) != blkLen)
646 {
647 if (feof(AH->FH))
648 pg_fatal("could not read from input file: end of file");
649 else
650 pg_fatal("could not read from input file: %m");
651 }
652 }
653
654 blkLen = ReadInt(AH);
655 }
656
657 free(buf);
658}
659
660/*
661 * Write a byte of data to the archive.
662 *
663 * Mandatory.
664 *
665 * Called by the archiver to do integer & byte output to the archive.
666 */
667static int
669{
670 if (fputc(i, AH->FH) == EOF)
672
673 return 1;
674}
675
676/*
677 * Read a byte of data from the archive.
678 *
679 * Mandatory
680 *
681 * Called by the archiver to read bytes & integers from the archive.
682 * EOF should be treated as a fatal error.
683 */
684static int
686{
687 int res;
688
689 res = getc(AH->FH);
690 if (res == EOF)
691 READ_ERROR_EXIT(AH->FH);
692 return res;
693}
694
695/*
696 * Write a buffer of data to the archive.
697 *
698 * Mandatory.
699 *
700 * Called by the archiver to write a block of bytes to the archive.
701 */
702static void
703_WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
704{
705 if (fwrite(buf, 1, len, AH->FH) != len)
707}
708
709/*
710 * Read a block of bytes from the archive.
711 *
712 * Mandatory.
713 *
714 * Called by the archiver to read a block of bytes from the archive
715 */
716static void
717_ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
718{
719 if (fread(buf, 1, len, AH->FH) != len)
720 READ_ERROR_EXIT(AH->FH);
721}
722
723/*
724 * Close the archive.
725 *
726 * Mandatory.
727 *
728 * When writing the archive, this is the routine that actually starts
729 * the process of saving it to files. No data should be written prior
730 * to this point, since the user could sort the TOC after creating it.
731 *
732 * If an archive is to be written, this routine must call:
733 * WriteHead to save the archive header
734 * WriteToc to save the TOC entries
735 * WriteDataChunks to save all data & LOs.
736 *
737 */
738static void
740{
741 lclContext *ctx = (lclContext *) AH->formatData;
742 pgoff_t tpos;
743
744 if (AH->mode == archModeWrite)
745 {
746 WriteHead(AH);
747 /* Remember TOC's seek position for use below */
748 tpos = ftello(AH->FH);
749 if (tpos < 0 && ctx->hasSeek)
750 pg_fatal("could not determine seek position in archive file: %m");
751 WriteToc(AH);
752 WriteDataChunks(AH, NULL);
753
754 /*
755 * If possible, re-write the TOC in order to update the data offset
756 * information. This is not essential, as pg_restore can cope in most
757 * cases without it; but it can make pg_restore significantly faster
758 * in some situations (especially parallel restore). We can skip this
759 * step if we're not dumping any data; there are no offsets to update
760 * in that case.
761 */
762 if (ctx->hasSeek && AH->public.dopt->dumpData &&
763 fseeko(AH->FH, tpos, SEEK_SET) == 0)
764 WriteToc(AH);
765 }
766
767 if (fclose(AH->FH) != 0)
768 pg_fatal("could not close archive file: %m");
769
770 /* Sync the output file if one is defined */
771 if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
772 (void) fsync_fname(AH->fSpec, false);
773
774 AH->FH = NULL;
775}
776
777/*
778 * Reopen the archive's file handle.
779 *
780 * We close the original file handle, except on Windows. (The difference
781 * is because on Windows, this is used within a multithreading context,
782 * and we don't want a thread closing the parent file handle.)
783 */
784static void
786{
787 lclContext *ctx = (lclContext *) AH->formatData;
788 pgoff_t tpos;
789
790 if (AH->mode == archModeWrite)
791 pg_fatal("can only reopen input archives");
792
793 /*
794 * These two cases are user-facing errors since they represent unsupported
795 * (but not invalid) use-cases. Word the error messages appropriately.
796 */
797 if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
798 pg_fatal("parallel restore from standard input is not supported");
799 if (!ctx->hasSeek)
800 pg_fatal("parallel restore from non-seekable file is not supported");
801
802 tpos = ftello(AH->FH);
803 if (tpos < 0)
804 pg_fatal("could not determine seek position in archive file: %m");
805
806#ifndef WIN32
807 if (fclose(AH->FH) != 0)
808 pg_fatal("could not close archive file: %m");
809#endif
810
811 AH->FH = fopen(AH->fSpec, PG_BINARY_R);
812 if (!AH->FH)
813 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
814
815 if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
816 pg_fatal("could not set seek position in archive file: %m");
817}
818
819/*
820 * Prepare for parallel restore.
821 *
822 * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
823 * TOC entries' dataLength fields with appropriate values to guide the
824 * ordering of restore jobs. The source of said data is format-dependent,
825 * as is the exact meaning of the values.
826 *
827 * A format module might also choose to do other setup here.
828 */
829static void
831{
832 lclContext *ctx = (lclContext *) AH->formatData;
833 TocEntry *prev_te = NULL;
834 lclTocEntry *prev_tctx = NULL;
835 TocEntry *te;
836
837 /*
838 * Knowing that the data items were dumped out in TOC order, we can
839 * reconstruct the length of each item as the delta to the start offset of
840 * the next data item.
841 */
842 for (te = AH->toc->next; te != AH->toc; te = te->next)
843 {
844 lclTocEntry *tctx = (lclTocEntry *) te->formatData;
845
846 /*
847 * Ignore entries without a known data offset; if we were unable to
848 * seek to rewrite the TOC when creating the archive, this'll be all
849 * of them, and we'll end up with no size estimates.
850 */
851 if (tctx->dataState != K_OFFSET_POS_SET)
852 continue;
853
854 /* Compute previous data item's length */
855 if (prev_te)
856 {
857 if (tctx->dataPos > prev_tctx->dataPos)
858 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
859 }
860
861 prev_te = te;
862 prev_tctx = tctx;
863 }
864
865 /* If OK to seek, we can determine the length of the last item */
866 if (prev_te && ctx->hasSeek)
867 {
869
870 if (fseeko(AH->FH, 0, SEEK_END) != 0)
871 pg_fatal("error during file seek: %m");
872 endpos = ftello(AH->FH);
873 if (endpos > prev_tctx->dataPos)
874 prev_te->dataLength = endpos - prev_tctx->dataPos;
875 }
876}
877
878/*
879 * Clone format-specific fields during parallel restoration.
880 */
881static void
883{
884 lclContext *ctx = (lclContext *) AH->formatData;
885
886 /*
887 * Each thread must have private lclContext working state.
888 */
889 AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
890 memcpy(AH->formatData, ctx, sizeof(lclContext));
891 ctx = (lclContext *) AH->formatData;
892
893 /* sanity check, shouldn't happen */
894 if (ctx->cs != NULL)
895 pg_fatal("compressor active");
896
897 /*
898 * We intentionally do not clone TOC-entry-local state: it's useful to
899 * share knowledge about where the data blocks are across threads.
900 * _PrintTocData has to be careful about the order of operations on that
901 * state, though.
902 */
903}
904
905static void
907{
908 lclContext *ctx = (lclContext *) AH->formatData;
909
910 free(ctx);
911}
912
913/*
914 * This function is executed in the child of a parallel restore from a
915 * custom-format archive and restores the actual data for one TOC entry.
916 */
917static int
919{
920 return parallel_restore(AH, te);
921}
922
923/*--------------------------------------------------
924 * END OF FORMAT CALLBACKS
925 *--------------------------------------------------
926 */
927
928/*
929 * Get the current position in the archive file.
930 *
931 * With a non-seekable archive file, we may not be able to obtain the
932 * file position. If so, just return -1. It's not too important in
933 * that case because we won't be able to rewrite the TOC to fill in
934 * data block offsets anyway.
935 */
936static pgoff_t
938{
939 pgoff_t pos;
940
941 pos = ftello(AH->FH);
942 if (pos < 0)
943 {
944 /* Not expected if we found we can seek. */
945 if (ctx->hasSeek)
946 pg_fatal("could not determine seek position in archive file: %m");
947 }
948 return pos;
949}
950
951/*
952 * Read a data block header. The format changed in V1.3, so we
953 * centralize the code here for simplicity. Returns *type = EOF
954 * if at EOF.
955 */
956static void
958{
959 int byt;
960
961 /*
962 * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
963 * inside ReadInt rather than returning EOF. It doesn't seem worth
964 * jumping through hoops to deal with that case better, because no such
965 * files are likely to exist in the wild: only some 7.1 development
966 * versions of pg_dump ever generated such files.
967 */
968 if (AH->version < K_VERS_1_3)
969 *type = BLK_DATA;
970 else
971 {
972 byt = getc(AH->FH);
973 *type = byt;
974 if (byt == EOF)
975 {
976 *id = 0; /* don't return an uninitialized value */
977 return;
978 }
979 }
980
981 *id = ReadInt(AH);
982}
983
984/*
985 * Callback function for writeData. Writes one block of (compressed)
986 * data to the archive.
987 */
988static void
989_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
990{
991 /* never write 0-byte blocks (this should not happen) */
992 if (len > 0)
993 {
994 WriteInt(AH, len);
995 _WriteBuf(AH, buf, len);
996 }
997}
998
999/*
1000 * Callback function for readData. To keep things simple, we
1001 * always read one compressed block at a time.
1002 */
1003static size_t
1004_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1005{
1006 size_t blkLen;
1007
1008 /* Read length */
1009 blkLen = ReadInt(AH);
1010 if (blkLen == 0)
1011 return 0;
1012
1013 /* If the caller's buffer is not large enough, allocate a bigger one */
1014 if (blkLen > *buflen)
1015 {
1016 free(*buf);
1017 *buf = (char *) pg_malloc(blkLen);
1018 *buflen = blkLen;
1019 }
1020
1021 /* exits app on read errors */
1022 _ReadBuf(AH, *buf, blkLen);
1023
1024 return blkLen;
1025}
#define PG_BINARY_R
Definition: c.h:1246
#define INT64_FORMAT
Definition: c.h:520
int64_t int64
Definition: c.h:499
#define PG_BINARY_W
Definition: c.h:1247
CompressorState * AllocateCompressor(const pg_compress_specification compression_spec, ReadFunc readF, WriteFunc writeF)
Definition: compress_io.c:123
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:148
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
for(;;)
#define free(a)
Definition: header.h:65
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
@ archModeWrite
Definition: pg_backup.h:51
bool checkSeek(FILE *fp)
void WriteHead(ArchiveHandle *AH)
size_t WriteInt(ArchiveHandle *AH, int i)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
void StartRestoreLOs(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
int ReadInt(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void ReadToc(ArchiveHandle *AH)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
void WriteToc(ArchiveHandle *AH)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
#define K_OFFSET_NO_DATA
#define WRITE_ERROR_EXIT
#define BLK_DATA
#define BLK_BLOBS
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define READ_ERROR_EXIT(fd)
#define K_VERS_1_3
#define K_VERS_1_7
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te)
static void _skipData(ArchiveHandle *AH)
static void _StartData(ArchiveHandle *AH, TocEntry *te)
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
static void _CloseArchive(ArchiveHandle *AH)
static void _skipLOs(ArchiveHandle *AH)
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx)
static void _DeClone(ArchiveHandle *AH)
static void _ReopenArchive(ArchiveHandle *AH)
static void _StartLOs(ArchiveHandle *AH, TocEntry *te)
void InitArchiveFmt_Custom(ArchiveHandle *AH)
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
static void _EndLOs(ArchiveHandle *AH, TocEntry *te)
static int _WriteByte(ArchiveHandle *AH, const int i)
static void _PrepParallelRestore(ArchiveHandle *AH)
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
static int _ReadByte(ArchiveHandle *AH)
static void _PrintData(ArchiveHandle *AH)
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
static void _EndData(ArchiveHandle *AH, TocEntry *te)
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
static void _Clone(ArchiveHandle *AH)
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
static void _LoadLOs(ArchiveHandle *AH, bool drop)
#define pg_fatal(...)
const void size_t len
const void * data
static XLogRecPtr endpos
Definition: pg_receivewal.c:56
static char * buf
Definition: pg_test_fsync.c:72
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define pgoff_t
Definition: port.h:401
unsigned int Oid
Definition: postgres_ext.h:30
DumpOptions * dopt
Definition: pg_backup.h:224
int verbose
Definition: pg_backup.h:227
RestoreOptions * ropt
Definition: pg_backup.h:225
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
ReadExtraTocPtrType ReadExtraTocPtr
WorkerJobDumpPtrType WorkerJobDumpPtr
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
WorkerJobRestorePtrType WorkerJobRestorePtr
PrintTocDataPtrType PrintTocDataPtr
WriteBytePtrType WriteBytePtr
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
EndDataPtrType EndDataPtr
ClosePtrType ClosePtr
bool dumpData
Definition: pg_backup.h:214
DataDumperPtr dataDumper
struct _tocEntry * next
pgoff_t lastFilePos
CompressorState * cs
const char * type
#define ftello(stream)
Definition: win32_port.h:209
#define fseeko(stream, offset, origin)
Definition: win32_port.h:206