PostgreSQL Source Code git master
pg_backup_archiver.c
1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres_fe.h"
23
24#include <ctype.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#ifdef WIN32
30#include <io.h>
31#endif
32
33#include "catalog/pg_class_d.h"
34#include "common/string.h"
35#include "compress_io.h"
36#include "dumputils.h"
38#include "lib/binaryheap.h"
39#include "lib/stringinfo.h"
40#include "libpq/libpq-fs.h"
41#include "parallel.h"
42#include "pg_backup_archiver.h"
43#include "pg_backup_db.h"
44#include "pg_backup_utils.h"
45
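/*
 * These banner comments are emitted at the top of plain-text output and are
 * also matched by _discoverArchiveFormat() to recognize a text-format dump
 * that was mistakenly fed to pg_restore.
 */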
46#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48
49
50static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
51 const pg_compress_specification compression_spec,
53 SetupWorkerPtrType setupWorkerPtr,
55static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
56static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
57static char *sanitize_line(const char *str, bool want_hyphen);
59static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
60static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
61static void _becomeUser(ArchiveHandle *AH, const char *user);
62static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
63static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
64static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
65static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
67 TocEntry *te);
68static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
71static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
73static bool _tocEntryIsACL(TocEntry *te);
77static void buildTocEntryArrays(ArchiveHandle *AH);
78static void _moveBefore(TocEntry *pos, TocEntry *te);
80
81static int RestoringToDB(ArchiveHandle *AH);
82static void dump_lo_buf(ArchiveHandle *AH);
83static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
84static void SetOutput(ArchiveHandle *AH, const char *filename,
85 const pg_compress_specification compression_spec);
87static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
88
89static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
91 TocEntry *pending_list);
93 ParallelState *pstate,
94 TocEntry *pending_list);
96 TocEntry *pending_list);
98static void pending_list_append(TocEntry *l, TocEntry *te);
99static void pending_list_remove(TocEntry *te);
100static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
101static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
102static void move_to_ready_heap(TocEntry *pending_list,
103 binaryheap *ready_heap,
104 RestorePass pass);
105static TocEntry *pop_next_work_item(binaryheap *ready_heap,
106 ParallelState *pstate);
107static void mark_dump_job_done(ArchiveHandle *AH,
108 TocEntry *te,
109 int status,
110 void *callback_data);
112 TocEntry *te,
113 int status,
114 void *callback_data);
115static void fix_dependencies(ArchiveHandle *AH);
116static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
119static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
120 binaryheap *ready_heap);
121static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
123
124static void StrictNamesCheck(RestoreOptions *ropt);
125
126
127/*
128 * Allocate a new DumpOptions block containing all default values.
129 */
130DumpOptions *
131NewDumpOptions(void)
132{
133 DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
134
135 InitDumpOptions(opts);
136 return opts;
137}
138
139/*
140 * Initialize a DumpOptions struct to all default values
141 */
142void
143InitDumpOptions(DumpOptions *opts)
144{
145 memset(opts, 0, sizeof(DumpOptions));
146 /* set any fields that shouldn't default to zeroes */
147 opts->include_everything = true;
148 opts->cparams.promptPassword = TRI_DEFAULT;
149 opts->dumpSections = DUMP_UNSECTIONED;
150 opts->dumpSchema = true;
151 opts->dumpData = true;
152}
153
154/*
155 * Create a freshly allocated DumpOptions with options equivalent to those
156 * found in the given RestoreOptions.
157 */
158DumpOptions *
159dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
160{
161 DumpOptions *dopt = NewDumpOptions();
162
163 /* this is the inverse of what's at the end of pg_dump.c's main() */
164 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
165 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
166 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
167 dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
169 dopt->outputClean = ropt->dropSchema;
170 dopt->dumpData = ropt->dumpData;
171 dopt->dumpSchema = ropt->dumpSchema;
172 dopt->if_exists = ropt->if_exists;
173 dopt->column_inserts = ropt->column_inserts;
174 dopt->dumpSections = ropt->dumpSections;
175 dopt->aclsSkip = ropt->aclsSkip;
176 dopt->outputSuperuser = ropt->superuser;
177 dopt->outputCreateDB = ropt->createDB;
178 dopt->outputNoOwner = ropt->noOwner;
179 dopt->outputNoTableAm = ropt->noTableAm;
180 dopt->outputNoTablespaces = ropt->noTablespace;
182 dopt->use_setsessauth = ropt->use_setsessauth;
184 dopt->dump_inserts = ropt->dump_inserts;
185 dopt->no_comments = ropt->no_comments;
186 dopt->no_publications = ropt->no_publications;
189 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
192 dopt->sequence_data = ropt->sequence_data;
193
194 return dopt;
195}
196
197
198/*
199 * Wrapper functions.
200 *
201 * The objective is to make writing new formats and dumpers as simple
202 * as possible, if necessary at the expense of extra function calls etc.
203 *
204 */
205
206/*
207 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
208 * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
209 * setup doesn't need to know anything much, so it's defined here.
210 */
211static void
213{
214 ArchiveHandle *AH = (ArchiveHandle *) AHX;
215
216 AH->ReopenPtr(AH);
217}
218
219
220/* Create a new archive */
221/* Public */
222Archive *
223CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
224 const pg_compress_specification compression_spec,
225 bool dosync, ArchiveMode mode,
228
229{
230 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
232
233 return (Archive *) AH;
234}
235
236/* Open an existing archive */
237/* Public */
238Archive *
239OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
240{
241 ArchiveHandle *AH;
242 pg_compress_specification compression_spec = {0};
243
244 compression_spec.algorithm = PG_COMPRESSION_NONE;
245 AH = _allocAH(FileSpec, fmt, compression_spec, true,
248
249 return (Archive *) AH;
250}
251
252/* Public */
253void
254CloseArchive(Archive *AHX)
255{
256 ArchiveHandle *AH = (ArchiveHandle *) AHX;
257
258 AH->ClosePtr(AH);
259
260 /* Close the output */
261 errno = 0;
262 if (!EndCompressFileHandle(AH->OF))
263 pg_fatal("could not close output file: %m");
264}
265
266/* Public */
267void
268SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
269{
270 /* Caller can omit dump options, in which case we synthesize them */
271 if (dopt == NULL && ropt != NULL)
272 dopt = dumpOptionsFromRestoreOptions(ropt);
273
274 /* Save options for later access */
275 AH->dopt = dopt;
276 AH->ropt = ropt;
277}
278
279/* Public */
280void
281ProcessArchiveRestoreOptions(Archive *AHX)
282{
283 ArchiveHandle *AH = (ArchiveHandle *) AHX;
284 RestoreOptions *ropt = AH->public.ropt;
285 TocEntry *te;
286 teSection curSection;
287
288 /* Decide which TOC entries will be dumped/restored, and mark them */
289 curSection = SECTION_PRE_DATA;
290 for (te = AH->toc->next; te != AH->toc; te = te->next)
291 {
292 /*
293 * When writing an archive, we also take this opportunity to check
294 * that we have generated the entries in a sane order that respects
295 * the section divisions. When reading, don't complain, since buggy
296 * old versions of pg_dump might generate out-of-order archives.
297 */
298 if (AH->mode != archModeRead)
299 {
300 switch (te->section)
301 {
302 case SECTION_NONE:
303 /* ok to be anywhere */
304 break;
305 case SECTION_PRE_DATA:
306 if (curSection != SECTION_PRE_DATA)
307 pg_log_warning("archive items not in correct section order");
308 break;
309 case SECTION_DATA:
310 if (curSection == SECTION_POST_DATA)
311 pg_log_warning("archive items not in correct section order");
312 break;
313 case SECTION_POST_DATA:
314 /* ok no matter which section we were in */
315 break;
316 default:
317 pg_fatal("unexpected section code %d",
318 (int) te->section);
319 break;
320 }
321 }
322
323 if (te->section != SECTION_NONE)
324 curSection = te->section;
325
326 te->reqs = _tocEntryRequired(te, curSection, AH);
327 }
328
329 /* Enforce strict names checking */
330 if (ropt->strict_names)
331 StrictNamesCheck(ropt);
332}
333
334/* Public */
335void
337{
338 ArchiveHandle *AH = (ArchiveHandle *) AHX;
339 RestoreOptions *ropt = AH->public.ropt;
340 bool parallel_mode;
341 TocEntry *te;
343
345
346 /*
347 * If we're going to do parallel restore, there are some restrictions.
348 */
349 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
350 if (parallel_mode)
351 {
352 /* We haven't got round to making this work for all archive formats */
353 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
354 pg_fatal("parallel restore is not supported with this archive file format");
355
356 /* Doesn't work if the archive represents dependencies as OIDs */
357 if (AH->version < K_VERS_1_8)
358 pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
359
360 /*
361 * It's also not gonna work if we can't reopen the input file, so
362 * let's try that immediately.
363 */
364 AH->ReopenPtr(AH);
365 }
366
367 /*
368 * Make sure we won't need (de)compression we haven't got
369 */
370 if (AH->PrintTocDataPtr != NULL)
371 {
372 for (te = AH->toc->next; te != AH->toc; te = te->next)
373 {
374 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
375 {
377
378 if (errmsg)
379 pg_fatal("cannot restore from compressed archive (%s)",
380 errmsg);
381 else
382 break;
383 }
384 }
385 }
386
387 /*
388 * Prepare index arrays, so we can assume we have them throughout restore.
389 * It's possible we already did this, though.
390 */
391 if (AH->tocsByDumpId == NULL)
392 buildTocEntryArrays(AH);
393
394 /*
395 * If we're using a DB connection, then connect it.
396 */
397 if (ropt->useDB)
398 {
399 pg_log_info("connecting to database for restore");
400 if (AH->version < K_VERS_1_3)
401 pg_fatal("direct database connections are not supported in pre-1.3 archives");
402
403 /*
404 * We don't want to guess at whether the dump will successfully
405 * restore; allow the attempt regardless of the version of the restore
406 * target.
407 */
408 AHX->minRemoteVersion = 0;
409 AHX->maxRemoteVersion = 9999999;
410
411 ConnectDatabase(AHX, &ropt->cparams, false);
412
413 /*
414 * If we're talking to the DB directly, don't send comments since they
415 * obscure SQL when displaying errors
416 */
417 AH->noTocComments = 1;
418 }
419
420 /*
421 * Work out if we have an implied data-only restore. This can happen if
422 * the dump was data only or if the user has used a toc list to exclude
423 * all of the schema data. All we do is look for schema entries - if none
424 * are found then we unset the dumpSchema flag.
425 *
426 * We could scan for wanted TABLE entries, but that is not the same as
427 * data-only. At this stage, it seems unnecessary (6-Mar-2001).
428 */
429 if (ropt->dumpSchema)
430 {
431 int impliedDataOnly = 1;
432
433 for (te = AH->toc->next; te != AH->toc; te = te->next)
434 {
435 if ((te->reqs & REQ_SCHEMA) != 0)
436 { /* It's schema, and it's wanted */
437 impliedDataOnly = 0;
438 break;
439 }
440 }
441 if (impliedDataOnly)
442 {
443 ropt->dumpSchema = false;
444 pg_log_info("implied data-only restore");
445 }
446 }
447
448 /*
449 * Setup the output file if necessary.
450 */
451 sav = SaveOutput(AH);
453 SetOutput(AH, ropt->filename, ropt->compression_spec);
454
455 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
456
457 if (AH->archiveRemoteVersion)
458 ahprintf(AH, "-- Dumped from database version %s\n",
460 if (AH->archiveDumpVersion)
461 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
463
464 ahprintf(AH, "\n");
465
466 if (AH->public.verbose)
467 dumpTimestamp(AH, "Started on", AH->createDate);
468
469 if (ropt->single_txn)
470 {
471 if (AH->connection)
472 StartTransaction(AHX);
473 else
474 ahprintf(AH, "BEGIN;\n\n");
475 }
476
477 /*
478 * Establish important parameter values right away.
479 */
480 _doSetFixedOutputState(AH);
481
482 AH->stage = STAGE_PROCESSING;
483
484 /*
485 * Drop the items at the start, in reverse order
486 */
487 if (ropt->dropSchema)
488 {
489 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
490 {
491 AH->currentTE = te;
492
493 /*
494 * In createDB mode, issue a DROP *only* for the database as a
495 * whole. Issuing drops against anything else would be wrong,
496 * because at this point we're connected to the wrong database.
497 * (The DATABASE PROPERTIES entry, if any, should be treated like
498 * the DATABASE entry.)
499 */
500 if (ropt->createDB)
501 {
502 if (strcmp(te->desc, "DATABASE") != 0 &&
503 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
504 continue;
505 }
506
507 /* Otherwise, drop anything that's selected and has a dropStmt */
508 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
509 {
510 bool not_allowed_in_txn = false;
511
512 pg_log_info("dropping %s %s", te->desc, te->tag);
513
514 /*
515 * In --transaction-size mode, we have to temporarily exit our
516 * transaction block to drop objects that can't be dropped
517 * within a transaction.
518 */
519 if (ropt->txn_size > 0)
520 {
521 if (strcmp(te->desc, "DATABASE") == 0 ||
522 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
523 {
524 not_allowed_in_txn = true;
525 if (AH->connection)
526 CommitTransaction(AHX);
527 else
528 ahprintf(AH, "COMMIT;\n");
529 }
530 }
531
532 /* Select owner and schema as necessary */
533 _becomeOwner(AH, te);
534 _selectOutputSchema(AH, te->namespace);
535
536 /*
537 * Now emit the DROP command, if the object has one. Note we
538 * don't necessarily emit it verbatim; at this point we add an
539 * appropriate IF EXISTS clause, if the user requested it.
540 */
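 /*
 * For example, with --if-exists a stored "DROP TABLE s.t;" is emitted as
 * "DROP TABLE IF EXISTS s.t;", and "ALTER TABLE s.t DROP CONSTRAINT c;"
 * becomes "ALTER TABLE IF EXISTS s.t DROP CONSTRAINT IF EXISTS c;".
 */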
541 if (strcmp(te->desc, "BLOB METADATA") == 0)
542 {
543 /* We must generate the per-blob commands */
544 if (ropt->if_exists)
545 IssueCommandPerBlob(AH, te,
546 "SELECT pg_catalog.lo_unlink(oid) "
547 "FROM pg_catalog.pg_largeobject_metadata "
548 "WHERE oid = '", "'");
549 else
550 IssueCommandPerBlob(AH, te,
551 "SELECT pg_catalog.lo_unlink('",
552 "')");
553 }
554 else if (*te->dropStmt != '\0')
555 {
556 if (!ropt->if_exists ||
557 strncmp(te->dropStmt, "--", 2) == 0)
558 {
559 /*
560 * Without --if-exists, or if it's just a comment (as
561 * happens for the public schema), print the dropStmt
562 * as-is.
563 */
564 ahprintf(AH, "%s", te->dropStmt);
565 }
566 else
567 {
568 /*
569 * Inject an appropriate spelling of "if exists". For
570 * old-style large objects, we have a routine that
571 * knows how to do it, without depending on
572 * te->dropStmt; use that. For other objects we need
573 * to parse the command.
574 */
575 if (strcmp(te->desc, "BLOB") == 0)
576 {
577 DropLOIfExists(AH, te->catalogId.oid);
578 }
579 else
580 {
581 char *dropStmt = pg_strdup(te->dropStmt);
582 char *dropStmtOrig = dropStmt;
583 PQExpBuffer ftStmt = createPQExpBuffer();
584
585 /*
586 * Need to inject IF EXISTS clause after ALTER
587 * TABLE part in ALTER TABLE .. DROP statement
588 */
589 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
590 {
592 "ALTER TABLE IF EXISTS");
593 dropStmt = dropStmt + 11;
594 }
595
596 /*
597 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
598 * not support the IF EXISTS clause, and therefore
599 * we simply emit the original command for DEFAULT
600 * objects (modulo the adjustment made above).
601 *
602 * Likewise, don't mess with DATABASE PROPERTIES.
603 *
604 * If we used CREATE OR REPLACE VIEW as a means of
605 * quasi-dropping an ON SELECT rule, that should
606 * be emitted unchanged as well.
607 *
608 * For other object types, we need to extract the
609 * first part of the DROP which includes the
610 * object type. Most of the time this matches
611 * te->desc, so search for that; however for the
612 * different kinds of CONSTRAINTs, we know to
613 * search for hardcoded "DROP CONSTRAINT" instead.
614 */
615 if (strcmp(te->desc, "DEFAULT") == 0 ||
616 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
617 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
618 appendPQExpBufferStr(ftStmt, dropStmt);
619 else
620 {
621 char buffer[40];
622 char *mark;
623
624 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
625 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
626 strcmp(te->desc, "FK CONSTRAINT") == 0)
627 strcpy(buffer, "DROP CONSTRAINT");
628 else
629 snprintf(buffer, sizeof(buffer), "DROP %s",
630 te->desc);
631
632 mark = strstr(dropStmt, buffer);
633
634 if (mark)
635 {
636 *mark = '\0';
637 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
638 dropStmt, buffer,
639 mark + strlen(buffer));
640 }
641 else
642 {
643 /* complain and emit unmodified command */
644 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
645 dropStmtOrig);
646 appendPQExpBufferStr(ftStmt, dropStmt);
647 }
648 }
649
650 ahprintf(AH, "%s", ftStmt->data);
651
652 destroyPQExpBuffer(ftStmt);
653 pg_free(dropStmtOrig);
654 }
655 }
656 }
657
658 /*
659 * In --transaction-size mode, re-establish the transaction
660 * block if needed; otherwise, commit after every N drops.
661 */
662 if (ropt->txn_size > 0)
663 {
664 if (not_allowed_in_txn)
665 {
666 if (AH->connection)
667 StartTransaction(AHX);
668 else
669 ahprintf(AH, "BEGIN;\n");
670 AH->txnCount = 0;
671 }
672 else if (++AH->txnCount >= ropt->txn_size)
673 {
674 if (AH->connection)
675 {
676 CommitTransaction(AHX);
677 StartTransaction(AHX);
678 }
679 else
680 ahprintf(AH, "COMMIT;\nBEGIN;\n");
681 AH->txnCount = 0;
682 }
683 }
684 }
685 }
686
687 /*
688 * _selectOutputSchema may have set currSchema to reflect the effect
689 * of a "SET search_path" command it emitted. However, by now we may
690 * have dropped that schema; or it might not have existed in the first
691 * place. In either case the effective value of search_path will not
692 * be what we think. Forcibly reset currSchema so that we will
693 * re-establish the search_path setting when needed (after creating
694 * the schema).
695 *
696 * If we treated users as pg_dump'able objects then we'd need to reset
697 * currUser here too.
698 */
699 free(AH->currSchema);
700 AH->currSchema = NULL;
701 }
702
703 if (parallel_mode)
704 {
705 /*
706 * In parallel mode, turn control over to the parallel-restore logic.
707 */
708 ParallelState *pstate;
709 TocEntry pending_list;
710
711 /* The archive format module may need some setup for this */
714
715 pending_list_header_init(&pending_list);
716
717 /* This runs PRE_DATA items and then disconnects from the database */
718 restore_toc_entries_prefork(AH, &pending_list);
719 Assert(AH->connection == NULL);
720
721 /* ParallelBackupStart() will actually fork the processes */
722 pstate = ParallelBackupStart(AH);
723 restore_toc_entries_parallel(AH, pstate, &pending_list);
724 ParallelBackupEnd(AH, pstate);
725
726 /* reconnect the leader and see if we missed something */
727 restore_toc_entries_postfork(AH, &pending_list);
728 Assert(AH->connection != NULL);
729 }
730 else
731 {
732 /*
733 * In serial mode, process everything in three phases: normal items,
734 * then ACLs, then post-ACL items. We might be able to skip one or
735 * both extra phases in some cases, eg data-only restores.
736 */
737 bool haveACL = false;
738 bool havePostACL = false;
739
740 for (te = AH->toc->next; te != AH->toc; te = te->next)
741 {
742 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
743 continue; /* ignore if not to be dumped at all */
744
745 switch (_tocEntryRestorePass(te))
746 {
747 case RESTORE_PASS_MAIN:
748 (void) restore_toc_entry(AH, te, false);
749 break;
750 case RESTORE_PASS_ACL:
751 haveACL = true;
752 break;
753 case RESTORE_PASS_POST_ACL:
754 havePostACL = true;
755 break;
756 }
757 }
758
759 if (haveACL)
760 {
761 for (te = AH->toc->next; te != AH->toc; te = te->next)
762 {
763 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
764 _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
765 (void) restore_toc_entry(AH, te, false);
766 }
767 }
768
769 if (havePostACL)
770 {
771 for (te = AH->toc->next; te != AH->toc; te = te->next)
772 {
773 if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
774 _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
775 (void) restore_toc_entry(AH, te, false);
776 }
777 }
778 }
779
780 /*
781 * Close out any persistent transaction we may have. While these two
782 * cases are started in different places, we can end both cases here.
783 */
784 if (ropt->single_txn || ropt->txn_size > 0)
785 {
786 if (AH->connection)
787 CommitTransaction(AHX);
788 else
789 ahprintf(AH, "COMMIT;\n\n");
790 }
791
792 if (AH->public.verbose)
793 dumpTimestamp(AH, "Completed on", time(NULL));
794
795 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
796
797 /*
798 * Clean up & we're done.
799 */
801
803 RestoreOutput(AH, sav);
804
805 if (ropt->useDB)
807}
808
809/*
810 * Restore a single TOC item. Used in both parallel and non-parallel restore;
811 * is_parallel is true if we are in a worker child process.
812 *
813 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
814 * the parallel parent has to make the corresponding status update.
815 */
816static int
817restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
818{
819 RestoreOptions *ropt = AH->public.ropt;
820 int status = WORKER_OK;
821 int reqs;
822 bool defnDumped;
823
824 AH->currentTE = te;
825
826 /* Dump any relevant dump warnings to stderr */
827 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
828 {
829 if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
830 pg_log_warning("warning from original dump file: %s", te->defn);
831 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
832 pg_log_warning("warning from original dump file: %s", te->copyStmt);
833 }
834
835 /* Work out what, if anything, we want from this entry */
836 reqs = te->reqs;
837
838 defnDumped = false;
839
840 /*
841 * If it has a schema component that we want, then process that
842 */
843 if ((reqs & REQ_SCHEMA) != 0)
844 {
845 bool object_is_db = false;
846
847 /*
848 * In --transaction-size mode, must exit our transaction block to
849 * create a database or set its properties.
850 */
851 if (strcmp(te->desc, "DATABASE") == 0 ||
852 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
853 {
854 object_is_db = true;
855 if (ropt->txn_size > 0)
856 {
857 if (AH->connection)
859 else
860 ahprintf(AH, "COMMIT;\n\n");
861 }
862 }
863
864 /* Show namespace in log message if available */
865 if (te->namespace)
866 pg_log_info("creating %s \"%s.%s\"",
867 te->desc, te->namespace, te->tag);
868 else
869 pg_log_info("creating %s \"%s\"",
870 te->desc, te->tag);
871
872 _printTocEntry(AH, te, false);
873 defnDumped = true;
874
875 if (strcmp(te->desc, "TABLE") == 0)
876 {
877 if (AH->lastErrorTE == te)
878 {
879 /*
880 * We failed to create the table. If
881 * --no-data-for-failed-tables was given, mark the
882 * corresponding TABLE DATA to be ignored.
883 *
884 * In the parallel case this must be done in the parent, so we
885 * just set the return value.
886 */
887 if (ropt->noDataForFailedTables)
888 {
889 if (is_parallel)
890 status = WORKER_INHIBIT_DATA;
891 else
893 }
894 }
895 else
896 {
897 /*
898 * We created the table successfully. Mark the corresponding
899 * TABLE DATA for possible truncation.
900 *
901 * In the parallel case this must be done in the parent, so we
902 * just set the return value.
903 */
904 if (is_parallel)
905 status = WORKER_CREATE_DONE;
906 else
907 mark_create_done(AH, te);
908 }
909 }
910
911 /*
912 * If we created a DB, connect to it. Also, if we changed DB
913 * properties, reconnect to ensure that relevant GUC settings are
914 * applied to our session. (That also restarts the transaction block
915 * in --transaction-size mode.)
916 */
917 if (object_is_db)
918 {
919 pg_log_info("connecting to new database \"%s\"", te->tag);
920 _reconnectToDB(AH, te->tag);
921 }
922 }
923
924 /*
925 * If it has a data component that we want, then process that
926 */
927 if ((reqs & REQ_DATA) != 0)
928 {
929 /*
930 * hadDumper will be set if there is genuine data component for this
931 * node. Otherwise, we need to check the defn field for statements
932 * that need to be executed in data-only restores.
933 */
934 if (te->hadDumper)
935 {
936 /*
937 * If we can output the data, then restore it.
938 */
939 if (AH->PrintTocDataPtr != NULL)
940 {
941 _printTocEntry(AH, te, true);
942
943 if (strcmp(te->desc, "BLOBS") == 0 ||
944 strcmp(te->desc, "BLOB COMMENTS") == 0)
945 {
946 pg_log_info("processing %s", te->desc);
947
948 _selectOutputSchema(AH, "pg_catalog");
949
950 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
951 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
953
954 AH->PrintTocDataPtr(AH, te);
955
957 }
958 else
959 {
960 bool use_truncate;
961
963
964 /* Select owner and schema as necessary */
965 _becomeOwner(AH, te);
966 _selectOutputSchema(AH, te->namespace);
967
968 pg_log_info("processing data for table \"%s.%s\"",
969 te->namespace, te->tag);
970
971 /*
972 * In parallel restore, if we created the table earlier in
973 * this run (so that we know it is empty) and we are not
974 * restoring a load-via-partition-root data item then we
975 * wrap the COPY in a transaction and precede it with a
976 * TRUNCATE. If wal_level is set to minimal this prevents
977 * WAL-logging the COPY. This obtains a speedup similar
978 * to that from using single_txn mode in non-parallel
979 * restores.
980 *
981 * We mustn't do this for load-via-partition-root cases
982 * because some data might get moved across partition
983 * boundaries, risking deadlock and/or loss of previously
984 * loaded data. (We assume that all partitions of a
985 * partitioned table will be treated the same way.)
986 */
987 use_truncate = is_parallel && te->created &&
989
990 if (use_truncate)
991 {
992 /*
993 * Parallel restore is always talking directly to a
994 * server, so no need to see if we should issue BEGIN.
995 */
997
998 /*
999 * Issue TRUNCATE with ONLY so that child tables are
1000 * not wiped.
1001 */
1002 ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1003 fmtQualifiedId(te->namespace, te->tag));
1004 }
1005
1006 /*
1007 * If we have a copy statement, use it.
1008 */
1009 if (te->copyStmt && strlen(te->copyStmt) > 0)
1010 {
1011 ahprintf(AH, "%s", te->copyStmt);
1012 AH->outputKind = OUTPUT_COPYDATA;
1013 }
1014 else
1015 AH->outputKind = OUTPUT_SQLCMDS;
1016
1017 AH->PrintTocDataPtr(AH, te);
1018
1019 /*
1020 * Terminate COPY if needed.
1021 */
1022 if (AH->outputKind == OUTPUT_COPYDATA &&
1023 RestoringToDB(AH))
1024 EndDBCopyMode(&AH->public, te->tag);
1026
1027 /* close out the transaction started above */
1028 if (use_truncate)
1030
1032 }
1033 }
1034 }
1035 else if (!defnDumped)
1036 {
1037 /* If we haven't already dumped the defn part, do so now */
1038 pg_log_info("executing %s %s", te->desc, te->tag);
1039 _printTocEntry(AH, te, false);
1040 }
1041 }
1042
1043 /*
1044 * If we emitted anything for this TOC entry, that counts as one action
1045 * against the transaction-size limit. Commit if it's time to.
1046 */
1047 if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
1048 {
1049 if (++AH->txnCount >= ropt->txn_size)
1050 {
1051 if (AH->connection)
1052 {
1055 }
1056 else
1057 ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1058 AH->txnCount = 0;
1059 }
1060 }
1061
1062 if (AH->public.n_errors > 0 && status == WORKER_OK)
1063 status = WORKER_IGNORED_ERRORS;
1064
1065 return status;
1066}
1067
1068/*
1069 * Allocate a new RestoreOptions block.
1070 * This is mainly so we can initialize it, but also for future expansion,
1071 */
1074{
1076
1078
1079 /* set any fields that shouldn't default to zeroes */
1080 opts->format = archUnknown;
1081 opts->cparams.promptPassword = TRI_DEFAULT;
1082 opts->dumpSections = DUMP_UNSECTIONED;
1083 opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1084 opts->compression_spec.level = 0;
1085 opts->dumpSchema = true;
1086 opts->dumpData = true;
1087
1088 return opts;
1089}
1090
1091static void
1093{
1094 RestoreOptions *ropt = AH->public.ropt;
1095
1096 /* This hack is only needed in a data-only restore */
1097 if (ropt->dumpSchema || !ropt->disable_triggers)
1098 return;
1099
1100 pg_log_info("disabling triggers for %s", te->tag);
1101
1102 /*
1103 * Become superuser if possible, since they are the only ones who can
1104 * disable constraint triggers. If -S was not given, assume the initial
1105 * user identity is a superuser. (XXX would it be better to become the
1106 * table owner?)
1107 */
1108 _becomeUser(AH, ropt->superuser);
1109
1110 /*
1111 * Disable them.
1112 */
1113 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1114 fmtQualifiedId(te->namespace, te->tag));
1115}
1116
1117static void
1119{
1120 RestoreOptions *ropt = AH->public.ropt;
1121
1122 /* This hack is only needed in a data-only restore */
1123 if (ropt->dumpSchema || !ropt->disable_triggers)
1124 return;
1125
1126 pg_log_info("enabling triggers for %s", te->tag);
1127
1128 /*
1129 * Become superuser if possible, since they are the only ones who can
1130 * disable constraint triggers. If -S was not given, assume the initial
1131 * user identity is a superuser. (XXX would it be better to become the
1132 * table owner?)
1133 */
1134 _becomeUser(AH, ropt->superuser);
1135
1136 /*
1137 * Enable them.
1138 */
1139 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1140 fmtQualifiedId(te->namespace, te->tag));
1141}
1142
1143/*
1144 * Detect whether a TABLE DATA TOC item is performing "load via partition
1145 * root", that is the target table is an ancestor partition rather than the
1146 * table the TOC item is nominally for.
1147 *
1148 * In newer archive files this can be detected by checking for a special
1149 * comment placed in te->defn. In older files we have to fall back to seeing
1150 * if the COPY statement targets the named table or some other one. This
1151 * will not work for data dumped as INSERT commands, so we could give a false
1152 * negative in that case; fortunately, that's a rarely-used option.
1153 */
1154static bool
1156{
1157 if (te->defn &&
1158 strncmp(te->defn, "-- load via partition root ", 27) == 0)
1159 return true;
1160 if (te->copyStmt && *te->copyStmt)
1161 {
1162 PQExpBuffer copyStmt = createPQExpBuffer();
1163 bool result;
1164
1165 /*
1166 * Build the initial part of the COPY as it would appear if the
1167 * nominal target table is the actual target. If we see anything
1168 * else, it must be a load-via-partition-root case.
1169 */
1170 appendPQExpBuffer(copyStmt, "COPY %s ",
1171 fmtQualifiedId(te->namespace, te->tag));
1172 result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1173 destroyPQExpBuffer(copyStmt);
1174 return result;
1175 }
1176 /* Assume it's not load-via-partition-root */
1177 return false;
1178}
1179
1180/*
1181 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1182 */
1183
1184/* Public */
1185void
1186WriteData(Archive *AHX, const void *data, size_t dLen)
1187{
1188 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1189
1190 if (!AH->currToc)
1191 pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1192
1193 AH->WriteDataPtr(AH, data, dLen);
1194}
1195
1196/*
1197 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1198 * repository for all metadata. But the name has stuck.
1199 *
1200 * The new entry is added to the Archive's TOC list. Most callers can ignore
1201 * the result value because nothing else need be done, but a few want to
1202 * manipulate the TOC entry further.
1203 */
1204
1205/* Public */
1206TocEntry *
1207ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1209{
1210 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1211 TocEntry *newToc;
1212
1213 newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1214
1215 AH->tocCount++;
1216 if (dumpId > AH->maxDumpId)
1217 AH->maxDumpId = dumpId;
1218
1219 newToc->prev = AH->toc->prev;
1220 newToc->next = AH->toc;
1221 AH->toc->prev->next = newToc;
1222 AH->toc->prev = newToc;
1223
1224 newToc->catalogId = catalogId;
1225 newToc->dumpId = dumpId;
1226 newToc->section = opts->section;
1227
1228 newToc->tag = pg_strdup(opts->tag);
1229 newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1230 newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1231 newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1232 newToc->relkind = opts->relkind;
1233 newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1234 newToc->desc = pg_strdup(opts->description);
1235 newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1236 newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1237 newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1238
1239 if (opts->nDeps > 0)
1240 {
1241 newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1242 memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1243 newToc->nDeps = opts->nDeps;
1244 }
1245 else
1246 {
1247 newToc->dependencies = NULL;
1248 newToc->nDeps = 0;
1249 }
1250
1251 newToc->dataDumper = opts->dumpFn;
1252 newToc->dataDumperArg = opts->dumpArg;
1253 newToc->hadDumper = opts->dumpFn ? true : false;
1254
1255 newToc->formatData = NULL;
1256 newToc->dataLength = 0;
1257
1258 if (AH->ArchiveEntryPtr != NULL)
1259 AH->ArchiveEntryPtr(AH, newToc);
1260
1261 return newToc;
1262}
1263
1264/* Public */
1265void
1267{
1268 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1269 RestoreOptions *ropt = AH->public.ropt;
1270 TocEntry *te;
1271 pg_compress_specification out_compression_spec = {0};
1272 teSection curSection;
1273 CompressFileHandle *sav;
1274 const char *fmtName;
1275 char stamp_str[64];
1276
1277 /* TOC is always uncompressed */
1278 out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1279
1280 sav = SaveOutput(AH);
1281 if (ropt->filename)
1282 SetOutput(AH, ropt->filename, out_compression_spec);
1283
1284 if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1285 localtime(&AH->createDate)) == 0)
1286 strcpy(stamp_str, "[unknown]");
1287
1288 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1289 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1290 sanitize_line(AH->archdbname, false),
1291 AH->tocCount,
1293
1294 switch (AH->format)
1295 {
1296 case archCustom:
1297 fmtName = "CUSTOM";
1298 break;
1299 case archDirectory:
1300 fmtName = "DIRECTORY";
1301 break;
1302 case archTar:
1303 fmtName = "TAR";
1304 break;
1305 default:
1306 fmtName = "UNKNOWN";
1307 }
1308
1309 ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1311 ahprintf(AH, "; Format: %s\n", fmtName);
1312 ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1313 ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1314 if (AH->archiveRemoteVersion)
1315 ahprintf(AH, "; Dumped from database version: %s\n",
1317 if (AH->archiveDumpVersion)
1318 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1319 AH->archiveDumpVersion);
1320
1321 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1322
1323 curSection = SECTION_PRE_DATA;
1324 for (te = AH->toc->next; te != AH->toc; te = te->next)
1325 {
1326 /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1327 if (te->section != SECTION_NONE)
1328 curSection = te->section;
1329 te->reqs = _tocEntryRequired(te, curSection, AH);
1330 /* Now, should we print it? */
1331 if (ropt->verbose ||
1332 (te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
1333 {
1334 char *sanitized_name;
1335 char *sanitized_schema;
1336 char *sanitized_owner;
1337
1338 /*
1339 */
1340 sanitized_name = sanitize_line(te->tag, false);
1341 sanitized_schema = sanitize_line(te->namespace, true);
1342 sanitized_owner = sanitize_line(te->owner, false);
1343
1344 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1346 te->desc, sanitized_schema, sanitized_name,
1347 sanitized_owner);
1348
1349 free(sanitized_name);
1350 free(sanitized_schema);
1351 free(sanitized_owner);
1352 }
1353 if (ropt->verbose && te->nDeps > 0)
1354 {
1355 int i;
1356
1357 ahprintf(AH, ";\tdepends on:");
1358 for (i = 0; i < te->nDeps; i++)
1359 ahprintf(AH, " %d", te->dependencies[i]);
1360 ahprintf(AH, "\n");
1361 }
1362 }
1363
1364 /* Enforce strict names checking */
1365 if (ropt->strict_names)
1366 StrictNamesCheck(ropt);
1367
1368 if (ropt->filename)
1369 RestoreOutput(AH, sav);
1370}
1371
1372/***********
1373 * Large Object Archival
1374 ***********/
1375
1376/* Called by a dumper to signal start of a LO */
1377int
1379{
1380 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1381
1382 if (!AH->StartLOPtr)
1383 pg_fatal("large-object output not supported in chosen format");
1384
1385 AH->StartLOPtr(AH, AH->currToc, oid);
1386
1387 return 1;
1388}
1389
1390/* Called by a dumper to signal end of a LO */
1391int
1393{
1394 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1395
1396 if (AH->EndLOPtr)
1397 AH->EndLOPtr(AH, AH->currToc, oid);
1398
1399 return 1;
1400}
1401
1402/**********
1403 * Large Object Restoration
1404 **********/
1405
1406/*
1407 * Called by a format handler before a group of LOs is restored
1408 */
1409void
1411{
1412 RestoreOptions *ropt = AH->public.ropt;
1413
1414 /*
1415 * LOs must be restored within a transaction block, since we need the LO
1416 * handle to stay open while we write it. Establish a transaction unless
1417 * there's one being used globally.
1418 */
1419 if (!(ropt->single_txn || ropt->txn_size > 0))
1420 {
1421 if (AH->connection)
1423 else
1424 ahprintf(AH, "BEGIN;\n\n");
1425 }
1426
1427 AH->loCount = 0;
1428}
1429
1430/*
1431 * Called by a format handler after a group of LOs is restored
1432 */
1433void
1435{
1436 RestoreOptions *ropt = AH->public.ropt;
1437
1438 if (!(ropt->single_txn || ropt->txn_size > 0))
1439 {
1440 if (AH->connection)
1442 else
1443 ahprintf(AH, "COMMIT;\n\n");
1444 }
1445
1446 pg_log_info(ngettext("restored %d large object",
1447 "restored %d large objects",
1448 AH->loCount),
1449 AH->loCount);
1450}
1451
1452
1453/*
1454 * Called by a format handler to initiate restoration of a LO
1455 */
1456void
1458{
1459 bool old_lo_style = (AH->version < K_VERS_1_12);
1460 Oid loOid;
1461
1462 AH->loCount++;
1463
1464 /* Initialize the LO Buffer */
1465 if (AH->lo_buf == NULL)
1466 {
1467 /* First time through (in this process) so allocate the buffer */
1468 AH->lo_buf_size = LOBBUFSIZE;
1470 }
1471 AH->lo_buf_used = 0;
1472
1473 pg_log_info("restoring large object with OID %u", oid);
1474
1475 /* With an old archive we must do drop and create logic here */
1476 if (old_lo_style && drop)
1477 DropLOIfExists(AH, oid);
1478
1479 if (AH->connection)
1480 {
1481 if (old_lo_style)
1482 {
1483 loOid = lo_create(AH->connection, oid);
1484 if (loOid == 0 || loOid != oid)
1485 pg_fatal("could not create large object %u: %s",
1486 oid, PQerrorMessage(AH->connection));
1487 }
1488 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1489 if (AH->loFd == -1)
1490 pg_fatal("could not open large object %u: %s",
1491 oid, PQerrorMessage(AH->connection));
1492 }
1493 else
1494 {
1495 if (old_lo_style)
1496 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1497 oid, INV_WRITE);
1498 else
1499 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1500 oid, INV_WRITE);
1501 }
1502
1503 AH->writingLO = true;
1504}
1505
1506void
1508{
1509 if (AH->lo_buf_used > 0)
1510 {
1511 /* Write remaining bytes from the LO buffer */
1512 dump_lo_buf(AH);
1513 }
1514
1515 AH->writingLO = false;
1516
1517 if (AH->connection)
1518 {
1519 lo_close(AH->connection, AH->loFd);
1520 AH->loFd = -1;
1521 }
1522 else
1523 {
1524 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1525 }
1526}
1527
1528/***********
1529 * Sorting and Reordering
1530 ***********/
1531
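/*
 * Apply the TOC-list file named by ropt->tocFile. Each line begins with a
 * dump ID; anything after a ';' is a comment and blank lines are ignored.
 * Listed entries are marked as wanted and reordered to match the file.
 */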
1532void
1533SortTocFromFile(Archive *AHX)
1534{
1535 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1536 RestoreOptions *ropt = AH->public.ropt;
1537 FILE *fh;
1538 StringInfoData linebuf;
1539
1540 /* Allocate space for the 'wanted' array, and init it */
1541 ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1542
1543 /* Setup the file */
1544 fh = fopen(ropt->tocFile, PG_BINARY_R);
1545 if (!fh)
1546 pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1547
1548 initStringInfo(&linebuf);
1549
1550 while (pg_get_line_buf(fh, &linebuf))
1551 {
1552 char *cmnt;
1553 char *endptr;
1554 DumpId id;
1555 TocEntry *te;
1556
1557 /* Truncate line at comment, if any */
1558 cmnt = strchr(linebuf.data, ';');
1559 if (cmnt != NULL)
1560 {
1561 cmnt[0] = '\0';
1562 linebuf.len = cmnt - linebuf.data;
1563 }
1564
1565 /* Ignore if all blank */
1566 if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1567 continue;
1568
1569 /* Get an ID, check it's valid and not already seen */
1570 id = strtol(linebuf.data, &endptr, 10);
1571 if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1572 ropt->idWanted[id - 1])
1573 {
1574 pg_log_warning("line ignored: %s", linebuf.data);
1575 continue;
1576 }
1577
1578 /* Find TOC entry */
1579 te = getTocEntryByDumpId(AH, id);
1580 if (!te)
1581 pg_fatal("could not find entry for ID %d",
1582 id);
1583
1584 /* Mark it wanted */
1585 ropt->idWanted[id - 1] = true;
1586
1587 /*
1588 * Move each item to the end of the list as it is selected, so that
1589 * they are placed in the desired order. Any unwanted items will end
1590 * up at the front of the list, which may seem unintuitive but it's
1591 * what we need. In an ordinary serial restore that makes no
1592 * difference, but in a parallel restore we need to mark unrestored
1593 * items' dependencies as satisfied before we start examining
1594 * restorable items. Otherwise they could have surprising
1595 * side-effects on the order in which restorable items actually get
1596 * restored.
1597 */
1598 _moveBefore(AH->toc, te);
1599 }
1600
1601 pg_free(linebuf.data);
1602
1603 if (fclose(fh) != 0)
1604 pg_fatal("could not close TOC file: %m");
1605}
1606
1607/**********************
1608 * Convenience functions that look like standard IO functions
1609 * for writing data when in dump mode.
1610 **********************/
1611
1612/* Public */
1613void
1614archputs(const char *s, Archive *AH)
1615{
1616 WriteData(AH, s, strlen(s));
1617}
1618
1619/* Public */
1620int
1621archprintf(Archive *AH, const char *fmt,...)
1622{
1623 int save_errno = errno;
1624 char *p;
1625 size_t len = 128; /* initial assumption about buffer size */
1626 size_t cnt;
1627
1628 for (;;)
1629 {
1630 va_list args;
1631
1632 /* Allocate work buffer. */
1633 p = (char *) pg_malloc(len);
1634
1635 /* Try to format the data. */
1636 errno = save_errno;
1637 va_start(args, fmt);
1638 cnt = pvsnprintf(p, len, fmt, args);
1639 va_end(args);
1640
1641 if (cnt < len)
1642 break; /* success */
1643
1644 /* Release buffer and loop around to try again with larger len. */
1645 free(p);
1646 len = cnt;
1647 }
1648
1649 WriteData(AH, p, cnt);
1650 free(p);
1651 return (int) cnt;
1652}
1653
1654
1655/*******************************
1656 * Stuff below here should be 'private' to the archiver routines
1657 *******************************/
1658
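/*
 * Output redirection: SetOutput opens a (possibly compressed) handle for the
 * given file name ("-" meaning stdout), SaveOutput returns the handle
 * currently in use, and RestoreOutput closes the temporary handle and
 * reinstates the saved one. RestoreArchive and PrintTOCSummary use this pair
 * to honor --file without disturbing the archive handle itself.
 */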
1659static void
1660SetOutput(ArchiveHandle *AH, const char *filename,
1661 const pg_compress_specification compression_spec)
1662{
1663 CompressFileHandle *CFH;
1664 const char *mode;
1665 int fn = -1;
1666
1667 if (filename)
1668 {
1669 if (strcmp(filename, "-") == 0)
1670 fn = fileno(stdout);
1671 }
1672 else if (AH->FH)
1673 fn = fileno(AH->FH);
1674 else if (AH->fSpec)
1675 {
1676 filename = AH->fSpec;
1677 }
1678 else
1679 fn = fileno(stdout);
1680
1681 if (AH->mode == archModeAppend)
1682 mode = PG_BINARY_A;
1683 else
1684 mode = PG_BINARY_W;
1685
1686 CFH = InitCompressFileHandle(compression_spec);
1687
1688 if (!CFH->open_func(filename, fn, mode, CFH))
1689 {
1690 if (filename)
1691 pg_fatal("could not open output file \"%s\": %m", filename);
1692 else
1693 pg_fatal("could not open output file: %m");
1694 }
1695
1696 AH->OF = CFH;
1697}
1698
1699static CompressFileHandle *
1700SaveOutput(ArchiveHandle *AH)
1701{
1702 return (CompressFileHandle *) AH->OF;
1703}
1704
1705static void
1706RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1707{
1708 errno = 0;
1709 if (!EndCompressFileHandle(AH->OF))
1710 pg_fatal("could not close output file: %m");
1711
1712 AH->OF = savedOutput;
1713}
1714
1715
1716
1717/*
1718 * Print formatted text to the output file (usually stdout).
1719 */
1720int
1721ahprintf(ArchiveHandle *AH, const char *fmt,...)
1722{
1723 int save_errno = errno;
1724 char *p;
1725 size_t len = 128; /* initial assumption about buffer size */
1726 size_t cnt;
1727
1728 for (;;)
1729 {
1730 va_list args;
1731
1732 /* Allocate work buffer. */
1733 p = (char *) pg_malloc(len);
1734
1735 /* Try to format the data. */
1736 errno = save_errno;
1737 va_start(args, fmt);
1738 cnt = pvsnprintf(p, len, fmt, args);
1739 va_end(args);
1740
1741 if (cnt < len)
1742 break; /* success */
1743
1744 /* Release buffer and loop around to try again with larger len. */
1745 free(p);
1746 len = cnt;
1747 }
1748
1749 ahwrite(p, 1, cnt, AH);
1750 free(p);
1751 return (int) cnt;
1752}
1753
1754/*
1755 * Single place for logic which says 'We are restoring to a direct DB connection'.
1756 */
1757static int
1758RestoringToDB(ArchiveHandle *AH)
1759{
1760 RestoreOptions *ropt = AH->public.ropt;
1761
1762 return (ropt && ropt->useDB && AH->connection);
1763}
1764
1765/*
1766 * Dump the current contents of the LO data buffer while writing a LO
1767 */
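/*
 * When connected directly to a database this writes with lo_write();
 * otherwise it emits "SELECT pg_catalog.lowrite(0, ...);" with the buffer
 * rendered as a bytea literal, temporarily clearing writingLO so the
 * ahprintf output is not routed back into this buffer.
 */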
1768static void
1769dump_lo_buf(ArchiveHandle *AH)
1770{
1771 if (AH->connection)
1772 {
1773 int res;
1774
1775 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1776 pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1777 "wrote %zu bytes of large object data (result = %d)",
1778 AH->lo_buf_used),
1779 AH->lo_buf_used, res);
1780 /* We assume there are no short writes, only errors */
1781 if (res != AH->lo_buf_used)
1782 warn_or_exit_horribly(AH, "could not write to large object: %s",
1784 }
1785 else
1786 {
1788
1790 (const unsigned char *) AH->lo_buf,
1791 AH->lo_buf_used,
1792 AH);
1793
1794 /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1795 AH->writingLO = false;
1796 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1797 AH->writingLO = true;
1798
1800 }
1801 AH->lo_buf_used = 0;
1802}
1803
1804
1805/*
1806 * Write buffer to the output file (usually stdout). This is used for
1807 * outputting 'restore' scripts etc. It is even possible for an archive
1808 * format to create a custom output routine to 'fake' a restore if it
1809 * wants to generate a script (see TAR output).
1810 */
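/*
 * While writingLO is set, incoming data is accumulated in AH->lo_buf and
 * flushed through dump_lo_buf() whenever the buffer fills; otherwise it is
 * passed to the format's custom output routine, sent to the database
 * connection, or written to the (possibly compressed) output file.
 */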
1811void
1812ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1813{
1814 int bytes_written = 0;
1815
1816 if (AH->writingLO)
1817 {
1818 size_t remaining = size * nmemb;
1819
1820 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1821 {
1822 size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1823
1824 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1825 ptr = (const char *) ptr + avail;
1826 remaining -= avail;
1827 AH->lo_buf_used += avail;
1828 dump_lo_buf(AH);
1829 }
1830
1831 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1832 AH->lo_buf_used += remaining;
1833
1834 bytes_written = size * nmemb;
1835 }
1836 else if (AH->CustomOutPtr)
1837 bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1838
1839 /*
1840 * If we're doing a restore, and it's direct to DB, and we're connected
1841 * then send it to the DB.
1842 */
1843 else if (RestoringToDB(AH))
1844 bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1845 else
1846 {
1848
1849 if (CFH->write_func(ptr, size * nmemb, CFH))
1850 bytes_written = size * nmemb;
1851 }
1852
1853 if (bytes_written != size * nmemb)
1855}
1856
1857/* on some error, we may decide to go on... */
1858void
1860{
1861 va_list ap;
1862
1863 switch (AH->stage)
1864 {
1865
1866 case STAGE_NONE:
1867 /* Do nothing special */
1868 break;
1869
1870 case STAGE_INITIALIZING:
1871 if (AH->stage != AH->lastErrorStage)
1872 pg_log_info("while INITIALIZING:");
1873 break;
1874
1875 case STAGE_PROCESSING:
1876 if (AH->stage != AH->lastErrorStage)
1877 pg_log_info("while PROCESSING TOC:");
1878 break;
1879
1880 case STAGE_FINALIZING:
1881 if (AH->stage != AH->lastErrorStage)
1882 pg_log_info("while FINALIZING:");
1883 break;
1884 }
1885 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1886 {
1887 pg_log_info("from TOC entry %d; %u %u %s %s %s",
1888 AH->currentTE->dumpId,
1890 AH->currentTE->catalogId.oid,
1891 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1892 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1893 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1894 }
1895 AH->lastErrorStage = AH->stage;
1896 AH->lastErrorTE = AH->currentTE;
1897
1898 va_start(ap, fmt);
1900 va_end(ap);
1901
1902 if (AH->public.exit_on_error)
1903 exit_nicely(1);
1904 else
1905 AH->public.n_errors++;
1906}
1907
1908#ifdef NOT_USED
1909
1910static void
1911_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1912{
1913 /* Unlink te from list */
1914 te->prev->next = te->next;
1915 te->next->prev = te->prev;
1916
1917 /* and insert it after "pos" */
1918 te->prev = pos;
1919 te->next = pos->next;
1920 pos->next->prev = te;
1921 pos->next = te;
1922}
1923#endif
1924
1925static void
1927{
1928 /* Unlink te from list */
1929 te->prev->next = te->next;
1930 te->next->prev = te->prev;
1931
1932 /* and insert it before "pos" */
1933 te->prev = pos->prev;
1934 te->next = pos;
1935 pos->prev->next = te;
1936 pos->prev = te;
1937}
1938
1939/*
1940 * Build index arrays for the TOC list
1941 *
1942 * This should be invoked only after we have created or read in all the TOC
1943 * items.
1944 *
1945 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1946 * array entries run only up to maxDumpId. We might see dependency dump IDs
1947 * beyond that (if the dump was partial); so always check the array bound
1948 * before trying to touch an array entry.
1949 */
1950static void
1952{
1953 DumpId maxDumpId = AH->maxDumpId;
1954 TocEntry *te;
1955
1956 AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1957 AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1958
1959 for (te = AH->toc->next; te != AH->toc; te = te->next)
1960 {
1961 /* this check is purely paranoia, maxDumpId should be correct */
1962 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1963 pg_fatal("bad dumpId");
1964
1965 /* tocsByDumpId indexes all TOCs by their dump ID */
1966 AH->tocsByDumpId[te->dumpId] = te;
1967
1968 /*
1969 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1970 * TOC entry that has a DATA item. We compute this by reversing the
1971 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1972 * just one dependency and it is the TABLE item.
1973 */
1974 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1975 {
1976 DumpId tableId = te->dependencies[0];
1977
1978 /*
1979 * The TABLE item might not have been in the archive, if this was
1980 * a data-only dump; but its dump ID should be less than its data
1981 * item's dump ID, so there should be a place for it in the array.
1982 */
1983 if (tableId <= 0 || tableId > maxDumpId)
1984 pg_fatal("bad table dumpId for TABLE DATA item");
1985
1986 AH->tableDataId[tableId] = te->dumpId;
1987 }
1988 }
1989}
1990
1991TocEntry *
1992getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1993{
1994 /* build index arrays if we didn't already */
1995 if (AH->tocsByDumpId == NULL)
1996 buildTocEntryArrays(AH);
1997
1998 if (id > 0 && id <= AH->maxDumpId)
1999 return AH->tocsByDumpId[id];
2000
2001 return NULL;
2002}
2003
2004int
2005TocIDRequired(ArchiveHandle *AH, DumpId id)
2006{
2007 TocEntry *te = getTocEntryByDumpId(AH, id);
2008
2009 if (!te)
2010 return 0;
2011
2012 return te->reqs;
2013}
2014
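/*
 * Offsets are stored as one status byte (K_OFFSET_POS_NOT_SET,
 * K_OFFSET_NO_DATA or K_OFFSET_POS_SET) followed by sizeof(pgoff_t) bytes,
 * least significant byte first, so the on-disk format is independent of
 * host endianness. For example, an offset of 0x0102 is written as the
 * status byte, then 0x02, 0x01, 0x00, ... out to sizeof(pgoff_t) bytes.
 */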
2015size_t
2016WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2017{
2018 int off;
2019
2020 /* Save the flag */
2021 AH->WriteBytePtr(AH, wasSet);
2022
2023 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2024 for (off = 0; off < sizeof(pgoff_t); off++)
2025 {
2026 AH->WriteBytePtr(AH, o & 0xFF);
2027 o >>= 8;
2028 }
2029 return sizeof(pgoff_t) + 1;
2030}
2031
2032int
2033ReadOffset(ArchiveHandle *AH, pgoff_t *o)
2034{
2035 int i;
2036 int off;
2037 int offsetFlg;
2038
2039 /* Initialize to zero */
2040 *o = 0;
2041
2042 /* Check for old version */
2043 if (AH->version < K_VERS_1_7)
2044 {
2045 /* Prior versions wrote offsets using WriteInt */
2046 i = ReadInt(AH);
2047 /* -1 means not set */
2048 if (i < 0)
2049 return K_OFFSET_POS_NOT_SET;
2050 else if (i == 0)
2051 return K_OFFSET_NO_DATA;
2052
2053 /* Cast to pgoff_t because it was written as an int. */
2054 *o = (pgoff_t) i;
2055 return K_OFFSET_POS_SET;
2056 }
2057
2058 /*
2059 * Read the flag indicating the state of the data pointer. Check if valid
2060 * and die if not.
2061 *
2062 * This used to be handled by a negative or zero pointer, now we use an
2063 * extra byte specifically for the state.
2064 */
2065 offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2066
2067 switch (offsetFlg)
2068 {
2069 case K_OFFSET_POS_NOT_SET:
2070 case K_OFFSET_NO_DATA:
2071 case K_OFFSET_POS_SET:
2072
2073 break;
2074
2075 default:
2076 pg_fatal("unexpected data offset flag %d", offsetFlg);
2077 }
2078
2079 /*
2080 * Read the bytes
2081 */
2082 for (off = 0; off < AH->offSize; off++)
2083 {
2084 if (off < sizeof(pgoff_t))
2085 *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2086 else
2087 {
2088 if (AH->ReadBytePtr(AH) != 0)
2089 pg_fatal("file offset in dump file is too large");
2090 }
2091 }
2092
2093 return offsetFlg;
2094}
2095
2096size_t
2098{
2099 int b;
2100
2101 /*
2102 * This is a bit yucky, but I don't want to make the binary format very
2103 * dependent on representation, and not knowing much about it, I write out
2104 * a sign byte. If you change this, don't forget to change the file
2105 * version #, and modify ReadInt to read the new format AS WELL AS the old
2106 * formats.
2107 */
2108
2109 /* SIGN byte */
2110 if (i < 0)
2111 {
2112 AH->WriteBytePtr(AH, 1);
2113 i = -i;
2114 }
2115 else
2116 AH->WriteBytePtr(AH, 0);
2117
2118 for (b = 0; b < AH->intSize; b++)
2119 {
2120 AH->WriteBytePtr(AH, i & 0xFF);
2121 i >>= 8;
2122 }
2123
2124 return AH->intSize + 1;
2125}
2126
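/*
 * Read an integer written by WriteInt: a sign byte (absent only in ancient
 * K_VERS_1_0 archives) followed by AH->intSize bytes, least significant
 * byte first.
 */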
2127int
2128ReadInt(ArchiveHandle *AH)
2129{
2130 int res = 0;
2131 int bv,
2132 b;
2133 int sign = 0; /* Default positive */
2134 int bitShift = 0;
2135
2136 if (AH->version > K_VERS_1_0)
2137 /* Read a sign byte */
2138 sign = AH->ReadBytePtr(AH);
2139
2140 for (b = 0; b < AH->intSize; b++)
2141 {
2142 bv = AH->ReadBytePtr(AH) & 0xFF;
2143 if (bv != 0)
2144 res = res + (bv << bitShift);
2145 bitShift += 8;
2146 }
2147
2148 if (sign)
2149 res = -res;
2150
2151 return res;
2152}
2153
2154size_t
2155WriteStr(ArchiveHandle *AH, const char *c)
2156{
2157 size_t res;
2158
2159 if (c)
2160 {
2161 int len = strlen(c);
2162
2163 res = WriteInt(AH, len);
2164 AH->WriteBufPtr(AH, c, len);
2165 res += len;
2166 }
2167 else
2168 res = WriteInt(AH, -1);
2169
2170 return res;
2171}
2172
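/*
 * Read a string written by WriteStr: an integer length (-1 denotes a NULL
 * string) followed by that many bytes. The terminating '\0' is not stored,
 * so it is appended here after reading.
 */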
2173char *
2174ReadStr(ArchiveHandle *AH)
2175{
2176 char *buf;
2177 int l;
2178
2179 l = ReadInt(AH);
2180 if (l < 0)
2181 buf = NULL;
2182 else
2183 {
2184 buf = (char *) pg_malloc(l + 1);
2185 AH->ReadBufPtr(AH, buf, l);
2186
2187 buf[l] = '\0';
2188 }
2189
2190 return buf;
2191}
2192
2193static bool
2194_fileExistsInDirectory(const char *dir, const char *filename)
2195{
2196 struct stat st;
2197 char buf[MAXPGPATH];
2198
2199 if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2200 pg_fatal("directory name too long: \"%s\"", dir);
2201
2202 return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2203}
2204
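/*
 * Identify the format of an existing archive: a directory containing a
 * "toc.dat" (possibly compressed) is the directory format; a file starting
 * with the "PGDMP" magic string is the custom format; otherwise a 512-byte
 * block is read and accepted if it looks like a tar header. Plain-text
 * dumps are detected by their leading comment and rejected with a hint to
 * use psql instead.
 */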
2205static int
2206_discoverArchiveFormat(ArchiveHandle *AH)
2207{
2208 FILE *fh;
2209 char sig[6]; /* More than enough */
2210 size_t cnt;
2211 int wantClose = 0;
2212
2213 pg_log_debug("attempting to ascertain archive format");
2214
2215 free(AH->lookahead);
2216
2217 AH->readHeader = 0;
2218 AH->lookaheadSize = 512;
2219 AH->lookahead = pg_malloc0(512);
2220 AH->lookaheadLen = 0;
2221 AH->lookaheadPos = 0;
2222
2223 if (AH->fSpec)
2224 {
2225 struct stat st;
2226
2227 wantClose = 1;
2228
2229 /*
2230 * Check if the specified archive is a directory. If so, check if
2231 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2232 */
2233 if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2234 {
2235 AH->format = archDirectory;
2236 if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2237 return AH->format;
2238#ifdef HAVE_LIBZ
2239 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2240 return AH->format;
2241#endif
2242#ifdef USE_LZ4
2243 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2244 return AH->format;
2245#endif
2246#ifdef USE_ZSTD
2247 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2248 return AH->format;
2249#endif
2250 pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2251 AH->fSpec);
2252 fh = NULL; /* keep compiler quiet */
2253 }
2254 else
2255 {
2256 fh = fopen(AH->fSpec, PG_BINARY_R);
2257 if (!fh)
2258 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2259 }
2260 }
2261 else
2262 {
2263 fh = stdin;
2264 if (!fh)
2265 pg_fatal("could not open input file: %m");
2266 }
2267
2268 if ((cnt = fread(sig, 1, 5, fh)) != 5)
2269 {
2270 if (ferror(fh))
2271 pg_fatal("could not read input file: %m");
2272 else
2273 pg_fatal("input file is too short (read %lu, expected 5)",
2274 (unsigned long) cnt);
2275 }
2276
2277 /* Save it, just in case we need it later */
2278 memcpy(&AH->lookahead[0], sig, 5);
2279 AH->lookaheadLen = 5;
2280
2281 if (strncmp(sig, "PGDMP", 5) == 0)
2282 {
2283 /* It's custom format, stop here */
2284 AH->format = archCustom;
2285 AH->readHeader = 1;
2286 }
2287 else
2288 {
2289 /*
2290 * *Maybe* we have a tar archive format file or a text dump ... So,
2291 * read first 512 byte header...
2292 */
2293 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2294 /* read failure is checked below */
2295 AH->lookaheadLen += cnt;
2296
2297 if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2298 (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2299 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2300 {
2301 /*
2302 * looks like it's probably a text format dump. so suggest they
2303 * try psql
2304 */
2305 pg_fatal("input file appears to be a text format dump. Please use psql.");
2306 }
2307
2308 if (AH->lookaheadLen != 512)
2309 {
2310 if (feof(fh))
2311 pg_fatal("input file does not appear to be a valid archive (too short?)");
2312 else
2313 READ_ERROR_EXIT(fh);
2314 }
2315
2316 if (!isValidTarHeader(AH->lookahead))
2317 pg_fatal("input file does not appear to be a valid archive");
2318
2319 AH->format = archTar;
2320 }
2321
2322 /* Close the file if we opened it */
2323 if (wantClose)
2324 {
2325 if (fclose(fh) != 0)
2326 pg_fatal("could not close input file: %m");
2327 /* Forget lookahead, since we'll re-read header after re-opening */
2328 AH->readHeader = 0;
2329 AH->lookaheadLen = 0;
2330 }
2331
2332 return AH->format;
2333}
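/*
 * To summarize the detection logic above: a directory containing toc.dat
 * (possibly compressed) is archDirectory; input starting with the "PGDMP"
 * magic is archCustom; input starting with a plain-text dump header is
 * rejected with a hint to use psql; anything else must carry a valid
 * 512-byte tar header to be accepted as archTar.
 */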
2334
2335
2336/*
2337 * Allocate an archive handle
2338 */
2339static ArchiveHandle *
2340_allocAH(const char *FileSpec, const ArchiveFormat fmt,
2341 const pg_compress_specification compression_spec,
2342 bool dosync, ArchiveMode mode,
2343 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2344{
2345 ArchiveHandle *AH;
2346 CompressFileHandle *CFH;
2347 pg_compress_specification out_compress_spec = {0};
2348
2349 pg_log_debug("allocating AH for %s, format %d",
2350 FileSpec ? FileSpec : "(stdio)", fmt);
2351
2352 AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2353
2354 AH->version = K_VERS_SELF;
2355
2356 /* initialize for backwards compatible string processing */
2357 AH->public.encoding = 0; /* PG_SQL_ASCII */
2358 AH->public.std_strings = false;
2359
2360 /* sql error handling */
2361 AH->public.exit_on_error = true;
2362 AH->public.n_errors = 0;
2363
2364 AH->archiveDumpVersion = PG_VERSION;
2365
2366 AH->createDate = time(NULL);
2367
2368 AH->intSize = sizeof(int);
2369 AH->offSize = sizeof(pgoff_t);
2370 if (FileSpec)
2371 {
2372 AH->fSpec = pg_strdup(FileSpec);
2373
2374 /*
2375 * Not used; maybe later....
2376 *
2377 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2378 * i--) if (AH->workDir[i-1] == '/')
2379 */
2380 }
2381 else
2382 AH->fSpec = NULL;
2383
2384 AH->currUser = NULL; /* unknown */
2385 AH->currSchema = NULL; /* ditto */
2386 AH->currTablespace = NULL; /* ditto */
2387 AH->currTableAm = NULL; /* ditto */
2388
2389 AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2390
2391 AH->toc->next = AH->toc;
2392 AH->toc->prev = AH->toc;
2393
2394 AH->mode = mode;
2395 AH->compression_spec = compression_spec;
2396 AH->dosync = dosync;
2397 AH->sync_method = sync_method;
2398
2399 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2400
2401 /* Open stdout with no compression for AH output handle */
2402 out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2403 CFH = InitCompressFileHandle(out_compress_spec);
2404 if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2405 pg_fatal("could not open stdout for appending: %m");
2406 AH->OF = CFH;
2407
2408 /*
2409 * On Windows, we need to use binary mode to read/write non-text files,
2410 * which include all archive formats as well as compressed plain text.
2411 * Force stdin/stdout into binary mode if that is what we are using.
2412 */
2413#ifdef WIN32
2414 if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2415 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2416 {
2417 if (mode == archModeWrite)
2418 _setmode(fileno(stdout), O_BINARY);
2419 else
2420 _setmode(fileno(stdin), O_BINARY);
2421 }
2422#endif
2423
2424 AH->SetupWorkerPtr = setupWorkerPtr;
2425
2426 if (fmt == archUnknown)
2427 AH->format = _discoverArchiveFormat(AH);
2428 else
2429 AH->format = fmt;
2430
2431 switch (AH->format)
2432 {
2433 case archCustom:
2434 InitArchiveFmt_Custom(AH);
2435 break;
2436
2437 case archNull:
2438 InitArchiveFmt_Null(AH);
2439 break;
2440
2441 case archDirectory:
2442 InitArchiveFmt_Directory(AH);
2443 break;
2444
2445 case archTar:
2446 InitArchiveFmt_Tar(AH);
2447 break;
2448
2449 default:
2450 pg_fatal("unrecognized file format \"%d\"", fmt);
2451 }
2452
2453 return AH;
2454}
2455
2456/*
2457 * Write out all data (tables & LOs)
2458 */
2459void
2460WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2461{
2462 TocEntry *te;
2463
2464 if (pstate && pstate->numWorkers > 1)
2465 {
2466 /*
2467 * In parallel mode, this code runs in the leader process. We
2468 * construct an array of candidate TEs, then sort it into decreasing
2469 * size order, then dispatch each TE to a data-transfer worker. By
2470 * dumping larger tables first, we avoid getting into a situation
2471 * where we're down to one job and it's big, losing parallelism.
2472 */
2473 TocEntry **tes;
2474 int ntes;
2475
2476 tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2477 ntes = 0;
2478 for (te = AH->toc->next; te != AH->toc; te = te->next)
2479 {
2480 /* Consider only TEs with dataDumper functions ... */
2481 if (!te->dataDumper)
2482 continue;
2483 /* ... and ignore ones not enabled for dump */
2484 if ((te->reqs & REQ_DATA) == 0)
2485 continue;
2486
2487 tes[ntes++] = te;
2488 }
2489
2490 if (ntes > 1)
2491 qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2492
2493 for (int i = 0; i < ntes; i++)
2494 DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2495 mark_dump_job_done, NULL);
2496
2497 pg_free(tes);
2498
2499 /* Now wait for workers to finish. */
2500 WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2501 }
2502 else
2503 {
2504 /* Non-parallel mode: just dump all candidate TEs sequentially. */
2505 for (te = AH->toc->next; te != AH->toc; te = te->next)
2506 {
2507 /* Must have same filter conditions as above */
2508 if (!te->dataDumper)
2509 continue;
2510 if ((te->reqs & REQ_DATA) == 0)
2511 continue;
2512
2513 WriteDataChunksForTocEntry(AH, te);
2514 }
2515 }
2516}
2517
2518
2519/*
2520 * Callback function that's invoked in the leader process after a step has
2521 * been parallel dumped.
2522 *
2523 * We don't need to do anything except check for worker failure.
2524 */
2525static void
2526mark_dump_job_done(ArchiveHandle *AH,
2527 TocEntry *te,
2528 int status,
2529 void *callback_data)
2530{
2531 pg_log_info("finished item %d %s %s",
2532 te->dumpId, te->desc, te->tag);
2533
2534 if (status != 0)
2535 pg_fatal("worker process failed: exit code %d",
2536 status);
2537}
2538
2539
2540void
2541WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2542{
2543 StartDataPtrType startPtr;
2544 EndDataPtrType endPtr;
2545
2546 AH->currToc = te;
2547
2548 if (strcmp(te->desc, "BLOBS") == 0)
2549 {
2550 startPtr = AH->StartLOsPtr;
2551 endPtr = AH->EndLOsPtr;
2552 }
2553 else
2554 {
2555 startPtr = AH->StartDataPtr;
2556 endPtr = AH->EndDataPtr;
2557 }
2558
2559 if (startPtr != NULL)
2560 (*startPtr) (AH, te);
2561
2562 /*
2563 * The user-provided DataDumper routine needs to call AH->WriteData
2564 */
2565 te->dataDumper((Archive *) AH, te->dataDumperArg);
2566
2567 if (endPtr != NULL)
2568 (*endPtr) (AH, te);
2569
2570 AH->currToc = NULL;
2571}
2572
2573void
2574WriteToc(ArchiveHandle *AH)
2575{
2576 TocEntry *te;
2577 char workbuf[32];
2578 int tocCount;
2579 int i;
2580
2581 /* count entries that will actually be dumped */
2582 tocCount = 0;
2583 for (te = AH->toc->next; te != AH->toc; te = te->next)
2584 {
2585 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2586 tocCount++;
2587 }
2588
2589 /* printf("%d TOC Entries to save\n", tocCount); */
2590
2591 WriteInt(AH, tocCount);
2592
2593 for (te = AH->toc->next; te != AH->toc; te = te->next)
2594 {
2595 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2596 continue;
2597
2598 WriteInt(AH, te->dumpId);
2599 WriteInt(AH, te->dataDumper ? 1 : 0);
2600
2601 /* OID is recorded as a string for historical reasons */
2602 sprintf(workbuf, "%u", te->catalogId.tableoid);
2603 WriteStr(AH, workbuf);
2604 sprintf(workbuf, "%u", te->catalogId.oid);
2605 WriteStr(AH, workbuf);
2606
2607 WriteStr(AH, te->tag);
2608 WriteStr(AH, te->desc);
2609 WriteInt(AH, te->section);
2610 WriteStr(AH, te->defn);
2611 WriteStr(AH, te->dropStmt);
2612 WriteStr(AH, te->copyStmt);
2613 WriteStr(AH, te->namespace);
2614 WriteStr(AH, te->tablespace);
2615 WriteStr(AH, te->tableam);
2616 WriteInt(AH, te->relkind);
2617 WriteStr(AH, te->owner);
2618 WriteStr(AH, "false");
2619
2620 /* Dump list of dependencies */
2621 for (i = 0; i < te->nDeps; i++)
2622 {
2623 sprintf(workbuf, "%d", te->dependencies[i]);
2624 WriteStr(AH, workbuf);
2625 }
2626 WriteStr(AH, NULL); /* Terminate List */
2627
2628 if (AH->WriteExtraTocPtr)
2629 AH->WriteExtraTocPtr(AH, te);
2630 }
2631}
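/*
 * For reference, each TOC entry written above consists of: dumpId, a
 * had-dumper flag, tableoid and oid (as strings), tag, desc, section, defn,
 * dropStmt, copyStmt, namespace, tablespace, tableam, relkind, owner, a
 * legacy placeholder (the old WITH OIDS field, always written as "false"),
 * a NULL-terminated list of dependency IDs, and any format-specific extra
 * data.  ReadToc below must read these fields in exactly this order.
 */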
2632
2633void
2634ReadToc(ArchiveHandle *AH)
2635{
2636 int i;
2637 char *tmp;
2638 DumpId *deps;
2639 int depIdx;
2640 int depSize;
2641 TocEntry *te;
2642 bool is_supported;
2643
2644 AH->tocCount = ReadInt(AH);
2645 AH->maxDumpId = 0;
2646
2647 for (i = 0; i < AH->tocCount; i++)
2648 {
2649 te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2650 te->dumpId = ReadInt(AH);
2651
2652 if (te->dumpId > AH->maxDumpId)
2653 AH->maxDumpId = te->dumpId;
2654
2655 /* Sanity check */
2656 if (te->dumpId <= 0)
2657 pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2658 te->dumpId);
2659
2660 te->hadDumper = ReadInt(AH);
2661
2662 if (AH->version >= K_VERS_1_8)
2663 {
2664 tmp = ReadStr(AH);
2665 sscanf(tmp, "%u", &te->catalogId.tableoid);
2666 free(tmp);
2667 }
2668 else
2669 te->catalogId.tableoid = InvalidOid;
2670 tmp = ReadStr(AH);
2671 sscanf(tmp, "%u", &te->catalogId.oid);
2672 free(tmp);
2673
2674 te->tag = ReadStr(AH);
2675 te->desc = ReadStr(AH);
2676
2677 if (AH->version >= K_VERS_1_11)
2678 {
2679 te->section = ReadInt(AH);
2680 }
2681 else
2682 {
2683 /*
2684 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2685 * the entries into sections. This list need not cover entry
2686 * types added later than 8.4.
2687 */
2688 if (strcmp(te->desc, "COMMENT") == 0 ||
2689 strcmp(te->desc, "ACL") == 0 ||
2690 strcmp(te->desc, "ACL LANGUAGE") == 0)
2691 te->section = SECTION_NONE;
2692 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2693 strcmp(te->desc, "BLOBS") == 0 ||
2694 strcmp(te->desc, "BLOB COMMENTS") == 0)
2695 te->section = SECTION_DATA;
2696 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2697 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2698 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2699 strcmp(te->desc, "INDEX") == 0 ||
2700 strcmp(te->desc, "RULE") == 0 ||
2701 strcmp(te->desc, "TRIGGER") == 0)
2702 te->section = SECTION_POST_DATA;
2703 else
2704 te->section = SECTION_PRE_DATA;
2705 }
2706
2707 te->defn = ReadStr(AH);
2708 te->dropStmt = ReadStr(AH);
2709
2710 if (AH->version >= K_VERS_1_3)
2711 te->copyStmt = ReadStr(AH);
2712
2713 if (AH->version >= K_VERS_1_6)
2714 te->namespace = ReadStr(AH);
2715
2716 if (AH->version >= K_VERS_1_10)
2717 te->tablespace = ReadStr(AH);
2718
2719 if (AH->version >= K_VERS_1_14)
2720 te->tableam = ReadStr(AH);
2721
2722 if (AH->version >= K_VERS_1_16)
2723 te->relkind = ReadInt(AH);
2724
2725 te->owner = ReadStr(AH);
2726 is_supported = true;
2727 if (AH->version < K_VERS_1_9)
2728 is_supported = false;
2729 else
2730 {
2731 tmp = ReadStr(AH);
2732
2733 if (strcmp(tmp, "true") == 0)
2734 is_supported = false;
2735
2736 free(tmp);
2737 }
2738
2739 if (!is_supported)
2740 pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2741
2742 /* Read TOC entry dependencies */
2743 if (AH->version >= K_VERS_1_5)
2744 {
2745 depSize = 100;
2746 deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2747 depIdx = 0;
2748 for (;;)
2749 {
2750 tmp = ReadStr(AH);
2751 if (!tmp)
2752 break; /* end of list */
2753 if (depIdx >= depSize)
2754 {
2755 depSize *= 2;
2756 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2757 }
2758 sscanf(tmp, "%d", &deps[depIdx]);
2759 free(tmp);
2760 depIdx++;
2761 }
2762
2763 if (depIdx > 0) /* We have a non-null entry */
2764 {
2765 deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2766 te->dependencies = deps;
2767 te->nDeps = depIdx;
2768 }
2769 else
2770 {
2771 free(deps);
2772 te->dependencies = NULL;
2773 te->nDeps = 0;
2774 }
2775 }
2776 else
2777 {
2778 te->dependencies = NULL;
2779 te->nDeps = 0;
2780 }
2781 te->dataLength = 0;
2782
2783 if (AH->ReadExtraTocPtr)
2784 AH->ReadExtraTocPtr(AH, te);
2785
2786 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2787 i, te->dumpId, te->desc, te->tag);
2788
2789 /* link completed entry into TOC circular list */
2790 te->prev = AH->toc->prev;
2791 AH->toc->prev->next = te;
2792 AH->toc->prev = te;
2793 te->next = AH->toc;
2794
2795 /* special processing immediately upon read for some items */
2796 if (strcmp(te->desc, "ENCODING") == 0)
2797 processEncodingEntry(AH, te);
2798 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2799 processStdStringsEntry(AH, te);
2800 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2801 processSearchPathEntry(AH, te);
2802 }
2803}
2804
2805static void
2806processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2807{
2808 /* te->defn should have the form SET client_encoding = 'foo'; */
2809 char *defn = pg_strdup(te->defn);
2810 char *ptr1;
2811 char *ptr2 = NULL;
2812 int encoding;
2813
2814 ptr1 = strchr(defn, '\'');
2815 if (ptr1)
2816 ptr2 = strchr(++ptr1, '\'');
2817 if (ptr2)
2818 {
2819 *ptr2 = '\0';
2820 encoding = pg_char_to_encoding(ptr1);
2821 if (encoding < 0)
2822 pg_fatal("unrecognized encoding \"%s\"",
2823 ptr1);
2824 AH->public.encoding = encoding;
2825 }
2826 else
2827 pg_fatal("invalid ENCODING item: %s",
2828 te->defn);
2829
2830 free(defn);
2831}
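/*
 * For example, an ENCODING entry whose defn is
 * "SET client_encoding = 'UTF8';" leaves ptr1 pointing at "UTF8", and
 * AH->public.encoding is set to the matching encoding ID.
 */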
2832
2833static void
2834processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2835{
2836 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2837 char *ptr1;
2838
2839 ptr1 = strchr(te->defn, '\'');
2840 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2841 AH->public.std_strings = true;
2842 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2843 AH->public.std_strings = false;
2844 else
2845 pg_fatal("invalid STDSTRINGS item: %s",
2846 te->defn);
2847}
2848
2849static void
2850processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2851{
2852 /*
2853 * te->defn should contain a command to set search_path. We just copy it
2854 * verbatim for use later.
2855 */
2856 AH->public.searchpath = pg_strdup(te->defn);
2857}
2858
2859static void
2860StrictNamesCheck(RestoreOptions *ropt)
2861{
2862 const char *missing_name;
2863
2864 Assert(ropt->strict_names);
2865
2866 if (ropt->schemaNames.head != NULL)
2867 {
2868 missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2869 if (missing_name != NULL)
2870 pg_fatal("schema \"%s\" not found", missing_name);
2871 }
2872
2873 if (ropt->tableNames.head != NULL)
2874 {
2875 missing_name = simple_string_list_not_touched(&ropt->tableNames);
2876 if (missing_name != NULL)
2877 pg_fatal("table \"%s\" not found", missing_name);
2878 }
2879
2880 if (ropt->indexNames.head != NULL)
2881 {
2882 missing_name = simple_string_list_not_touched(&ropt->indexNames);
2883 if (missing_name != NULL)
2884 pg_fatal("index \"%s\" not found", missing_name);
2885 }
2886
2887 if (ropt->functionNames.head != NULL)
2888 {
2889 missing_name = simple_string_list_not_touched(&ropt->functionNames);
2890 if (missing_name != NULL)
2891 pg_fatal("function \"%s\" not found", missing_name);
2892 }
2893
2894 if (ropt->triggerNames.head != NULL)
2895 {
2896 missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2897 if (missing_name != NULL)
2898 pg_fatal("trigger \"%s\" not found", missing_name);
2899 }
2900}
2901
2902/*
2903 * Determine whether we want to restore this TOC entry.
2904 *
2905 * Returns 0 if entry should be skipped, or some combination of the
2906 * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2907 * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2908 */
2909static int
2910_tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2911{
2912 int res = REQ_SCHEMA | REQ_DATA;
2913 RestoreOptions *ropt = AH->public.ropt;
2914
2915 /* These items are treated specially */
2916 if (strcmp(te->desc, "ENCODING") == 0 ||
2917 strcmp(te->desc, "STDSTRINGS") == 0 ||
2918 strcmp(te->desc, "SEARCHPATH") == 0)
2919 return REQ_SPECIAL;
2920
2921 /*
2922 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2923 * restored in createDB mode, and not restored otherwise, independently of
2924 * all else.
2925 */
2926 if (strcmp(te->desc, "DATABASE") == 0 ||
2927 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2928 {
2929 if (ropt->createDB)
2930 return REQ_SCHEMA;
2931 else
2932 return 0;
2933 }
2934
2935 /*
2936 * Process exclusions that affect certain classes of TOC entries.
2937 */
2938
2939 /* If it's an ACL, maybe ignore it */
2940 if (ropt->aclsSkip && _tocEntryIsACL(te))
2941 return 0;
2942
2943 /* If it's a comment, maybe ignore it */
2944 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2945 return 0;
2946
2947 /*
2948 * If it's a publication or a table part of a publication, maybe ignore
2949 * it.
2950 */
2951 if (ropt->no_publications &&
2952 (strcmp(te->desc, "PUBLICATION") == 0 ||
2953 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2954 strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2955 return 0;
2956
2957 /* If it's a security label, maybe ignore it */
2958 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2959 return 0;
2960
2961 /* If it's a subscription, maybe ignore it */
2962 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2963 return 0;
2964
2965 /* Ignore it if section is not to be dumped/restored */
2966 switch (curSection)
2967 {
2968 case SECTION_PRE_DATA:
2969 if (!(ropt->dumpSections & DUMP_PRE_DATA))
2970 return 0;
2971 break;
2972 case SECTION_DATA:
2973 if (!(ropt->dumpSections & DUMP_DATA))
2974 return 0;
2975 break;
2976 case SECTION_POST_DATA:
2977 if (!(ropt->dumpSections & DUMP_POST_DATA))
2978 return 0;
2979 break;
2980 default:
2981 /* shouldn't get here, really, but ignore it */
2982 return 0;
2983 }
2984
2985 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2986 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2987 return 0;
2988
2989 /*
2990 * Check options for selective dump/restore.
2991 */
2992 if (strcmp(te->desc, "ACL") == 0 ||
2993 strcmp(te->desc, "COMMENT") == 0 ||
2994 strcmp(te->desc, "SECURITY LABEL") == 0)
2995 {
2996 /* Database properties react to createDB, not selectivity options. */
2997 if (strncmp(te->tag, "DATABASE ", 9) == 0)
2998 {
2999 if (!ropt->createDB)
3000 return 0;
3001 }
3002 else if (ropt->schemaNames.head != NULL ||
3003 ropt->schemaExcludeNames.head != NULL ||
3004 ropt->selTypes)
3005 {
3006 /*
3007 * In a selective dump/restore, we want to restore these dependent
3008 * TOC entry types only if their parent object is being restored.
3009 * Without selectivity options, we let through everything in the
3010 * archive. Note there may be such entries with no parent, eg
3011 * non-default ACLs for built-in objects. Also, we make
3012 * per-column ACLs additionally depend on the table's ACL if any
3013 * to ensure correct restore order, so those dependencies should
3014 * be ignored in this check.
3015 *
3016 * This code depends on the parent having been marked already,
3017 * which should be the case; if it isn't, perhaps due to
3018 * SortTocFromFile rearrangement, skipping the dependent entry
3019 * seems prudent anyway.
3020 *
3021 * Ideally we'd handle, eg, table CHECK constraints this way too.
3022 * But it's hard to tell which of their dependencies is the one to
3023 * consult.
3024 */
3025 bool dumpthis = false;
3026
3027 for (int i = 0; i < te->nDeps; i++)
3028 {
3029 TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3030
3031 if (!pte)
3032 continue; /* probably shouldn't happen */
3033 if (strcmp(pte->desc, "ACL") == 0)
3034 continue; /* ignore dependency on another ACL */
3035 if (pte->reqs == 0)
3036 continue; /* this object isn't marked, so ignore it */
3037 /* Found a parent to be dumped, so we want to dump this too */
3038 dumpthis = true;
3039 break;
3040 }
3041 if (!dumpthis)
3042 return 0;
3043 }
3044 }
3045 else
3046 {
3047 /* Apply selective-restore rules for standalone TOC entries. */
3048 if (ropt->schemaNames.head != NULL)
3049 {
3050 /* If no namespace is specified, it means all. */
3051 if (!te->namespace)
3052 return 0;
3053 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3054 return 0;
3055 }
3056
3057 if (ropt->schemaExcludeNames.head != NULL &&
3058 te->namespace &&
3059 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3060 return 0;
3061
3062 if (ropt->selTypes)
3063 {
3064 if (strcmp(te->desc, "TABLE") == 0 ||
3065 strcmp(te->desc, "TABLE DATA") == 0 ||
3066 strcmp(te->desc, "VIEW") == 0 ||
3067 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3068 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3069 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3070 strcmp(te->desc, "SEQUENCE") == 0 ||
3071 strcmp(te->desc, "SEQUENCE SET") == 0)
3072 {
3073 if (!ropt->selTable)
3074 return 0;
3075 if (ropt->tableNames.head != NULL &&
3076 !simple_string_list_member(&ropt->tableNames, te->tag))
3077 return 0;
3078 }
3079 else if (strcmp(te->desc, "INDEX") == 0)
3080 {
3081 if (!ropt->selIndex)
3082 return 0;
3083 if (ropt->indexNames.head != NULL &&
3084 !simple_string_list_member(&ropt->indexNames, te->tag))
3085 return 0;
3086 }
3087 else if (strcmp(te->desc, "FUNCTION") == 0 ||
3088 strcmp(te->desc, "AGGREGATE") == 0 ||
3089 strcmp(te->desc, "PROCEDURE") == 0)
3090 {
3091 if (!ropt->selFunction)
3092 return 0;
3093 if (ropt->functionNames.head != NULL &&
3094 !simple_string_list_member(&ropt->functionNames, te->tag))
3095 return 0;
3096 }
3097 else if (strcmp(te->desc, "TRIGGER") == 0)
3098 {
3099 if (!ropt->selTrigger)
3100 return 0;
3101 if (ropt->triggerNames.head != NULL &&
3102 !simple_string_list_member(&ropt->triggerNames, te->tag))
3103 return 0;
3104 }
3105 else
3106 return 0;
3107 }
3108 }
3109
3110 /*
3111 * Determine whether the TOC entry contains schema and/or data components,
3112 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3113 * it's both schema and data. Otherwise it's probably schema-only, but
3114 * there are exceptions.
3115 */
3116 if (!te->hadDumper)
3117 {
3118 /*
3119 * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3120 * is considered a data entry. We don't need to check for BLOBS or
3121 * old-style BLOB COMMENTS entries, because they will have hadDumper =
3122 * true ... but we do need to check new-style BLOB ACLs, comments,
3123 * etc.
3124 */
3125 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3126 strcmp(te->desc, "BLOB") == 0 ||
3127 strcmp(te->desc, "BLOB METADATA") == 0 ||
3128 (strcmp(te->desc, "ACL") == 0 &&
3129 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3130 (strcmp(te->desc, "COMMENT") == 0 &&
3131 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3132 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3133 strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3134 res = res & REQ_DATA;
3135 else
3136 res = res & ~REQ_DATA;
3137 }
3138
3139 /*
3140 * If there's no definition command, there's no schema component. Treat
3141 * "load via partition root" comments as not schema.
3142 */
3143 if (!te->defn || !te->defn[0] ||
3144 strncmp(te->defn, "-- load via partition root ", 27) == 0)
3145 res = res & ~REQ_SCHEMA;
3146
3147 /*
3148 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3149 * always ignore it.
3150 */
3151 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3152 return 0;
3153
3154 /* Mask it if we don't want data */
3155 if (!ropt->dumpData)
3156 {
3157 /*
3158 * The sequence_data option overrides dumpData for SEQUENCE SET.
3159 *
3160 * In binary-upgrade mode, even with dumpData unset, we do not mask
3161 * out large objects. (Only large object definitions, comments and
3162 * other metadata should be generated in binary-upgrade mode, not the
3163 * actual data, but that need not concern us here.)
3164 */
3165 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3166 !(ropt->binary_upgrade &&
3167 (strcmp(te->desc, "BLOB") == 0 ||
3168 strcmp(te->desc, "BLOB METADATA") == 0 ||
3169 (strcmp(te->desc, "ACL") == 0 &&
3170 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3171 (strcmp(te->desc, "COMMENT") == 0 &&
3172 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3173 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3174 strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3175 res = res & REQ_SCHEMA;
3176 }
3177
3178 /* Mask it if we don't want schema */
3179 if (!ropt->dumpSchema)
3180 res = res & REQ_DATA;
3181
3182 return res;
3183}
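/*
 * Example of how the masking above plays out: a TABLE DATA entry restored
 * with dumpData unset (schema-only restore) starts as REQ_SCHEMA | REQ_DATA,
 * loses REQ_SCHEMA because it has no defn, then loses REQ_DATA in the
 * dumpData check, so it is skipped entirely.  Conversely, with dumpSchema
 * unset (data-only restore) a TABLE entry keeps only REQ_SCHEMA, since it
 * has no dataDumper, and that bit is cleared at the end, with the same net
 * result.
 */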
3184
3185/*
3186 * Identify which pass we should restore this TOC entry in.
3187 *
3188 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3189 */
3190static RestorePass
3191_tocEntryRestorePass(TocEntry *te)
3192{
3193 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3194 if (strcmp(te->desc, "ACL") == 0 ||
3195 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3196 strcmp(te->desc, "DEFAULT ACL") == 0)
3197 return RESTORE_PASS_ACL;
3198 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3199 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3200 return RESTORE_PASS_POST_ACL;
3201
3202 /*
3203 * Comments need to be emitted in the same pass as their parent objects.
3204 * ACLs haven't got comments, and neither do matview data objects, but
3205 * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3206 * we'd need yet another weird special case.)
3207 */
3208 if (strcmp(te->desc, "COMMENT") == 0 &&
3209 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3210 return RESTORE_PASS_POST_ACL;
3211
3212 /* All else can be handled in the main pass. */
3213 return RESTORE_PASS_MAIN;
3214}
3215
3216/*
3217 * Identify TOC entries that are ACLs.
3218 *
3219 * Note: it seems worth duplicating some code here to avoid a hard-wired
3220 * assumption that these are exactly the same entries that we restore during
3221 * the RESTORE_PASS_ACL phase.
3222 */
3223static bool
3224_tocEntryIsACL(TocEntry *te)
3225{
3226 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3227 if (strcmp(te->desc, "ACL") == 0 ||
3228 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3229 strcmp(te->desc, "DEFAULT ACL") == 0)
3230 return true;
3231 return false;
3232}
3233
3234/*
3235 * Issue SET commands for parameters that we want to have set the same way
3236 * at all times during execution of a restore script.
3237 */
3238static void
3239_doSetFixedOutputState(ArchiveHandle *AH)
3240{
3241 RestoreOptions *ropt = AH->public.ropt;
3242
3243 /*
3244 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3245 */
3246 ahprintf(AH, "SET statement_timeout = 0;\n");
3247 ahprintf(AH, "SET lock_timeout = 0;\n");
3248 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3249 ahprintf(AH, "SET transaction_timeout = 0;\n");
3250
3251 /* Select the correct character set encoding */
3252 ahprintf(AH, "SET client_encoding = '%s';\n",
3253 pg_encoding_to_char(AH->public.encoding));
3254
3255 /* Select the correct string literal syntax */
3256 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3257 AH->public.std_strings ? "on" : "off");
3258
3259 /* Select the role to be used during restore */
3260 if (ropt && ropt->use_role)
3261 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3262
3263 /* Select the dump-time search_path */
3264 if (AH->public.searchpath)
3265 ahprintf(AH, "%s", AH->public.searchpath);
3266
3267 /* Make sure function checking is disabled */
3268 ahprintf(AH, "SET check_function_bodies = false;\n");
3269
3270 /* Ensure that all valid XML data will be accepted */
3271 ahprintf(AH, "SET xmloption = content;\n");
3272
3273 /* Avoid annoying notices etc */
3274 ahprintf(AH, "SET client_min_messages = warning;\n");
3275 if (!AH->public.std_strings)
3276 ahprintf(AH, "SET escape_string_warning = off;\n");
3277
3278 /* Adjust row-security state */
3279 if (ropt && ropt->enable_row_security)
3280 ahprintf(AH, "SET row_security = on;\n");
3281 else
3282 ahprintf(AH, "SET row_security = off;\n");
3283
3284 /*
3285 * In --transaction-size mode, we should always be in a transaction when
3286 * we begin to restore objects.
3287 */
3288 if (ropt && ropt->txn_size > 0)
3289 {
3290 if (AH->connection)
3291 StartTransaction(&AH->public);
3292 else
3293 ahprintf(AH, "\nBEGIN;\n");
3294 AH->txnCount = 0;
3295 }
3296
3297 ahprintf(AH, "\n");
3298}
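/*
 * In a plain-text restore the preamble emitted above looks roughly like:
 *
 *		SET statement_timeout = 0;
 *		SET lock_timeout = 0;
 *		SET idle_in_transaction_session_timeout = 0;
 *		SET transaction_timeout = 0;
 *		SET client_encoding = 'UTF8';
 *		SET standard_conforming_strings = on;
 *		SET check_function_bodies = false;
 *		SET xmloption = content;
 *		SET client_min_messages = warning;
 *		SET row_security = off;
 *
 * with the encoding, role, search_path, escape_string_warning and
 * row-security lines varying with the archive contents and restore options.
 */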
3299
3300/*
3301 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3302 * for updating state if appropriate. If user is NULL or an empty string,
3303 * the specification DEFAULT will be used.
3304 */
3305static void
3306_doSetSessionAuth(ArchiveHandle *AH, const char *user)
3307{
3308 PQExpBuffer cmd = createPQExpBuffer();
3309
3310 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3311
3312 /*
3313 * SQL requires a string literal here. Might as well be correct.
3314 */
3315 if (user && *user)
3316 appendStringLiteralAHX(cmd, user, AH);
3317 else
3318 appendPQExpBufferStr(cmd, "DEFAULT");
3319 appendPQExpBufferChar(cmd, ';');
3320
3321 if (RestoringToDB(AH))
3322 {
3323 PGresult *res;
3324
3325 res = PQexec(AH->connection, cmd->data);
3326
3327 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3328 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3329 pg_fatal("could not set session user to \"%s\": %s",
3330 user, PQerrorMessage(AH->connection));
3331
3332 PQclear(res);
3333 }
3334 else
3335 ahprintf(AH, "%s\n\n", cmd->data);
3336
3337 destroyPQExpBuffer(cmd);
3338}
3339
3340
3341/*
3342 * Issue the commands to connect to the specified database.
3343 *
3344 * If we're currently restoring right into a database, this will
3345 * actually establish a connection. Otherwise it puts a \connect into
3346 * the script output.
3347 */
3348static void
3349_reconnectToDB(ArchiveHandle *AH, const char *dbname)
3350{
3351 if (RestoringToDB(AH))
3352 ReconnectToServer(AH, dbname);
3353 else
3354 {
3355 PQExpBufferData connectbuf;
3356
3357 initPQExpBuffer(&connectbuf);
3358 appendPsqlMetaConnect(&connectbuf, dbname);
3359 ahprintf(AH, "%s\n", connectbuf.data);
3360 termPQExpBuffer(&connectbuf);
3361 }
3362
3363 /*
3364 * NOTE: currUser keeps track of what the imaginary session user in our
3365 * script is. It's now effectively reset to the original userID.
3366 */
3367 free(AH->currUser);
3368 AH->currUser = NULL;
3369
3370 /* don't assume we still know the output schema, tablespace, etc either */
3371 free(AH->currSchema);
3372 AH->currSchema = NULL;
3373
3374 free(AH->currTableAm);
3375 AH->currTableAm = NULL;
3376
3377 free(AH->currTablespace);
3378 AH->currTablespace = NULL;
3379
3380 /* re-establish fixed state */
3381 _doSetFixedOutputState(AH);
3382}
3383
3384/*
3385 * Become the specified user, and update state to avoid redundant commands
3386 *
3387 * NULL or empty argument is taken to mean restoring the session default
3388 */
3389static void
3390_becomeUser(ArchiveHandle *AH, const char *user)
3391{
3392 if (!user)
3393 user = ""; /* avoid null pointers */
3394
3395 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3396 return; /* no need to do anything */
3397
3398 _doSetSessionAuth(AH, user);
3399
3400 /*
3401 * NOTE: currUser keeps track of what the imaginary session user in our
3402 * script is
3403 */
3404 free(AH->currUser);
3405 AH->currUser = pg_strdup(user);
3406}
3407
3408/*
3409 * Become the owner of the given TOC entry object. If
3410 * changes in ownership are not allowed, this doesn't do anything.
3411 */
3412static void
3413_becomeOwner(ArchiveHandle *AH, TocEntry *te)
3414{
3415 RestoreOptions *ropt = AH->public.ropt;
3416
3417 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3418 return;
3419
3420 _becomeUser(AH, te->owner);
3421}
3422
3423
3424/*
3425 * Issue the commands to select the specified schema as the current schema
3426 * in the target database.
3427 */
3428static void
3429_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3430{
3431 PQExpBuffer qry;
3432
3433 /*
3434 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3435 * that search_path rather than switching to entry-specific paths.
3436 * Otherwise, it's an old archive that will not restore correctly unless
3437 * we set the search_path as it's expecting.
3438 */
3439 if (AH->public.searchpath)
3440 return;
3441
3442 if (!schemaName || *schemaName == '\0' ||
3443 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3444 return; /* no need to do anything */
3445
3446 qry = createPQExpBuffer();
3447
3448 appendPQExpBuffer(qry, "SET search_path = %s",
3449 fmtId(schemaName));
3450 if (strcmp(schemaName, "pg_catalog") != 0)
3451 appendPQExpBufferStr(qry, ", pg_catalog");
3452
3453 if (RestoringToDB(AH))
3454 {
3455 PGresult *res;
3456
3457 res = PQexec(AH->connection, qry->data);
3458
3459 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3460 warn_or_exit_horribly(AH,
3461 "could not set \"search_path\" to \"%s\": %s",
3462 schemaName, PQerrorMessage(AH->connection));
3463
3464 PQclear(res);
3465 }
3466 else
3467 ahprintf(AH, "%s;\n\n", qry->data);
3468
3469 free(AH->currSchema);
3470 AH->currSchema = pg_strdup(schemaName);
3471
3472 destroyPQExpBuffer(qry);
3473}
3474
3475/*
3476 * Issue the commands to select the specified tablespace as the current one
3477 * in the target database.
3478 */
3479static void
3480_selectTablespace(ArchiveHandle *AH, const char *tablespace)
3481{
3482 RestoreOptions *ropt = AH->public.ropt;
3483 PQExpBuffer qry;
3484 const char *want,
3485 *have;
3486
3487 /* do nothing in --no-tablespaces mode */
3488 if (ropt->noTablespace)
3489 return;
3490
3491 have = AH->currTablespace;
3492 want = tablespace;
3493
3494 /* no need to do anything for non-tablespace object */
3495 if (!want)
3496 return;
3497
3498 if (have && strcmp(want, have) == 0)
3499 return; /* no need to do anything */
3500
3501 qry = createPQExpBuffer();
3502
3503 if (strcmp(want, "") == 0)
3504 {
3505 /* We want the tablespace to be the database's default */
3506 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3507 }
3508 else
3509 {
3510 /* We want an explicit tablespace */
3511 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3512 }
3513
3514 if (RestoringToDB(AH))
3515 {
3516 PGresult *res;
3517
3518 res = PQexec(AH->connection, qry->data);
3519
3520 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3521 warn_or_exit_horribly(AH,
3522 "could not set \"default_tablespace\" to %s: %s",
3523 fmtId(want), PQerrorMessage(AH->connection));
3524
3525 PQclear(res);
3526 }
3527 else
3528 ahprintf(AH, "%s;\n\n", qry->data);
3529
3530 free(AH->currTablespace);
3531 AH->currTablespace = pg_strdup(want);
3532
3533 destroyPQExpBuffer(qry);
3534}
3535
3536/*
3537 * Set the proper default_table_access_method value for the table.
3538 */
3539static void
3540_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3541{
3542 RestoreOptions *ropt = AH->public.ropt;
3543 PQExpBuffer cmd;
3544 const char *want,
3545 *have;
3546
3547 /* do nothing in --no-table-access-method mode */
3548 if (ropt->noTableAm)
3549 return;
3550
3551 have = AH->currTableAm;
3552 want = tableam;
3553
3554 if (!want)
3555 return;
3556
3557 if (have && strcmp(want, have) == 0)
3558 return;
3559
3560 cmd = createPQExpBuffer();
3561 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3562
3563 if (RestoringToDB(AH))
3564 {
3565 PGresult *res;
3566
3567 res = PQexec(AH->connection, cmd->data);
3568
3569 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3570 warn_or_exit_horribly(AH,
3571 "could not set \"default_table_access_method\": %s",
3572 PQerrorMessage(AH->connection));
3573
3574 PQclear(res);
3575 }
3576 else
3577 ahprintf(AH, "%s\n\n", cmd->data);
3578
3579 destroyPQExpBuffer(cmd);
3580
3581 free(AH->currTableAm);
3582 AH->currTableAm = pg_strdup(want);
3583}
3584
3585/*
3586 * Set the proper default table access method for a table without storage.
3587 * Currently, this is required only for partitioned tables with a table AM.
3588 */
3589static void
3590_printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3591{
3592 RestoreOptions *ropt = AH->public.ropt;
3593 const char *tableam = te->tableam;
3594 PQExpBuffer cmd;
3595
3596 /* do nothing in --no-table-access-method mode */
3597 if (ropt->noTableAm)
3598 return;
3599
3600 if (!tableam)
3601 return;
3602
3603 Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3604
3605 cmd = createPQExpBuffer();
3606
3607 appendPQExpBufferStr(cmd, "ALTER TABLE ");
3608 appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3609 appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3610 fmtId(tableam));
3611
3612 if (RestoringToDB(AH))
3613 {
3614 PGresult *res;
3615
3616 res = PQexec(AH->connection, cmd->data);
3617
3618 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3619 warn_or_exit_horribly(AH,
3620 "could not alter table access method: %s",
3621 PQerrorMessage(AH->connection));
3622 PQclear(res);
3623 }
3624 else
3625 ahprintf(AH, "%s\n\n", cmd->data);
3626
3627 destroyPQExpBuffer(cmd);
3628}
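/*
 * For a partitioned table this emits, for example:
 *		ALTER TABLE public.measurement SET ACCESS METHOD heap;
 * (the table and access-method names here are only illustrative).
 */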
3629
3630/*
3631 * Extract an object description for a TOC entry, and append it to buf.
3632 *
3633 * This is used for ALTER ... OWNER TO.
3634 *
3635 * If the object type has no owner, do nothing.
3636 */
3637static void
3638_getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3639{
3640 const char *type = te->desc;
3641
3642 /* objects that don't require special decoration */
3643 if (strcmp(type, "COLLATION") == 0 ||
3644 strcmp(type, "CONVERSION") == 0 ||
3645 strcmp(type, "DOMAIN") == 0 ||
3646 strcmp(type, "FOREIGN TABLE") == 0 ||
3647 strcmp(type, "MATERIALIZED VIEW") == 0 ||
3648 strcmp(type, "SEQUENCE") == 0 ||
3649 strcmp(type, "STATISTICS") == 0 ||
3650 strcmp(type, "TABLE") == 0 ||
3651 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3652 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3653 strcmp(type, "TYPE") == 0 ||
3654 strcmp(type, "VIEW") == 0 ||
3655 /* non-schema-specified objects */
3656 strcmp(type, "DATABASE") == 0 ||
3657 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3658 strcmp(type, "SCHEMA") == 0 ||
3659 strcmp(type, "EVENT TRIGGER") == 0 ||
3660 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3661 strcmp(type, "SERVER") == 0 ||
3662 strcmp(type, "PUBLICATION") == 0 ||
3663 strcmp(type, "SUBSCRIPTION") == 0)
3664 {
3665 appendPQExpBuffer(buf, "%s ", type);
3666 if (te->namespace && *te->namespace)
3667 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3668 appendPQExpBufferStr(buf, fmtId(te->tag));
3669 }
3670 /* LOs just have a name, but it's numeric so must not use fmtId */
3671 else if (strcmp(type, "BLOB") == 0)
3672 {
3673 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3674 }
3675
3676 /*
3677 * These object types require additional decoration. Fortunately, the
3678 * information needed is exactly what's in the DROP command.
3679 */
3680 else if (strcmp(type, "AGGREGATE") == 0 ||
3681 strcmp(type, "FUNCTION") == 0 ||
3682 strcmp(type, "OPERATOR") == 0 ||
3683 strcmp(type, "OPERATOR CLASS") == 0 ||
3684 strcmp(type, "OPERATOR FAMILY") == 0 ||
3685 strcmp(type, "PROCEDURE") == 0)
3686 {
3687 /* Chop "DROP " off the front and make a modifiable copy */
3688 char *first = pg_strdup(te->dropStmt + 5);
3689 char *last;
3690
3691 /* point to last character in string */
3692 last = first + strlen(first) - 1;
3693
3694 /* Strip off any ';' or '\n' at the end */
3695 while (last >= first && (*last == '\n' || *last == ';'))
3696 last--;
3697 *(last + 1) = '\0';
3698
3699 appendPQExpBufferStr(buf, first);
3700
3701 free(first);
3702 return;
3703 }
3704 /* these object types don't have separate owners */
3705 else if (strcmp(type, "CAST") == 0 ||
3706 strcmp(type, "CHECK CONSTRAINT") == 0 ||
3707 strcmp(type, "CONSTRAINT") == 0 ||
3708 strcmp(type, "DATABASE PROPERTIES") == 0 ||
3709 strcmp(type, "DEFAULT") == 0 ||
3710 strcmp(type, "FK CONSTRAINT") == 0 ||
3711 strcmp(type, "INDEX") == 0 ||
3712 strcmp(type, "RULE") == 0 ||
3713 strcmp(type, "TRIGGER") == 0 ||
3714 strcmp(type, "ROW SECURITY") == 0 ||
3715 strcmp(type, "POLICY") == 0 ||
3716 strcmp(type, "USER MAPPING") == 0)
3717 {
3718 /* do nothing */
3719 }
3720 else
3721 pg_fatal("don't know how to set owner for object type \"%s\"", type);
3722}
3723
3724/*
3725 * Emit the SQL commands to create the object represented by a TOC entry
3726 *
3727 * This now also includes issuing an ALTER OWNER command to restore the
3728 * object's ownership, if wanted. But note that the object's permissions
3729 * will remain at default, until the matching ACL TOC entry is restored.
3730 */
3731static void
3732_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3733{
3734 RestoreOptions *ropt = AH->public.ropt;
3735
3736 /*
3737 * Select owner, schema, tablespace and default AM as necessary. The
3738 * default access method for partitioned tables is handled after
3739 * generating the object definition, as it requires an ALTER command
3740 * rather than SET.
3741 */
3742 _becomeOwner(AH, te);
3743 _selectOutputSchema(AH, te->namespace);
3744 _selectTablespace(AH, te->tablespace);
3745 if (te->relkind != RELKIND_PARTITIONED_TABLE)
3746 _selectTableAccessMethod(AH, te->tableam);
3747
3748 /* Emit header comment for item */
3749 if (!AH->noTocComments)
3750 {
3751 const char *pfx;
3752 char *sanitized_name;
3753 char *sanitized_schema;
3754 char *sanitized_owner;
3755
3756 if (isData)
3757 pfx = "Data for ";
3758 else
3759 pfx = "";
3760
3761 ahprintf(AH, "--\n");
3762 if (AH->public.verbose)
3763 {
3764 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3765 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3766 if (te->nDeps > 0)
3767 {
3768 int i;
3769
3770 ahprintf(AH, "-- Dependencies:");
3771 for (i = 0; i < te->nDeps; i++)
3772 ahprintf(AH, " %d", te->dependencies[i]);
3773 ahprintf(AH, "\n");
3774 }
3775 }
3776
3777 sanitized_name = sanitize_line(te->tag, false);
3778 sanitized_schema = sanitize_line(te->namespace, true);
3779 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3780
3781 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3782 pfx, sanitized_name, te->desc, sanitized_schema,
3783 sanitized_owner);
3784
3785 free(sanitized_name);
3786 free(sanitized_schema);
3787 free(sanitized_owner);
3788
3789 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3790 {
3791 char *sanitized_tablespace;
3792
3793 sanitized_tablespace = sanitize_line(te->tablespace, false);
3794 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3795 free(sanitized_tablespace);
3796 }
3797 ahprintf(AH, "\n");
3798
3799 if (AH->PrintExtraTocPtr != NULL)
3800 AH->PrintExtraTocPtr(AH, te);
3801 ahprintf(AH, "--\n\n");
3802 }
3803
3804 /*
3805 * Actually print the definition. Normally we can just print the defn
3806 * string if any, but we have three special cases:
3807 *
3808 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3809 * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3810 * "public" that is a comment. We have to do this when --no-owner mode is
3811 * selected. This is ugly, but I see no other good way ...
3812 *
3813 * 2. BLOB METADATA entries need special processing since their defn
3814 * strings are just lists of OIDs, not complete SQL commands.
3815 *
3816 * 3. ACL LARGE OBJECTS entries need special processing because they
3817 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3818 * apply to each large object listed in the associated BLOB METADATA.
3819 */
3820 if (ropt->noOwner &&
3821 strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3822 {
3823 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3824 }
3825 else if (strcmp(te->desc, "BLOB METADATA") == 0)
3826 {
3827 IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3828 }
3829 else if (strcmp(te->desc, "ACL") == 0 &&
3830 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3831 {
3832 IssueACLPerBlob(AH, te);
3833 }
3834 else if (te->defn && strlen(te->defn) > 0)
3835 {
3836 ahprintf(AH, "%s\n\n", te->defn);
3837
3838 /*
3839 * If the defn string contains multiple SQL commands, txn_size mode
3840 * should count it as N actions not one. But rather than build a full
3841 * SQL parser, approximate this by counting semicolons. One case
3842 * where that tends to be badly fooled is function definitions, so
3843 * ignore them. (restore_toc_entry will count one action anyway.)
3844 */
3845 if (ropt->txn_size > 0 &&
3846 strcmp(te->desc, "FUNCTION") != 0 &&
3847 strcmp(te->desc, "PROCEDURE") != 0)
3848 {
3849 const char *p = te->defn;
3850 int nsemis = 0;
3851
3852 while ((p = strchr(p, ';')) != NULL)
3853 {
3854 nsemis++;
3855 p++;
3856 }
3857 if (nsemis > 1)
3858 AH->txnCount += nsemis - 1;
3859 }
3860 }
3861
3862 /*
3863 * If we aren't using SET SESSION AUTH to determine ownership, we must
3864 * instead issue an ALTER OWNER command. Schema "public" is special; when
3865 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3866 * when using SET SESSION for all other objects. We assume that anything
3867 * without a DROP command is not a separately ownable object.
3868 */
3869 if (!ropt->noOwner &&
3870 (!ropt->use_setsessauth ||
3871 (strcmp(te->desc, "SCHEMA") == 0 &&
3872 strncmp(te->defn, "--", 2) == 0)) &&
3873 te->owner && strlen(te->owner) > 0 &&
3874 te->dropStmt && strlen(te->dropStmt) > 0)
3875 {
3876 if (strcmp(te->desc, "BLOB METADATA") == 0)
3877 {
3878 /* BLOB METADATA needs special code to handle multiple LOs */
3879 char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
3880
3881 IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
3882 pg_free(cmdEnd);
3883 }
3884 else
3885 {
3886 /* For all other cases, we can use _getObjectDescription */
3887 PQExpBufferData temp;
3888
3889 initPQExpBuffer(&temp);
3890 _getObjectDescription(&temp, te);
3891
3892 /*
3893 * If _getObjectDescription() didn't fill the buffer, then there
3894 * is no owner.
3895 */
3896 if (temp.data[0])
3897 ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
3898 temp.data, fmtId(te->owner));
3899 termPQExpBuffer(&temp);
3900 }
3901 }
3902
3903 /*
3904 * Select a partitioned table's default AM, once the table definition has
3905 * been generated.
3906 */
3907 if (te->relkind == RELKIND_PARTITIONED_TABLE)
3908 _printTableAccessMethodNoStorage(AH, te);
3909
3910 /*
3911 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3912 * commands, so we can no longer assume we know the current auth setting.
3913 */
3914 if (_tocEntryIsACL(te))
3915 {
3916 free(AH->currUser);
3917 AH->currUser = NULL;
3918 }
3919}
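/*
 * The per-object header comment produced above looks like, for instance:
 *
 *		--
 *		-- Name: accounts; Type: TABLE; Schema: public; Owner: alice
 *		--
 *
 * (names are illustrative), optionally preceded by TOC-entry and dependency
 * details in verbose mode and followed by a Tablespace field when one
 * applies.
 */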
3920
3921/*
3922 * Sanitize a string to be included in an SQL comment or TOC listing, by
3923 * replacing any newlines with spaces. This ensures each logical output line
3924 * is in fact one physical output line, to prevent corruption of the dump
3925 * (which could, in the worst case, present an SQL injection vulnerability
3926 * if someone were to incautiously load a dump containing objects with
3927 * maliciously crafted names).
3928 *
3929 * The result is a freshly malloc'd string. If the input string is NULL,
3930 * return a malloc'ed empty string, unless want_hyphen, in which case return a
3931 * malloc'ed hyphen.
3932 *
3933 * Note that we currently don't bother to quote names, meaning that the name
3934 * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3935 * it only examines the dumpId field, but someday we might want to try harder.
3936 */
3937static char *
3938sanitize_line(const char *str, bool want_hyphen)
3939{
3940 char *result;
3941 char *s;
3942
3943 if (!str)
3944 return pg_strdup(want_hyphen ? "-" : "");
3945
3946 result = pg_strdup(str);
3947
3948 for (s = result; *s != '\0'; s++)
3949 {
3950 if (*s == '\n' || *s == '\r')
3951 *s = ' ';
3952 }
3953
3954 return result;
3955}
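/*
 * Examples: sanitize_line("bad\nname", false) yields "bad name", and
 * sanitize_line(NULL, true) yields "-", which is what the Schema and Owner
 * fields above show when the value is absent.
 */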
3956
3957/*
3958 * Write the file header for a custom-format archive
3959 */
3960void
3961WriteHead(ArchiveHandle *AH)
3962{
3963 struct tm crtm;
3964
3965 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3966 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3967 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3968 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3969 AH->WriteBytePtr(AH, AH->intSize);
3970 AH->WriteBytePtr(AH, AH->offSize);
3971 AH->WriteBytePtr(AH, AH->format);
3972 AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
3973 crtm = *localtime(&AH->createDate);
3974 WriteInt(AH, crtm.tm_sec);
3975 WriteInt(AH, crtm.tm_min);
3976 WriteInt(AH, crtm.tm_hour);
3977 WriteInt(AH, crtm.tm_mday);
3978 WriteInt(AH, crtm.tm_mon);
3979 WriteInt(AH, crtm.tm_year);
3980 WriteInt(AH, crtm.tm_isdst);
3981 WriteStr(AH, PQdb(AH->connection));
3982 WriteStr(AH, AH->public.remoteVersionStr);
3983 WriteStr(AH, PG_VERSION);
3984}
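/*
 * Layout of the custom-format header written above: the 5-byte "PGDMP"
 * magic, three version bytes (major, minor, revision), the integer and
 * offset sizes, the format code, a compression byte, the seven fields of
 * the creation time, the database name, and two version strings.  ReadHead
 * below accepts this layout as well as the older variants noted in its
 * version checks.
 */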
3985
3986void
3987ReadHead(ArchiveHandle *AH)
3988{
3989 char *errmsg;
3990 char vmaj,
3991 vmin,
3992 vrev;
3993 int fmt;
3994
3995 /*
3996 * If we haven't already read the header, do so.
3997 *
3998 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3999 * way to unify the cases?
4000 */
4001 if (!AH->readHeader)
4002 {
4003 char tmpMag[7];
4004
4005 AH->ReadBufPtr(AH, tmpMag, 5);
4006
4007 if (strncmp(tmpMag, "PGDMP", 5) != 0)
4008 pg_fatal("did not find magic string in file header");
4009 }
4010
4011 vmaj = AH->ReadBytePtr(AH);
4012 vmin = AH->ReadBytePtr(AH);
4013
4014 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4015 vrev = AH->ReadBytePtr(AH);
4016 else
4017 vrev = 0;
4018
4019 AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4020
4021 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4022 pg_fatal("unsupported version (%d.%d) in file header",
4023 vmaj, vmin);
4024
4025 AH->intSize = AH->ReadBytePtr(AH);
4026 if (AH->intSize > 32)
4027 pg_fatal("sanity check on integer size (%lu) failed",
4028 (unsigned long) AH->intSize);
4029
4030 if (AH->intSize > sizeof(int))
4031 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4032
4033 if (AH->version >= K_VERS_1_7)
4034 AH->offSize = AH->ReadBytePtr(AH);
4035 else
4036 AH->offSize = AH->intSize;
4037
4038 fmt = AH->ReadBytePtr(AH);
4039
4040 if (AH->format != fmt)
4041 pg_fatal("expected format (%d) differs from format found in file (%d)",
4042 AH->format, fmt);
4043
4044 if (AH->version >= K_VERS_1_15)
4045 AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4046 else if (AH->version >= K_VERS_1_2)
4047 {
4048 /* Guess the compression method based on the level */
4049 if (AH->version < K_VERS_1_4)
4050 AH->compression_spec.level = AH->ReadBytePtr(AH);
4051 else
4052 AH->compression_spec.level = ReadInt(AH);
4053
4054 if (AH->compression_spec.level != 0)
4055 AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4056 }
4057 else
4058 AH->compression_spec.algorithm = PG_COMPRESSION_NONE;
4059
4060 errmsg = supports_compression(AH->compression_spec);
4061 if (errmsg)
4062 {
4063 pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4064 errmsg);
4065 pg_free(errmsg);
4066 }
4067
4068 if (AH->version >= K_VERS_1_4)
4069 {
4070 struct tm crtm;
4071
4072 crtm.tm_sec = ReadInt(AH);
4073 crtm.tm_min = ReadInt(AH);
4074 crtm.tm_hour = ReadInt(AH);
4075 crtm.tm_mday = ReadInt(AH);
4076 crtm.tm_mon = ReadInt(AH);
4077 crtm.tm_year = ReadInt(AH);
4078 crtm.tm_isdst = ReadInt(AH);
4079
4080 /*
4081 * Newer versions of glibc have mktime() report failure if tm_isdst is
4082 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4083 * TZ=UTC. This is problematic when restoring an archive under a
4084 * different timezone setting. If we get a failure, try again with
4085 * tm_isdst set to -1 ("don't know").
4086 *
4087 * XXX with or without this hack, we reconstruct createDate
4088 * incorrectly when the prevailing timezone is different from
4089 * pg_dump's. Next time we bump the archive version, we should flush
4090 * this representation and store a plain seconds-since-the-Epoch
4091 * timestamp instead.
4092 */
4093 AH->createDate = mktime(&crtm);
4094 if (AH->createDate == (time_t) -1)
4095 {
4096 crtm.tm_isdst = -1;
4097 AH->createDate = mktime(&crtm);
4098 if (AH->createDate == (time_t) -1)
4099 pg_log_warning("invalid creation date in header");
4100 }
4101 }
4102
4103 if (AH->version >= K_VERS_1_4)
4104 {
4105 AH->archdbname = ReadStr(AH);
4106 }
4107
4108 if (AH->version >= K_VERS_1_10)
4109 {
4110 AH->archiveRemoteVersion = ReadStr(AH);
4111 AH->archiveDumpVersion = ReadStr(AH);
4112 }
4113}
4114
4115
4116/*
4117 * checkSeek
4118 * check to see if ftell/fseek can be performed.
4119 */
4120bool
4121checkSeek(FILE *fp)
4122{
4123 pgoff_t tpos;
4124
4125 /* Check that ftello works on this file */
4126 tpos = ftello(fp);
4127 if (tpos < 0)
4128 return false;
4129
4130 /*
4131 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4132 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4133 * successful no-op even on files that are otherwise unseekable.
4134 */
4135 if (fseeko(fp, tpos, SEEK_SET) != 0)
4136 return false;
4137
4138 return true;
4139}
4140
4141
4142/*
4143 * dumpTimestamp
4144 */
4145static void
4146dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4147{
4148 char buf[64];
4149
4150 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4151 ahprintf(AH, "-- %s %s\n\n", msg, buf);
4152}
4153
4154/*
4155 * Main engine for parallel restore.
4156 *
4157 * Parallel restore is done in three phases. In this first phase,
4158 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4159 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4160 * PRE_DATA items other than ACLs.) Entries we can't process now are
4161 * added to the pending_list for later phases to deal with.
4162 */
4163static void
4164restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4165{
4166 bool skipped_some;
4167 TocEntry *next_work_item;
4168
4169 pg_log_debug("entering restore_toc_entries_prefork");
4170
4171 /* Adjust dependency information */
4172 fix_dependencies(AH);
4173
4174 /*
4175 * Do all the early stuff in a single connection in the parent. There's no
4176 * great point in running it in parallel, in fact it will actually run
4177 * faster in a single connection because we avoid all the connection and
4178 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4179 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4180 * not risk trying to process them out-of-order.
4181 *
4182 * Stuff that we can't do immediately gets added to the pending_list.
4183 * Note: we don't yet filter out entries that aren't going to be restored.
4184 * They might participate in dependency chains connecting entries that
4185 * should be restored, so we treat them as live until we actually process
4186 * them.
4187 *
4188 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4189 * before DATA items, and all DATA items before POST_DATA items. That is
4190 * not certain to be true in older archives, though, and in any case use
4191 * of a list file would destroy that ordering (cf. SortTocFromFile). So
4192 * this loop cannot assume that it holds.
4193 */
4194 AH->restorePass = RESTORE_PASS_MAIN;
4195 skipped_some = false;
4196 for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4197 {
4198 bool do_now = true;
4199
4200 if (next_work_item->section != SECTION_PRE_DATA)
4201 {
4202 /* DATA and POST_DATA items are just ignored for now */
4203 if (next_work_item->section == SECTION_DATA ||
4204 next_work_item->section == SECTION_POST_DATA)
4205 {
4206 do_now = false;
4207 skipped_some = true;
4208 }
4209 else
4210 {
4211 /*
4212 * SECTION_NONE items, such as comments, can be processed now
4213 * if we are still in the PRE_DATA part of the archive. Once
4214 * we've skipped any items, we have to consider whether the
4215 * comment's dependencies are satisfied, so skip it for now.
4216 */
4217 if (skipped_some)
4218 do_now = false;
4219 }
4220 }
4221
4222 /*
4223 * Also skip items that need to be forced into later passes. We need
4224 * not set skipped_some in this case, since by assumption no main-pass
4225 * items could depend on these.
4226 */
4227 if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4228 do_now = false;
4229
4230 if (do_now)
4231 {
4232 /* OK, restore the item and update its dependencies */
4233 pg_log_info("processing item %d %s %s",
4234 next_work_item->dumpId,
4235 next_work_item->desc, next_work_item->tag);
4236
4237 (void) restore_toc_entry(AH, next_work_item, false);
4238
4239 /* Reduce dependencies, but don't move anything to ready_heap */
4240 reduce_dependencies(AH, next_work_item, NULL);
4241 }
4242 else
4243 {
4244 /* Nope, so add it to pending_list */
4245 pending_list_append(pending_list, next_work_item);
4246 }
4247 }
4248
4249 /*
4250 * In --transaction-size mode, we must commit the open transaction before
4251 * dropping the database connection. This also ensures that child workers
4252 * can see the objects we've created so far.
4253 */
4254 if (AH->public.ropt->txn_size > 0)
4255 CommitTransaction(&AH->public);
4256
4257 /*
4258 * Now close parent connection in prep for parallel steps. We do this
4259 * mainly to ensure that we don't exceed the specified number of parallel
4260 * connections.
4261 */
4262 DisconnectDatabase(&AH->public);
4263
4264 /* blow away any transient state from the old connection */
4265 free(AH->currUser);
4266 AH->currUser = NULL;
4267 free(AH->currSchema);
4268 AH->currSchema = NULL;
4269 free(AH->currTablespace);
4270 AH->currTablespace = NULL;
4271 free(AH->currTableAm);
4272 AH->currTableAm = NULL;
4273}
4274
4275/*
4276 * Main engine for parallel restore.
4277 *
4278 * Parallel restore is done in three phases. In this second phase,
4279 * we process entries by dispatching them to parallel worker children
4280 * (processes on Unix, threads on Windows), each of which connects
4281 * separately to the database. Inter-entry dependencies are respected,
4282 * and so is the RestorePass multi-pass structure. When we can no longer
4283 * make any entries ready to process, we exit. Normally, there will be
4284 * nothing left to do; but if there is, the third phase will mop up.
4285 */
4286static void
4287restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4288 TocEntry *pending_list)
4289{
4290 binaryheap *ready_heap;
4291 TocEntry *next_work_item;
4292
4293 pg_log_debug("entering restore_toc_entries_parallel");
4294
4295 /* Set up ready_heap with enough room for all known TocEntrys */
4296 ready_heap = binaryheap_allocate(AH->tocCount,
4297 TocEntrySizeCompareBinaryheap,
4298 NULL);
4299
4300 /*
4301 * The pending_list contains all items that we need to restore. Move all
4302 * items that are available to process immediately into the ready_heap.
4303 * After this setup, the pending list is everything that needs to be done
4304 * but is blocked by one or more dependencies, while the ready heap
4305 * contains items that have no remaining dependencies and are OK to
4306 * process in the current restore pass.
4307 */
4308 AH->restorePass = RESTORE_PASS_MAIN;
4309 move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4310
4311 /*
4312 * main parent loop
4313 *
4314 * Keep going until there is no worker still running AND there is no work
4315 * left to be done. Note invariant: at top of loop, there should always
4316 * be at least one worker available to dispatch a job to.
4317 */
4318 pg_log_info("entering main parallel loop");
4319
4320 for (;;)
4321 {
4322 /* Look for an item ready to be dispatched to a worker */
4323 next_work_item = pop_next_work_item(ready_heap, pstate);
4324 if (next_work_item != NULL)
4325 {
4326 /* If not to be restored, don't waste time launching a worker */
4327 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4328 {
4329 pg_log_info("skipping item %d %s %s",
4330 next_work_item->dumpId,
4331 next_work_item->desc, next_work_item->tag);
4332 /* Update its dependencies as though we'd completed it */
4333 reduce_dependencies(AH, next_work_item, ready_heap);
4334 /* Loop around to see if anything else can be dispatched */
4335 continue;
4336 }
4337
4338 pg_log_info("launching item %d %s %s",
4339 next_work_item->dumpId,
4340 next_work_item->desc, next_work_item->tag);
4341
4342 /* Dispatch to some worker */
4343 DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4344 mark_restore_job_done, ready_heap);
4345 }
4346 else if (IsEveryWorkerIdle(pstate))
4347 {
4348 /*
4349 * Nothing is ready and no worker is running, so we're done with
4350 * the current pass or maybe with the whole process.
4351 */
4352 if (AH->restorePass == RESTORE_PASS_LAST)
4353 break; /* No more parallel processing is possible */
4354
4355 /* Advance to next restore pass */
4356 AH->restorePass++;
4357 /* That probably allows some stuff to be made ready */
4358 move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4359 /* Loop around to see if anything's now ready */
4360 continue;
4361 }
4362 else
4363 {
4364 /*
4365 * We have nothing ready, but at least one child is working, so
4366 * wait for some subjob to finish.
4367 */
4368 }
4369
4370 /*
4371 * Before dispatching another job, check to see if anything has
4372 * finished. We should check every time through the loop so as to
4373 * reduce dependencies as soon as possible. If we were unable to
4374 * dispatch any job this time through, wait until some worker finishes
4375 * (and, hopefully, unblocks some pending item). If we did dispatch
4376 * something, continue as soon as there's at least one idle worker.
4377 * Note that in either case, there's guaranteed to be at least one
4378 * idle worker when we return to the top of the loop. This ensures we
4379 * won't block inside DispatchJobForTocEntry, which would be
4380 * undesirable: we'd rather postpone dispatching until we see what's
4381 * been unblocked by finished jobs.
4382 */
4383 WaitForWorkers(AH, pstate,
4384 next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4385 }
4386
4387 /* There should now be nothing in ready_heap. */
4388 Assert(binaryheap_empty(ready_heap));
4389
4390 binaryheap_free(ready_heap);
4391
4392 pg_log_info("finished main parallel loop");
4393}
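/*
 * Illustrative sketch (not part of pg_backup_archiver.c): a single-threaded,
 * heavily simplified model of the pass-advance behaviour of the loop above.
 * Workers, dependencies, and the ready heap are all left out; the pass and
 * item names below are invented for the example.
 */
#include <stdio.h>
#include <stdbool.h>

enum restore_pass
{
	PASS_MAIN,
	PASS_ACL,
	PASS_POST_ACL,
	PASS_LAST = PASS_POST_ACL
};

struct work_item
{
	const char *name;
	enum restore_pass pass;		/* pass this item must wait for */
	bool		done;
};

int
main(void)
{
	struct work_item items[] = {
		{"table data", PASS_MAIN, false},
		{"grant", PASS_ACL, false},
		{"refresh matview", PASS_POST_ACL, false},
	};
	int			nitems = sizeof(items) / sizeof(items[0]);
	enum restore_pass cur = PASS_MAIN;

	for (;;)
	{
		bool		did_something = false;

		/* Process everything that belongs to the current pass. */
		for (int i = 0; i < nitems; i++)
		{
			if (!items[i].done && items[i].pass == cur)
			{
				printf("pass %d: processing %s\n", (int) cur, items[i].name);
				items[i].done = true;
				did_something = true;
			}
		}

		/* Nothing left in this pass: stop after the last pass, else advance. */
		if (!did_something)
		{
			if (cur == PASS_LAST)
				break;
			cur++;
		}
	}
	return 0;
}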
4394
4395/*
4396 * Main engine for parallel restore.
4397 *
4398 * Parallel restore is done in three phases. In this third phase,
4399 * we mop up any remaining TOC entries by processing them serially.
4400 * This phase normally should have nothing to do, but if we've somehow
4401 * gotten stuck due to circular dependencies or some such, this provides
4402 * at least some chance of completing the restore successfully.
4403 */
4404static void
4405restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4406{
4407 RestoreOptions *ropt = AH->public.ropt;
4408 TocEntry *te;
4409
4410 pg_log_debug("entering restore_toc_entries_postfork");
4411
4412 /*
4413 * Now reconnect the single parent connection.
4414 */
4415 ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4416
4417 /* re-establish fixed state */
4418 _doSetFixedOutputState(AH);
4419
4420 /*
4421 * Make sure there is no work left due to, say, circular dependencies, or
4422 * some other pathological condition. If so, do it in the single parent
4423 * connection. We don't sweat about RestorePass ordering; it's likely we
4424 * already violated that.
4425 */
4426 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4427 {
4428 pg_log_info("processing missed item %d %s %s",
4429 te->dumpId, te->desc, te->tag);
4430 (void) restore_toc_entry(AH, te, false);
4431 }
4432}
4433
4434/*
4435 * Check if te1 has an exclusive lock requirement for an item that te2 also
4436 * requires, whether or not te2's requirement is for an exclusive lock.
4437 */
4438static bool
4439has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4440{
4441 int j,
4442 k;
4443
4444 for (j = 0; j < te1->nLockDeps; j++)
4445 {
4446 for (k = 0; k < te2->nDeps; k++)
4447 {
4448 if (te1->lockDeps[j] == te2->dependencies[k])
4449 return true;
4450 }
4451 }
4452 return false;
4453}
4454
4455
4456/*
4457 * Initialize the header of the pending-items list.
4458 *
4459 * This is a circular list with a dummy TocEntry as header, just like the
4460 * main TOC list; but we use separate list links so that an entry can be in
4461 * the main TOC list as well as in the pending list.
4462 */
4463static void
4464pending_list_header_init(TocEntry *l)
4465{
4466 l->pending_prev = l->pending_next = l;
4467}
4468
4469/* Append te to the end of the pending-list headed by l */
4470static void
4471pending_list_append(TocEntry *l, TocEntry *te)
4472{
4473 te->pending_prev = l->pending_prev;
4474 l->pending_prev->pending_next = te;
4475 l->pending_prev = te;
4476 te->pending_next = l;
4477}
4478
4479/* Remove te from the pending-list */
4480static void
4481pending_list_remove(TocEntry *te)
4482{
4483 te->pending_prev->pending_next = te->pending_next;
4484 te->pending_next->pending_prev = te->pending_prev;
4485 te->pending_prev = NULL;
4486 te->pending_next = NULL;
4487}
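/*
 * Illustrative sketch (not part of pg_backup_archiver.c): a self-contained
 * demonstration of the circular-list-with-dummy-header technique used for
 * the pending list above.  The node type and field names are invented.
 */
#include <stdio.h>
#include <stddef.h>

typedef struct node
{
	int			id;				/* payload, standing in for a TocEntry */
	struct node *prev;			/* like pending_prev */
	struct node *next;			/* like pending_next */
} node;

/* An empty list is just a header whose links point back at itself. */
static void
list_init(node *head)
{
	head->prev = head->next = head;
}

/* Insert before the header, i.e. append at the tail. */
static void
list_append(node *head, node *n)
{
	n->prev = head->prev;
	head->prev->next = n;
	head->prev = n;
	n->next = head;
}

/* Unlink a node; NULL links then mean "not a member of any list". */
static void
list_remove(node *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	n->prev = n->next = NULL;
}

int
main(void)
{
	node		head;
	node		a = {1, NULL, NULL};
	node		b = {2, NULL, NULL};

	list_init(&head);
	list_append(&head, &a);
	list_append(&head, &b);
	list_remove(&a);

	/* A traversal stops when it comes back around to the header. */
	for (node *p = head.next; p != &head; p = p->next)
		printf("id %d\n", p->id);	/* prints only "id 2" */
	return 0;
}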
4488
4489
4490/* qsort comparator for sorting TocEntries by dataLength */
4491static int
4492TocEntrySizeCompareQsort(const void *p1, const void *p2)
4493{
4494 const TocEntry *te1 = *(const TocEntry *const *) p1;
4495 const TocEntry *te2 = *(const TocEntry *const *) p2;
4496
4497 /* Sort by decreasing dataLength */
4498 if (te1->dataLength > te2->dataLength)
4499 return -1;
4500 if (te1->dataLength < te2->dataLength)
4501 return 1;
4502
4503 /* For equal dataLengths, sort by dumpId, just to be stable */
4504 if (te1->dumpId < te2->dumpId)
4505 return -1;
4506 if (te1->dumpId > te2->dumpId)
4507 return 1;
4508
4509 return 0;
4510}
4511
4512/* binaryheap comparator for sorting TocEntries by dataLength */
4513static int
4514TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
4515{
4516 /* return opposite of qsort comparator for max-heap */
4517 return -TocEntrySizeCompareQsort(&p1, &p2);
4518}
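/*
 * Illustrative sketch (not part of pg_backup_archiver.c): the "flip the
 * sign of a qsort comparator" trick used above, demonstrated with plain
 * ints and the C library's qsort().  The helper names are invented.
 */
#include <stdio.h>
#include <stdlib.h>

/* Largest-first, like sorting TOC entries by decreasing dataLength. */
static int
cmp_desc(const void *p1, const void *p2)
{
	int			a = *(const int *) p1;
	int			b = *(const int *) p2;

	if (a > b)
		return -1;
	if (a < b)
		return 1;
	return 0;
}

/*
 * Negating the result yields the opposite ordering, the same way the
 * binaryheap comparator above reuses the qsort comparator.
 */
static int
cmp_asc(const void *p1, const void *p2)
{
	return -cmp_desc(p1, p2);
}

int
main(void)
{
	int			v[] = {20, 5, 90};

	qsort(v, 3, sizeof(int), cmp_desc);
	printf("desc: %d %d %d\n", v[0], v[1], v[2]);	/* 90 20 5 */

	qsort(v, 3, sizeof(int), cmp_asc);
	printf("asc:  %d %d %d\n", v[0], v[1], v[2]);	/* 5 20 90 */
	return 0;
}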
4519
4520
4521/*
4522 * Move all immediately-ready items from pending_list to ready_heap.
4523 *
4524 * Items are considered ready if they have no remaining dependencies and
4525 * they belong in the current restore pass. (See also reduce_dependencies,
4526 * which applies the same logic one-at-a-time.)
4527 */
4528static void
4529move_to_ready_heap(TocEntry *pending_list,
4530 binaryheap *ready_heap,
4531 RestorePass pass)
4532{
4533 TocEntry *te;
4534 TocEntry *next_te;
4535
4536 for (te = pending_list->pending_next; te != pending_list; te = next_te)
4537 {
4538 /* must save list link before possibly removing te from list */
4539 next_te = te->pending_next;
4540
4541 if (te->depCount == 0 &&
4542 _tocEntryRestorePass(te) == pass)
4543 {
4544 /* Remove it from pending_list ... */
4545 pending_list_remove(te);
4546 /* ... and add to ready_heap */
4547 binaryheap_add(ready_heap, te);
4548 }
4549 }
4550}
4551
4552/*
4553 * Find the next work item (if any) that is capable of being run now,
4554 * and remove it from the ready_heap.
4555 *
4556 * Returns the item, or NULL if nothing is runnable.
4557 *
4558 * To qualify, the item must have no remaining dependencies
4559 * and no requirements for locks that are incompatible with
4560 * items currently running. Items in the ready_heap are known to have
4561 * no remaining dependencies, but we have to check for lock conflicts.
4562 */
4563static TocEntry *
4564pop_next_work_item(binaryheap *ready_heap,
4565 ParallelState *pstate)
4566{
4567 /*
4568 * Search the ready_heap until we find a suitable item. Note that we do a
4569 * sequential scan through the heap nodes, so even though we will first
4570 * try to choose the highest-priority item, we might end up picking
4571 * something with a much lower priority. However, we expect that we will
4572 * typically be able to pick one of the first few items, which should
4573 * usually have a relatively high priority.
4574 */
4575 for (int i = 0; i < binaryheap_size(ready_heap); i++)
4576 {
4577 TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4578 bool conflicts = false;
4579
4580 /*
4581 * Check to see if the item would need exclusive lock on something
4582 * that a currently running item also needs lock on, or vice versa. If
4583 * so, we don't want to schedule them together.
4584 */
4585 for (int k = 0; k < pstate->numWorkers; k++)
4586 {
4587 TocEntry *running_te = pstate->te[k];
4588
4589 if (running_te == NULL)
4590 continue;
4591 if (has_lock_conflicts(te, running_te) ||
4592 has_lock_conflicts(running_te, te))
4593 {
4594 conflicts = true;
4595 break;
4596 }
4597 }
4598
4599 if (conflicts)
4600 continue;
4601
4602 /* passed all tests, so this item can run */
4603 binaryheap_remove_node(ready_heap, i);
4604 return te;
4605 }
4606
4607 pg_log_debug("no item ready");
4608 return NULL;
4609}
4610
4611
4612/*
4613 * Restore a single TOC item in parallel with others
4614 *
4615 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4616 * (everything else). A worker process executes several such work items during
4617 * a parallel backup or restore. Once we terminate here and report back that
4618 * our work is finished, the leader process will assign us a new work item.
4619 */
4620int
4621parallel_restore(ArchiveHandle *AH, TocEntry *te)
4622{
4623 int status;
4624
4625 Assert(AH->connection != NULL);
4626
4627 /* Count only errors associated with this TOC entry */
4628 AH->public.n_errors = 0;
4629
4630 /* Restore the TOC item */
4631 status = restore_toc_entry(AH, te, true);
4632
4633 return status;
4634}
4635
4636
4637/*
4638 * Callback function that's invoked in the leader process after a step has
4639 * been parallel restored.
4640 *
4641 * Update status and reduce the dependency count of any dependent items.
4642 */
4643static void
4644mark_restore_job_done(ArchiveHandle *AH,
4645 TocEntry *te,
4646 int status,
4647 void *callback_data)
4648{
4649 binaryheap *ready_heap = (binaryheap *) callback_data;
4650
4651 pg_log_info("finished item %d %s %s",
4652 te->dumpId, te->desc, te->tag);
4653
4654 if (status == WORKER_CREATE_DONE)
4655 mark_create_done(AH, te);
4656 else if (status == WORKER_INHIBIT_DATA)
4657 {
4658 inhibit_data_for_failed_table(AH, te);
4659 AH->public.n_errors++;
4660 }
4661 else if (status == WORKER_IGNORED_ERRORS)
4662 AH->public.n_errors++;
4663 else if (status != 0)
4664 pg_fatal("worker process failed: exit code %d",
4665 status);
4666
4667 reduce_dependencies(AH, te, ready_heap);
4668}
4669
4670
4671/*
4672 * Process the dependency information into a form useful for parallel restore.
4673 *
4674 * This function takes care of fixing up some missing or badly designed
4675 * dependencies, and then prepares subsidiary data structures that will be
4676 * used in the main parallel-restore logic, including:
4677 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4678 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4679 * dependencies for each TOC entry.
4680 *
4681 * We also identify locking dependencies so that we can avoid trying to
4682 * schedule conflicting items at the same time.
4683 */
4684static void
4685fix_dependencies(ArchiveHandle *AH)
4686{
4687 TocEntry *te;
4688 int i;
4689
4690 /*
4691 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4692 * items are marked as not being in any parallel-processing list.
4693 */
4694 for (te = AH->toc->next; te != AH->toc; te = te->next)
4695 {
4696 te->depCount = te->nDeps;
4697 te->revDeps = NULL;
4698 te->nRevDeps = 0;
4699 te->pending_prev = NULL;
4700 te->pending_next = NULL;
4701 }
4702
4703 /*
4704 * POST_DATA items that are shown as depending on a table need to be
4705 * re-pointed to depend on that table's data, instead. This ensures they
4706 * won't get scheduled until the data has been loaded.
4707 */
4708 repoint_table_dependencies(AH);
4709
4710 /*
4711 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4712 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4713 * one BLOB COMMENTS in such files.)
4714 */
4715 if (AH->version < K_VERS_1_11)
4716 {
4717 for (te = AH->toc->next; te != AH->toc; te = te->next)
4718 {
4719 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4720 {
4721 TocEntry *te2;
4722
4723 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4724 {
4725 if (strcmp(te2->desc, "BLOBS") == 0)
4726 {
4727 te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4728 te->dependencies[0] = te2->dumpId;
4729 te->nDeps++;
4730 te->depCount++;
4731 break;
4732 }
4733 }
4734 break;
4735 }
4736 }
4737 }
4738
4739 /*
4740 * At this point we start to build the revDeps reverse-dependency arrays,
4741 * so all changes of dependencies must be complete.
4742 */
4743
4744 /*
4745 * Count the incoming dependencies for each item. Also, it is possible
4746 * that the dependencies list items that are not in the archive at all
4747 * (that should not happen in 9.2 and later, but is highly likely in older
4748 * archives). Subtract such items from the depCounts.
4749 */
4750 for (te = AH->toc->next; te != AH->toc; te = te->next)
4751 {
4752 for (i = 0; i < te->nDeps; i++)
4753 {
4754 DumpId depid = te->dependencies[i];
4755
4756 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4757 AH->tocsByDumpId[depid]->nRevDeps++;
4758 else
4759 te->depCount--;
4760 }
4761 }
4762
4763 /*
4764 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4765 * it as a counter below.
4766 */
4767 for (te = AH->toc->next; te != AH->toc; te = te->next)
4768 {
4769 if (te->nRevDeps > 0)
4770 te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4771 te->nRevDeps = 0;
4772 }
4773
4774 /*
4775 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4776 * better agree with the loops above.
4777 */
4778 for (te = AH->toc->next; te != AH->toc; te = te->next)
4779 {
4780 for (i = 0; i < te->nDeps; i++)
4781 {
4782 DumpId depid = te->dependencies[i];
4783
4784 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4785 {
4786 TocEntry *otherte = AH->tocsByDumpId[depid];
4787
4788 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4789 }
4790 }
4791 }
4792
4793 /*
4794 * Lastly, work out the locking dependencies.
4795 */
4796 for (te = AH->toc->next; te != AH->toc; te = te->next)
4797 {
4798 te->lockDeps = NULL;
4799 te->nLockDeps = 0;
4800 identify_locking_dependencies(AH, te);
4801 }
4802}
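/*
 * Illustrative sketch (not part of pg_backup_archiver.c): building
 * reverse-dependency lists and outstanding-dependency counts for a tiny,
 * hard-coded dependency graph, in the same spirit as fix_dependencies()
 * above.  The example graph and all names are invented.
 */
#include <stdio.h>

#define NITEMS 4

int
main(void)
{
	/* deps[i] lists the items that item i depends on; -1 terminates a row */
	static const int deps[NITEMS][NITEMS] = {
		{-1},					/* 0: no dependencies */
		{0, -1},				/* 1 depends on 0 */
		{0, 1, -1},				/* 2 depends on 0 and 1 */
		{2, -1},				/* 3 depends on 2 */
	};
	int			depCount[NITEMS] = {0};
	int			nRevDeps[NITEMS] = {0};
	int			revDeps[NITEMS][NITEMS];

	/* First pass: count forward and reverse dependencies. */
	for (int i = 0; i < NITEMS; i++)
		for (int j = 0; deps[i][j] >= 0; j++)
		{
			depCount[i]++;
			nRevDeps[deps[i][j]]++;
		}

	/* Second pass: fill the reverse-dependency arrays. */
	{
		int			fill[NITEMS] = {0};

		for (int i = 0; i < NITEMS; i++)
			for (int j = 0; deps[i][j] >= 0; j++)
				revDeps[deps[i][j]][fill[deps[i][j]]++] = i;
	}

	for (int i = 0; i < NITEMS; i++)
	{
		printf("item %d: depCount=%d, dependents:", i, depCount[i]);
		for (int j = 0; j < nRevDeps[i]; j++)
			printf(" %d", revDeps[i][j]);
		printf("\n");
	}
	return 0;
}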
4803
4804/*
4805 * Change dependencies on table items to depend on table data items instead,
4806 * but only in POST_DATA items.
4807 *
4808 * Also, for any item having such dependency(s), set its dataLength to the
4809 * largest dataLength of the table data items it depends on. This ensures
4810 * that parallel restore will prioritize larger jobs (index builds, FK
4811 * constraint checks, etc) over smaller ones, avoiding situations where we
4812 * end a restore with only one active job working on a large table.
4813 */
4814static void
4815repoint_table_dependencies(ArchiveHandle *AH)
4816{
4817 TocEntry *te;
4818 int i;
4819 DumpId olddep;
4820
4821 for (te = AH->toc->next; te != AH->toc; te = te->next)
4822 {
4823 if (te->section != SECTION_POST_DATA)
4824 continue;
4825 for (i = 0; i < te->nDeps; i++)
4826 {
4827 olddep = te->dependencies[i];
4828 if (olddep <= AH->maxDumpId &&
4829 AH->tableDataId[olddep] != 0)
4830 {
4831 DumpId tabledataid = AH->tableDataId[olddep];
4832 TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4833
4834 te->dependencies[i] = tabledataid;
4835 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4836 pg_log_debug("transferring dependency %d -> %d to %d",
4837 te->dumpId, olddep, tabledataid);
4838 }
4839 }
4840 }
4841}
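/*
 * Illustrative sketch (not part of pg_backup_archiver.c): re-pointing a
 * dependency through a lookup table and carrying the larger size forward,
 * mirroring the idea of repoint_table_dependencies() above.  The arrays,
 * ids, and sizes here are invented for the example.
 */
#include <stdio.h>

#define MAX_ID 8

int
main(void)
{
	/* tableDataId[t] is the id of table t's data item, or 0 if none */
	int			tableDataId[MAX_ID] = {0, 4, 0, 5, 0, 0, 0, 0};
	long		dataLength[MAX_ID] = {0, 0, 0, 0, 900, 50, 0, 0};

	int			deps[] = {1, 3};	/* an index depends on tables 1 and 3 */
	long		itemLength = 0;

	for (int i = 0; i < 2; i++)
	{
		int			t = deps[i];

		if (tableDataId[t] != 0)
		{
			deps[i] = tableDataId[t];	/* depend on the data, not the table */
			if (dataLength[deps[i]] > itemLength)
				itemLength = dataLength[deps[i]];
		}
	}

	printf("deps now %d and %d, effective size %ld\n",
		   deps[0], deps[1], itemLength);	/* 4 and 5, size 900 */
	return 0;
}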
4842
4843/*
4844 * Identify which objects we'll need exclusive lock on in order to restore
4845 * the given TOC entry (*other* than the one identified by the TOC entry
4846 * itself). Record their dump IDs in the entry's lockDeps[] array.
4847 */
4848static void
4849identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4850{
4851 DumpId *lockids;
4852 int nlockids;
4853 int i;
4854
4855 /*
4856 * We only care about this for POST_DATA items. PRE_DATA items are not
4857 * run in parallel, and DATA items are all independent by assumption.
4858 */
4859 if (te->section != SECTION_POST_DATA)
4860 return;
4861
4862 /* Quick exit if no dependencies at all */
4863 if (te->nDeps == 0)
4864 return;
4865
4866 /*
4867 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4868 * and hence require exclusive lock. However, we know that CREATE INDEX
4869 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4870 * category too ... but today is not that day.)
4871 */
4872 if (strcmp(te->desc, "INDEX") == 0)
4873 return;
4874
4875 /*
4876 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4877 * item listed among its dependencies. Originally all of these would have
4878 * been TABLE items, but repoint_table_dependencies would have repointed
4879 * them to the TABLE DATA items if those are present (which they might not
4880 * be, eg in a schema-only dump). Note that all of the entries we are
4881 * processing here are POST_DATA; otherwise there might be a significant
4882 * difference between a dependency on a table and a dependency on its
4883 * data, so that closer analysis would be needed here.
4884 */
4885 lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4886 nlockids = 0;
4887 for (i = 0; i < te->nDeps; i++)
4888 {
4889 DumpId depid = te->dependencies[i];
4890
4891 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4892 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4893 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4894 lockids[nlockids++] = depid;
4895 }
4896
4897 if (nlockids == 0)
4898 {
4899 free(lockids);
4900 return;
4901 }
4902
4903 te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4904 te->nLockDeps = nlockids;
4905}
4906
4907/*
4908 * Remove the specified TOC entry from the depCounts of items that depend on
4909 * it, thereby possibly making them ready-to-run. Any pending item that
4910 * becomes ready should be moved to the ready_heap, if that's provided.
4911 */
4912static void
4913reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4914 binaryheap *ready_heap)
4915{
4916 int i;
4917
4918 pg_log_debug("reducing dependencies for %d", te->dumpId);
4919
4920 for (i = 0; i < te->nRevDeps; i++)
4921 {
4922 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4923
4924 Assert(otherte->depCount > 0);
4925 otherte->depCount--;
4926
4927 /*
4928 * It's ready if it has no remaining dependencies, and it belongs in
4929 * the current restore pass, and it is currently a member of the
4930 * pending list (that check is needed to prevent double restore in
4931 * some cases where a list-file forces out-of-order restoring).
4932 * However, if ready_heap == NULL then caller doesn't want any list
4933 * memberships changed.
4934 */
4935 if (otherte->depCount == 0 &&
4936 _tocEntryRestorePass(otherte) == AH->restorePass &&
4937 otherte->pending_prev != NULL &&
4938 ready_heap != NULL)
4939 {
4940 /* Remove it from pending list ... */
4941 pending_list_remove(otherte);
4942 /* ... and add to ready_heap */
4943 binaryheap_add(ready_heap, otherte);
4944 }
4945 }
4946}
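/*
 * Illustrative sketch (not part of pg_backup_archiver.c): when an item
 * finishes, decrement the outstanding-dependency count of each dependent
 * and report the ones that become ready, which is the core bookkeeping of
 * reduce_dependencies() above.  The tiny graph and names are invented.
 */
#include <stdio.h>

int
main(void)
{
	/* item 0 has two dependents, items 1 and 2 */
	int			revDeps[] = {1, 2};
	int			depCount[3] = {0, 1, 2};	/* item 2 also waits on something else */

	/* Pretend item 0 just completed. */
	for (int i = 0; i < 2; i++)
	{
		int			d = revDeps[i];

		if (--depCount[d] == 0)
			printf("item %d is now ready\n", d);	/* only item 1 */
		else
			printf("item %d still waits on %d dependencies\n", d, depCount[d]);
	}
	return 0;
}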
4947
4948/*
4949 * Set the created flag on the DATA member corresponding to the given
4950 * TABLE member
4951 */
4952static void
4953mark_create_done(ArchiveHandle *AH, TocEntry *te)
4954{
4955 if (AH->tableDataId[te->dumpId] != 0)
4956 {
4957 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4958
4959 ted->created = true;
4960 }
4961}
4962
4963/*
4964 * Mark the DATA member corresponding to the given TABLE member
4965 * as not wanted
4966 */
4967static void
4968inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4969{
4970 pg_log_info("table \"%s\" could not be created, will not restore its data",
4971 te->tag);
4972
4973 if (AH->tableDataId[te->dumpId] != 0)
4974 {
4975 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4976
4977 ted->reqs = 0;
4978 }
4979}
4980
4981/*
4982 * Clone and de-clone routines used in parallel restoration.
4983 *
4984 * Enough of the structure is cloned to ensure that there is no
4985 * conflict between different threads each with their own clone.
4986 */
4987ArchiveHandle *
4988CloneArchive(ArchiveHandle *AH)
4989{
4990 ArchiveHandle *clone;
4991
4992 /* Make a "flat" copy */
4993 clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4994 memcpy(clone, AH, sizeof(ArchiveHandle));
4995
4996 /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
4997 clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
4998 memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
4999
5000 /* Handle format-independent fields */
5001 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5002
5003 /* The clone will have its own connection, so disregard connection state */
5004 clone->connection = NULL;
5005 clone->connCancel = NULL;
5006 clone->currUser = NULL;
5007 clone->currSchema = NULL;
5008 clone->currTableAm = NULL;
5009 clone->currTablespace = NULL;
5010
5011 /* savedPassword must be local in case we change it while connecting */
5012 if (clone->savedPassword)
5013 clone->savedPassword = pg_strdup(clone->savedPassword);
5014
5015 /* clone has its own error count, too */
5016 clone->public.n_errors = 0;
5017
5018 /* clones should not share lo_buf */
5019 clone->lo_buf = NULL;
5020
5021 /*
5022 * Clone connections disregard --transaction-size; they must commit after
5023 * each command so that the results are immediately visible to other
5024 * workers.
5025 */
5026 clone->public.ropt->txn_size = 0;
5027
5028 /*
5029 * Connect our new clone object to the database, using the same connection
5030 * parameters used for the original connection.
5031 */
5032 ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5033
5034 /* re-establish fixed state */
5035 if (AH->mode == archModeRead)
5036 _doSetFixedOutputState(clone);
5037 /* in write case, setupDumpWorker will fix up connection state */
5038
5039 /* Let the format-specific code have a chance too */
5040 clone->ClonePtr(clone);
5041
5042 Assert(clone->connection != NULL);
5043 return clone;
5044}
5045
5046/*
5047 * Release clone-local storage.
5048 *
5049 * Note: we assume any clone-local connection was already closed.
5050 */
5051void
5052DeCloneArchive(ArchiveHandle *AH)
5053{
5054 /* Should not have an open database connection */
5055 Assert(AH->connection == NULL);
5056
5057 /* Clear format-specific state */
5058 AH->DeClonePtr(AH);
5059
5060 /* Clear state allocated by CloneArchive */
5061 if (AH->sqlparse.curCmd)
5062 destroyPQExpBuffer(AH->sqlparse.curCmd);
5063
5064 /* Clear any connection-local state */
5065 free(AH->currUser);
5066 free(AH->currSchema);
5067 free(AH->currTablespace);
5068 free(AH->currTableAm);
5069 free(AH->savedPassword);
5070
5071 free(AH);
5072}
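/*
 * Illustrative sketch (not part of pg_backup_archiver.c): flat-copying a
 * state struct for a per-worker clone while resetting the fields that must
 * not be shared, which is the general shape of the CloneArchive()/
 * DeCloneArchive() pair above.  The struct and field names are invented.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct handle
{
	int			format;			/* read-only settings are safe to flat-copy */
	char	   *conn;			/* per-clone resource; must not be shared */
	int			n_errors;		/* per-clone counter */
} handle;

static handle *
clone_handle(const handle *src)
{
	handle	   *clone = malloc(sizeof(handle));

	memcpy(clone, src, sizeof(handle));	/* flat copy of everything */
	clone->conn = NULL;					/* clone opens its own "connection" */
	clone->n_errors = 0;				/* and keeps its own error count */
	return clone;
}

static void
declone_handle(handle *clone)
{
	free(clone->conn);			/* release clone-local resources only */
	free(clone);
}

int
main(void)
{
	handle		parent = {.format = 1, .conn = NULL, .n_errors = 3};
	handle	   *c = clone_handle(&parent);

	c->conn = strdup("worker connection");
	printf("parent errors %d, clone errors %d\n", parent.n_errors, c->n_errors);
	declone_handle(c);
	return 0;
}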
int lo_write(int fd, const char *buf, int len)
Definition: be-fsstubs.c:182
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition: parallel.c:1061
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1453
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition: parallel.c:899
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition: parallel.c:1207
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1270
@ WFW_ALL_IDLE
Definition: parallel.h:35
@ WFW_GOT_STATUS
Definition: parallel.h:33
@ WFW_ONE_IDLE
Definition: parallel.h:34
void binaryheap_remove_node(binaryheap *heap, int n)
Definition: binaryheap.c:225
void binaryheap_add(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:154
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
#define binaryheap_size(h)
Definition: binaryheap.h:66
#define binaryheap_empty(h)
Definition: binaryheap.h:65
#define binaryheap_get_node(h, n)
Definition: binaryheap.h:67
#define PG_BINARY_R
Definition: c.h:1229
#define ngettext(s, p, n)
Definition: c.h:1135
#define PG_BINARY_A
Definition: c.h:1228
#define Max(x, y)
Definition: c.h:952
#define Assert(condition)
Definition: c.h:812
#define PG_BINARY_W
Definition: c.h:1230
bool EndCompressFileHandle(CompressFileHandle *CFH)
Definition: compress_io.c:288
char * supports_compression(const pg_compress_specification compression_spec)
Definition: compress_io.c:87