/*
 * NOTE(review): this file was extracted from the PostgreSQL doxygen source
 * viewer ("git master" snapshot of pg_backup_archiver.c). The viewer UI text
 * that appeared here ("Loading...", "Searching...", "No Matches", navigation
 * links) was an extraction artifact, not part of the source file. Embedded
 * per-line numbers below are the viewer's original line numbers; several
 * source lines (notably function-signature lines) were lost in extraction.
 */
1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres_fe.h"
23
24#include <ctype.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#ifdef WIN32
30#include <io.h>
31#endif
32
33#include "catalog/pg_class_d.h"
34#include "catalog/pg_largeobject_metadata_d.h"
35#include "catalog/pg_shdepend_d.h"
36#include "common/string.h"
37#include "compress_io.h"
38#include "dumputils.h"
40#include "lib/binaryheap.h"
41#include "lib/stringinfo.h"
42#include "libpq/libpq-fs.h"
43#include "parallel.h"
44#include "pg_backup_archiver.h"
45#include "pg_backup_db.h"
46#include "pg_backup_utils.h"
47
48#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
49#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
50
51#define TOC_PREFIX_NONE ""
52#define TOC_PREFIX_DATA "Data for "
53#define TOC_PREFIX_STATS "Statistics for "
54
55static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
56 const pg_compress_specification compression_spec,
60static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
61static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
63static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
64static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
65static void _becomeUser(ArchiveHandle *AH, const char *user);
66static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
67static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
68static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
69static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
71 TocEntry *te);
72static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
77static bool _tocEntryIsACL(TocEntry *te);
81static void buildTocEntryArrays(ArchiveHandle *AH);
82static void _moveBefore(TocEntry *pos, TocEntry *te);
84
85static int RestoringToDB(ArchiveHandle *AH);
86static void dump_lo_buf(ArchiveHandle *AH);
87static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
88static void SetOutput(ArchiveHandle *AH, const char *filename,
89 const pg_compress_specification compression_spec, bool append_data);
92
93static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
97 ParallelState *pstate,
101static void pending_list_header_init(TocEntry *l);
102static void pending_list_append(TocEntry *l, TocEntry *te);
103static void pending_list_remove(TocEntry *te);
104static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
105static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
110 ParallelState *pstate);
111static void mark_dump_job_done(ArchiveHandle *AH,
112 TocEntry *te,
113 int status,
114 void *callback_data);
116 TocEntry *te,
117 int status,
118 void *callback_data);
119static void fix_dependencies(ArchiveHandle *AH);
123static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
125static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
127
128static void StrictNamesCheck(RestoreOptions *ropt);
129
130
131/*
132 * Allocate a new DumpOptions block containing all default values.
133 */
142
143/*
144 * Initialize a DumpOptions struct to all default values
145 */
146void
/* NOTE(review): extraction dropped source line 147 (the name/parameter line,
 * presumably "InitDumpOptions(DumpOptions *opts)") — confirm upstream. */
148{
/* Zero the whole struct first, then override the few non-zero defaults. */
149 memset(opts, 0, sizeof(DumpOptions));
150 /* set any fields that shouldn't default to zeroes */
151 opts->include_everything = true;
152 opts->cparams.promptPassword = TRI_DEFAULT;
153 opts->dumpSections = DUMP_UNSECTIONED;
154 opts->dumpSchema = true;
155 opts->dumpData = true;
/* Statistics are not dumped by default. */
156 opts->dumpStatistics = false;
157}
158
159/*
160 * Create a freshly allocated DumpOptions with options equivalent to those
161 * found in the given RestoreOptions.
162 *
163 * Copies scalar flags field-by-field and pg_strdup()s the string-valued
164 * fields (connection parameters, restrict key) so the returned struct owns
165 * its strings independently of the RestoreOptions it was derived from.
166 *
167 * NOTE(review): extraction dropped source lines 163-164 (signature,
168 * presumably "DumpOptions *dumpOptionsFromRestoreOptions(RestoreOptions
169 * *ropt)") and lines 172-173, 187, 189, 194-195, 197-198 (additional field
170 * copies) — confirm against upstream before relying on this list being
171 * complete.
172 */
165{
166 DumpOptions *dopt = NewDumpOptions();
167
168 /* this is the inverse of what's at the end of pg_dump.c's main() */
169 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
170 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
171 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
174 dopt->outputClean = ropt->dropSchema;
175 dopt->dumpData = ropt->dumpData;
176 dopt->dumpSchema = ropt->dumpSchema;
177 dopt->dumpSections = ropt->dumpSections;
178 dopt->dumpStatistics = ropt->dumpStatistics;
179 dopt->if_exists = ropt->if_exists;
180 dopt->column_inserts = ropt->column_inserts;
181 dopt->aclsSkip = ropt->aclsSkip;
182 dopt->outputSuperuser = ropt->superuser;
183 dopt->outputCreateDB = ropt->createDB;
184 dopt->outputNoOwner = ropt->noOwner;
185 dopt->outputNoTableAm = ropt->noTableAm;
186 dopt->outputNoTablespaces = ropt->noTablespace;
188 dopt->use_setsessauth = ropt->use_setsessauth;
190 dopt->dump_inserts = ropt->dump_inserts;
191 dopt->no_comments = ropt->no_comments;
192 dopt->no_policies = ropt->no_policies;
193 dopt->no_publications = ropt->no_publications;
196 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
199 dopt->sequence_data = ropt->sequence_data;
200 dopt->restrict_key = ropt->restrict_key ? pg_strdup(ropt->restrict_key) : NULL;
201
202 return dopt;
203}
204
205
206/*
207 * Wrapper functions.
208 *
209 * The objective is to make writing new formats and dumpers as simple
210 * as possible, if necessary at the expense of extra function calls etc.
211 *
212 */
213
214/*
215 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
216 * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
217 * setup doesn't need to know anything much, so it's defined here.
218 */
219static void
/* NOTE(review): extraction dropped source line 220 (the name/parameter line,
 * presumably "setupRestoreWorker(Archive *AHX)") — confirm upstream. */
221{
222 ArchiveHandle *AH = (ArchiveHandle *) AHX;
223
/* A restore worker only needs its own handle on the archive file. */
224 AH->ReopenPtr(AH);
225}
226
227
228/* Create a new archive */
229/* Public */
230Archive *
/* NOTE(review): extraction dropped source lines 231 and 234-235 (the function
 * name line "CreateArchive(const char *FileSpec, ..." and trailing parameter
 * lines) — confirm the full signature upstream. */
232 const pg_compress_specification compression_spec,
233 bool dosync, ArchiveMode mode,
236
237{
/* All real work is delegated to _allocAH(); extraction dropped source line
 * 239, which carried the remaining _allocAH() arguments. */
238 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
240
241 return (Archive *) AH;
242}
243
244/* Open an existing archive */
245/* Public */
246Archive *
/* NOTE(review): extraction dropped source line 247 (the name/parameter line,
 * presumably "OpenArchive(const char *FileSpec, const ArchiveFormat fmt)"). */
248{
249 ArchiveHandle *AH;
/* Reading an existing archive: start from a zeroed (no-compression)
 * specification; actual compression is discovered from the file itself. */
250 pg_compress_specification compression_spec = {0};
251
252 compression_spec.algorithm = PG_COMPRESSION_NONE;
/* Extraction dropped source lines 254-255, which carried the remaining
 * _allocAH() arguments (mode and worker-setup callback, presumably). */
253 AH = _allocAH(FileSpec, fmt, compression_spec, true,
256
257 return (Archive *) AH;
258}
259
260/* Public */
261void
/* NOTE(review): extraction dropped source line 262 (the name/parameter line,
 * presumably "CloseArchive(Archive *AHX)") — confirm upstream. */
263{
264 ArchiveHandle *AH = (ArchiveHandle *) AHX;
265
/* Let the format module flush and close its own state first. */
266 AH->ClosePtr(AH);
267
268 /* Close the output */
/* Clear errno so %m in the error message reflects this call's failure. */
269 errno = 0;
270 if (!EndCompressFileHandle(AH->OF))
271 pg_fatal("could not close output file: %m");
272}
273
274/* Public */
275void
/* NOTE(review): extraction dropped source line 276 (the name/parameter line,
 * presumably "SetArchiveOptions(Archive *AH, DumpOptions *dopt,
 * RestoreOptions *ropt)") — confirm upstream. */
277{
278 /* Caller can omit dump options, in which case we synthesize them */
/* Extraction dropped source line 280, the body of this if — presumably
 * "dopt = dumpOptionsFromRestoreOptions(ropt);". */
279 if (dopt == NULL && ropt != NULL)
281
282 /* Save options for later access */
283 AH->dopt = dopt;
284 AH->ropt = ropt;
285}
286
287/* Public */
288void
/* NOTE(review): extraction dropped source line 289 (the name/parameter line,
 * presumably "ProcessArchiveRestoreOptions(Archive *AHX)"). */
290{
291 ArchiveHandle *AH = (ArchiveHandle *) AHX;
292 RestoreOptions *ropt = AH->public.ropt;
293 TocEntry *te;
/* Extraction dropped source line 294, presumably the declaration/initializer
 * of curSection (teSection), used below. */
295
296 /* Decide which TOC entries will be dumped/restored, and mark them */
/* Walk the circular TOC list once, computing te->reqs for each entry. */
298 for (te = AH->toc->next; te != AH->toc; te = te->next)
299 {
300 /*
301 * When writing an archive, we also take this opportunity to check
302 * that we have generated the entries in a sane order that respects
303 * the section divisions. When reading, don't complain, since buggy
304 * old versions of pg_dump might generate out-of-order archives.
305 */
306 if (AH->mode != archModeRead)
307 {
308 switch (te->section)
309 {
310 case SECTION_NONE:
311 /* ok to be anywhere */
312 break;
313 case SECTION_PRE_DATA:
/* Extraction dropped source line 314, the guard condition for this warning
 * (presumably "if (curSection != SECTION_PRE_DATA)"). */
315 pg_log_warning("archive items not in correct section order");
316 break;
317 case SECTION_DATA:
/* Extraction dropped source line 318, the guard condition for this warning
 * (presumably a check that curSection is not POST_DATA). */
319 pg_log_warning("archive items not in correct section order");
320 break;
/* Extraction dropped source line 321, presumably "case SECTION_POST_DATA:". */
322 /* ok no matter which section we were in */
323 break;
324 default:
325 pg_fatal("unexpected section code %d",
326 (int) te->section);
327 break;
328 }
329 }
330
/* Track the current section so entries with SECTION_NONE inherit context. */
331 if (te->section != SECTION_NONE)
332 curSection = te->section;
333
334 te->reqs = _tocEntryRequired(te, curSection, AH);
335 }
336
337 /* Enforce strict names checking */
338 if (ropt->strict_names)
339 StrictNamesCheck(ropt);
340}
341
342/*
343 * RestoreArchive
344 *
345 * If append_data is set, then append data into file as we are restoring dump
346 * of multiple databases which was taken by pg_dumpall.
347 */
348void
350{
351 ArchiveHandle *AH = (ArchiveHandle *) AHX;
352 RestoreOptions *ropt = AH->public.ropt;
353 bool parallel_mode;
354 TocEntry *te;
356
358
359 /*
360 * If we're going to do parallel restore, there are some restrictions.
361 */
362 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
363 if (parallel_mode)
364 {
365 /* We haven't got round to making this work for all archive formats */
366 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
367 pg_fatal("parallel restore is not supported with this archive file format");
368
369 /* Doesn't work if the archive represents dependencies as OIDs */
370 if (AH->version < K_VERS_1_8)
371 pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
372
373 /*
374 * It's also not gonna work if we can't reopen the input file, so
375 * let's try that immediately.
376 */
377 AH->ReopenPtr(AH);
378 }
379
380 /*
381 * Make sure we won't need (de)compression we haven't got
382 */
383 if (AH->PrintTocDataPtr != NULL)
384 {
385 for (te = AH->toc->next; te != AH->toc; te = te->next)
386 {
387 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
388 {
390
391 if (errmsg)
392 pg_fatal("cannot restore from compressed archive (%s)",
393 errmsg);
394 else
395 break;
396 }
397 }
398 }
399
400 /*
401 * Prepare index arrays, so we can assume we have them throughout restore.
402 * It's possible we already did this, though.
403 */
404 if (AH->tocsByDumpId == NULL)
406
407 /*
408 * If we're using a DB connection, then connect it.
409 */
410 if (ropt->useDB)
411 {
412 pg_log_info("connecting to database for restore");
413 if (AH->version < K_VERS_1_3)
414 pg_fatal("direct database connections are not supported in pre-1.3 archives");
415
416 /*
417 * We don't want to guess at whether the dump will successfully
418 * restore; allow the attempt regardless of the version of the restore
419 * target.
420 */
421 AHX->minRemoteVersion = 0;
422 AHX->maxRemoteVersion = 9999999;
423
424 ConnectDatabaseAhx(AHX, &ropt->cparams, false);
425
426 /*
427 * If we're talking to the DB directly, don't send comments since they
428 * obscure SQL when displaying errors
429 */
430 AH->noTocComments = 1;
431 }
432
433 /*
434 * Work out if we have an implied schema-less restore. This can happen if
435 * the dump excluded the schema or the user has used a toc list to exclude
436 * all of the schema data. All we do is look for schema entries - if none
437 * are found then we unset the dumpSchema flag.
438 *
439 * We could scan for wanted TABLE entries, but that is not the same as
440 * data-only. At this stage, it seems unnecessary (6-Mar-2001).
441 */
442 if (ropt->dumpSchema)
443 {
444 bool no_schema_found = true;
445
446 for (te = AH->toc->next; te != AH->toc; te = te->next)
447 {
448 if ((te->reqs & REQ_SCHEMA) != 0)
449 {
450 no_schema_found = false;
451 break;
452 }
453 }
454 if (no_schema_found)
455 {
456 ropt->dumpSchema = false;
457 pg_log_info("implied no-schema restore");
458 }
459 }
460
461 /*
462 * Setup the output file if necessary.
463 */
464 sav = SaveOutput(AH);
467
468 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
469
470 /*
471 * If generating plain-text output, enter restricted mode to block any
472 * unexpected psql meta-commands. A malicious source might try to inject
473 * a variety of things via bogus responses to queries. While we cannot
474 * prevent such sources from affecting the destination at restore time, we
475 * can block psql meta-commands so that the client machine that runs psql
476 * with the dump output remains unaffected.
477 */
478 if (ropt->restrict_key)
479 ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
480
481 if (AH->archiveRemoteVersion)
482 ahprintf(AH, "-- Dumped from database version %s\n",
484 if (AH->archiveDumpVersion)
485 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
487
488 ahprintf(AH, "\n");
489
490 if (AH->public.verbose)
491 dumpTimestamp(AH, "Started on", AH->createDate);
492
493 if (ropt->single_txn)
494 {
495 if (AH->connection)
496 StartTransaction(AHX);
497 else
498 ahprintf(AH, "BEGIN;\n\n");
499 }
500
501 /*
502 * Establish important parameter values right away.
503 */
505
507
508 /*
509 * Drop the items at the start, in reverse order
510 */
511 if (ropt->dropSchema)
512 {
513 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
514 {
515 AH->currentTE = te;
516
517 /*
518 * In createDB mode, issue a DROP *only* for the database as a
519 * whole. Issuing drops against anything else would be wrong,
520 * because at this point we're connected to the wrong database.
521 * (The DATABASE PROPERTIES entry, if any, should be treated like
522 * the DATABASE entry.)
523 */
524 if (ropt->createDB)
525 {
526 if (strcmp(te->desc, "DATABASE") != 0 &&
527 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
528 continue;
529 }
530
531 /* Otherwise, drop anything that's selected and has a dropStmt */
532 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
533 {
534 bool not_allowed_in_txn = false;
535
536 pg_log_info("dropping %s %s", te->desc, te->tag);
537
538 /*
539 * In --transaction-size mode, we have to temporarily exit our
540 * transaction block to drop objects that can't be dropped
541 * within a transaction.
542 */
543 if (ropt->txn_size > 0)
544 {
545 if (strcmp(te->desc, "DATABASE") == 0 ||
546 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
547 {
548 not_allowed_in_txn = true;
549 if (AH->connection)
551 else
552 ahprintf(AH, "COMMIT;\n");
553 }
554 }
555
556 /* Select owner and schema as necessary */
557 _becomeOwner(AH, te);
558 _selectOutputSchema(AH, te->namespace);
559
560 /*
561 * Now emit the DROP command, if the object has one. Note we
562 * don't necessarily emit it verbatim; at this point we add an
563 * appropriate IF EXISTS clause, if the user requested it.
564 */
565 if (strcmp(te->desc, "BLOB METADATA") == 0)
566 {
567 /* We must generate the per-blob commands */
568 if (ropt->if_exists)
569 IssueCommandPerBlob(AH, te,
570 "SELECT pg_catalog.lo_unlink(oid) "
571 "FROM pg_catalog.pg_largeobject_metadata "
572 "WHERE oid = '", "'");
573 else
574 IssueCommandPerBlob(AH, te,
575 "SELECT pg_catalog.lo_unlink('",
576 "')");
577 }
578 else if (*te->dropStmt != '\0')
579 {
580 if (!ropt->if_exists ||
581 strncmp(te->dropStmt, "--", 2) == 0)
582 {
583 /*
584 * Without --if-exists, or if it's just a comment (as
585 * happens for the public schema), print the dropStmt
586 * as-is.
587 */
588 ahprintf(AH, "%s", te->dropStmt);
589 }
590 else
591 {
592 /*
593 * Inject an appropriate spelling of "if exists". For
594 * old-style large objects, we have a routine that
595 * knows how to do it, without depending on
596 * te->dropStmt; use that. For other objects we need
597 * to parse the command.
598 */
599 if (strcmp(te->desc, "BLOB") == 0)
600 {
602 }
603 else
604 {
605 char *dropStmt = pg_strdup(te->dropStmt);
606 char *dropStmtOrig = dropStmt;
608
609 /*
610 * Need to inject IF EXISTS clause after ALTER
611 * TABLE part in ALTER TABLE .. DROP statement
612 */
613 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
614 {
616 "ALTER TABLE IF EXISTS");
617 dropStmt = dropStmt + 11;
618 }
619
620 /*
621 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
622 * not support the IF EXISTS clause, and therefore
623 * we simply emit the original command for DEFAULT
624 * objects (modulo the adjustment made above).
625 *
626 * Likewise, don't mess with DATABASE PROPERTIES.
627 *
628 * If we used CREATE OR REPLACE VIEW as a means of
629 * quasi-dropping an ON SELECT rule, that should
630 * be emitted unchanged as well.
631 *
632 * For other object types, we need to extract the
633 * first part of the DROP which includes the
634 * object type. Most of the time this matches
635 * te->desc, so search for that; however for the
636 * different kinds of CONSTRAINTs, we know to
637 * search for hardcoded "DROP CONSTRAINT" instead.
638 */
639 if (strcmp(te->desc, "DEFAULT") == 0 ||
640 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
641 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
642 appendPQExpBufferStr(ftStmt, dropStmt);
643 else
644 {
645 char buffer[40];
646 char *mark;
647
648 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
649 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
650 strcmp(te->desc, "FK CONSTRAINT") == 0)
651 strcpy(buffer, "DROP CONSTRAINT");
652 else
653 snprintf(buffer, sizeof(buffer), "DROP %s",
654 te->desc);
655
656 mark = strstr(dropStmt, buffer);
657
658 if (mark)
659 {
660 *mark = '\0';
661 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
662 dropStmt, buffer,
663 mark + strlen(buffer));
664 }
665 else
666 {
667 /* complain and emit unmodified command */
668 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
670 appendPQExpBufferStr(ftStmt, dropStmt);
671 }
672 }
673
674 ahprintf(AH, "%s", ftStmt->data);
675
678 }
679 }
680 }
681
682 /*
683 * In --transaction-size mode, re-establish the transaction
684 * block if needed; otherwise, commit after every N drops.
685 */
686 if (ropt->txn_size > 0)
687 {
689 {
690 if (AH->connection)
691 StartTransaction(AHX);
692 else
693 ahprintf(AH, "BEGIN;\n");
694 AH->txnCount = 0;
695 }
696 else if (++AH->txnCount >= ropt->txn_size)
697 {
698 if (AH->connection)
699 {
701 StartTransaction(AHX);
702 }
703 else
704 ahprintf(AH, "COMMIT;\nBEGIN;\n");
705 AH->txnCount = 0;
706 }
707 }
708 }
709 }
710
711 /*
712 * _selectOutputSchema may have set currSchema to reflect the effect
713 * of a "SET search_path" command it emitted. However, by now we may
714 * have dropped that schema; or it might not have existed in the first
715 * place. In either case the effective value of search_path will not
716 * be what we think. Forcibly reset currSchema so that we will
717 * re-establish the search_path setting when needed (after creating
718 * the schema).
719 *
720 * If we treated users as pg_dump'able objects then we'd need to reset
721 * currUser here too.
722 */
723 free(AH->currSchema);
724 AH->currSchema = NULL;
725 }
726
727 if (parallel_mode)
728 {
729 /*
730 * In parallel mode, turn control over to the parallel-restore logic.
731 */
732 ParallelState *pstate;
734
735 /* The archive format module may need some setup for this */
738
740
741 /* This runs PRE_DATA items and then disconnects from the database */
743 Assert(AH->connection == NULL);
744
745 /* ParallelBackupStart() will actually fork the processes */
746 pstate = ParallelBackupStart(AH);
748 ParallelBackupEnd(AH, pstate);
749
750 /* reconnect the leader and see if we missed something */
752 Assert(AH->connection != NULL);
753 }
754 else
755 {
756 /*
757 * In serial mode, process everything in three phases: normal items,
758 * then ACLs, then post-ACL items. We might be able to skip one or
759 * both extra phases in some cases, eg data-only restores.
760 */
761 bool haveACL = false;
762 bool havePostACL = false;
763
764 for (te = AH->toc->next; te != AH->toc; te = te->next)
765 {
766 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
767 continue; /* ignore if not to be dumped at all */
768
769 /* Skip if no-tablespace is given. */
770 if (ropt->noTablespace && te && te->desc &&
771 (strcmp(te->desc, "TABLESPACE") == 0))
772 continue;
773
774 /*
775 * Skip DROP DATABASE/ROLES/TABLESPACE if we didn't specify
776 * --clean
777 */
778 if (!ropt->dropSchema && te && te->desc &&
779 strcmp(te->desc, "DROP_GLOBAL") == 0)
780 continue;
781
782 switch (_tocEntryRestorePass(te))
783 {
785 (void) restore_toc_entry(AH, te, false);
786 break;
787 case RESTORE_PASS_ACL:
788 haveACL = true;
789 break;
791 havePostACL = true;
792 break;
793 }
794 }
795
796 if (haveACL)
797 {
798 for (te = AH->toc->next; te != AH->toc; te = te->next)
799 {
800 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
802 (void) restore_toc_entry(AH, te, false);
803 }
804 }
805
806 if (havePostACL)
807 {
808 for (te = AH->toc->next; te != AH->toc; te = te->next)
809 {
810 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
812 (void) restore_toc_entry(AH, te, false);
813 }
814 }
815 }
816
817 /*
818 * Close out any persistent transaction we may have. While these two
819 * cases are started in different places, we can end both cases here.
820 */
821 if (ropt->single_txn || ropt->txn_size > 0)
822 {
823 if (AH->connection)
825 else
826 ahprintf(AH, "COMMIT;\n\n");
827 }
828
829 if (AH->public.verbose)
830 dumpTimestamp(AH, "Completed on", time(NULL));
831
832 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
833
834 /*
835 * If generating plain-text output, exit restricted mode at the very end
836 * of the script. This is not pro forma; in particular, pg_dumpall
837 * requires this when transitioning from one database to another.
838 */
839 if (ropt->restrict_key)
840 ahprintf(AH, "\\unrestrict %s\n\n", ropt->restrict_key);
841
842 /*
843 * Clean up & we're done.
844 */
846
848 RestoreOutput(AH, sav);
849
850 if (ropt->useDB)
852}
853
854/*
855 * Restore a single TOC item. Used in both parallel and non-parallel restore;
856 * is_parallel is true if we are in a worker child process.
857 *
858 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
859 * the parallel parent has to make the corresponding status update.
860 */
861static int
863{
864 RestoreOptions *ropt = AH->public.ropt;
865 int status = WORKER_OK;
866 int reqs;
867 bool defnDumped;
868
869 AH->currentTE = te;
870
871 /* Dump any relevant dump warnings to stderr */
872 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
873 {
874 if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
875 pg_log_warning("warning from original dump file: %s", te->defn);
876 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
877 pg_log_warning("warning from original dump file: %s", te->copyStmt);
878 }
879
880 /* Work out what, if anything, we want from this entry */
881 reqs = te->reqs;
882
883 defnDumped = false;
884
885 /*
886 * If it has a schema component that we want, then process that
887 */
888 if ((reqs & REQ_SCHEMA) != 0)
889 {
890 bool object_is_db = false;
891
892 /*
893 * In --transaction-size mode, must exit our transaction block to
894 * create a database or set its properties.
895 */
896 if (strcmp(te->desc, "DATABASE") == 0 ||
897 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
898 {
899 object_is_db = true;
900 if (ropt->txn_size > 0)
901 {
902 if (AH->connection)
904 else
905 ahprintf(AH, "COMMIT;\n\n");
906 }
907 }
908
909 /* Show namespace in log message if available */
910 if (te->namespace)
911 pg_log_info("creating %s \"%s.%s\"",
912 te->desc, te->namespace, te->tag);
913 else
914 pg_log_info("creating %s \"%s\"",
915 te->desc, te->tag);
916
918 defnDumped = true;
919
920 if (strcmp(te->desc, "TABLE") == 0)
921 {
922 if (AH->lastErrorTE == te)
923 {
924 /*
925 * We failed to create the table. If
926 * --no-data-for-failed-tables was given, mark the
927 * corresponding TABLE DATA to be ignored.
928 *
929 * In the parallel case this must be done in the parent, so we
930 * just set the return value.
931 */
932 if (ropt->noDataForFailedTables)
933 {
934 if (is_parallel)
935 status = WORKER_INHIBIT_DATA;
936 else
938 }
939 }
940 else
941 {
942 /*
943 * We created the table successfully. Mark the corresponding
944 * TABLE DATA for possible truncation.
945 *
946 * In the parallel case this must be done in the parent, so we
947 * just set the return value.
948 */
949 if (is_parallel)
950 status = WORKER_CREATE_DONE;
951 else
952 mark_create_done(AH, te);
953 }
954 }
955
956 /*
957 * If we created a DB, connect to it. Also, if we changed DB
958 * properties, reconnect to ensure that relevant GUC settings are
959 * applied to our session. (That also restarts the transaction block
960 * in --transaction-size mode.)
961 */
962 if (object_is_db)
963 {
964 pg_log_info("connecting to new database \"%s\"", te->tag);
965 _reconnectToDB(AH, te->tag);
966 }
967 }
968
969 /*
970 * If it has a data component that we want, then process that
971 */
972 if ((reqs & REQ_DATA) != 0)
973 {
974 /*
975 * hadDumper will be set if there is genuine data component for this
976 * node. Otherwise, we need to check the defn field for statements
977 * that need to be executed in data-only restores.
978 */
979 if (te->hadDumper)
980 {
981 /*
982 * If we can output the data, then restore it.
983 */
984 if (AH->PrintTocDataPtr != NULL)
985 {
987
988 if (strcmp(te->desc, "BLOBS") == 0 ||
989 strcmp(te->desc, "BLOB COMMENTS") == 0)
990 {
991 pg_log_info("processing %s", te->desc);
992
993 _selectOutputSchema(AH, "pg_catalog");
994
995 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
996 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
998
999 AH->PrintTocDataPtr(AH, te);
1000
1002 }
1003 else
1004 {
1005 bool use_truncate;
1006
1008
1009 /* Select owner and schema as necessary */
1010 _becomeOwner(AH, te);
1011 _selectOutputSchema(AH, te->namespace);
1012
1013 pg_log_info("processing data for table \"%s.%s\"",
1014 te->namespace, te->tag);
1015
1016 /*
1017 * In parallel restore, if we created the table earlier in
1018 * this run (so that we know it is empty) and we are not
1019 * restoring a load-via-partition-root data item then we
1020 * wrap the COPY in a transaction and precede it with a
1021 * TRUNCATE. If wal_level is set to minimal this prevents
1022 * WAL-logging the COPY. This obtains a speedup similar
1023 * to that from using single_txn mode in non-parallel
1024 * restores.
1025 *
1026 * We mustn't do this for load-via-partition-root cases
1027 * because some data might get moved across partition
1028 * boundaries, risking deadlock and/or loss of previously
1029 * loaded data. (We assume that all partitions of a
1030 * partitioned table will be treated the same way.)
1031 */
1032 use_truncate = is_parallel && te->created &&
1034
1035 if (use_truncate)
1036 {
1037 /*
1038 * Parallel restore is always talking directly to a
1039 * server, so no need to see if we should issue BEGIN.
1040 */
1042
1043 /*
1044 * Issue TRUNCATE with ONLY so that child tables are
1045 * not wiped.
1046 */
1047 ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1048 fmtQualifiedId(te->namespace, te->tag));
1049 }
1050
1051 /*
1052 * If we have a copy statement, use it.
1053 */
1054 if (te->copyStmt && strlen(te->copyStmt) > 0)
1055 {
1056 ahprintf(AH, "%s", te->copyStmt);
1058 }
1059 else
1061
1062 AH->PrintTocDataPtr(AH, te);
1063
1064 /*
1065 * Terminate COPY if needed.
1066 */
1067 if (AH->outputKind == OUTPUT_COPYDATA &&
1068 RestoringToDB(AH))
1069 EndDBCopyMode(&AH->public, te->tag);
1071
1072 /* close out the transaction started above */
1073 if (use_truncate)
1075
1077 }
1078 }
1079 }
1080 else if (!defnDumped)
1081 {
1082 /* If we haven't already dumped the defn part, do so now */
1083 pg_log_info("executing %s %s", te->desc, te->tag);
1085 }
1086 }
1087
1088 /*
1089 * If it has a statistics component that we want, then process that
1090 */
1091 if ((reqs & REQ_STATS) != 0)
1093
1094 /*
1095 * If we emitted anything for this TOC entry, that counts as one action
1096 * against the transaction-size limit. Commit if it's time to.
1097 */
1098 if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1099 {
1100 if (++AH->txnCount >= ropt->txn_size)
1101 {
1102 if (AH->connection)
1103 {
1106 }
1107 else
1108 ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1109 AH->txnCount = 0;
1110 }
1111 }
1112
1113 if (AH->public.n_errors > 0 && status == WORKER_OK)
1114 status = WORKER_IGNORED_ERRORS;
1115
1116 return status;
1117}
1118
1119/*
1120 * Allocate a new RestoreOptions block.
1121 * This is mainly so we can initialize it, but also for future expansion,
1122 */
/* NOTE(review): extraction dropped source lines 1123-1124 (return type and
 * name, presumably "RestoreOptions *" / "NewRestoreOptions(void)") and lines
 * 1126-1128 (the pg_malloc0/allocation of opts) — confirm upstream. */
1125{
1127
1129
1130 /* set any fields that shouldn't default to zeroes */
1131 opts->format = archUnknown;
1132 opts->cparams.promptPassword = TRI_DEFAULT;
1133 opts->dumpSections = DUMP_UNSECTIONED;
1134 opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1135 opts->compression_spec.level = 0;
1136 opts->dumpSchema = true;
1137 opts->dumpData = true;
/* Unlike dump defaults (see InitDumpOptions), restore includes statistics. */
1138 opts->dumpStatistics = true;
1139
1140 return opts;
1141}
1142
1143static void
/* NOTE(review): extraction dropped source line 1144 (the name/parameter line,
 * presumably "_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)").
 * Emits ALTER TABLE ... DISABLE TRIGGER ALL before loading a table's data in
 * a data-only restore with --disable-triggers. */
1145{
1146 RestoreOptions *ropt = AH->public.ropt;
1147
1148 /* This hack is only needed in a data-only restore */
1149 if (ropt->dumpSchema || !ropt->disable_triggers)
1150 return;
1151
1152 pg_log_info("disabling triggers for %s", te->tag);
1153
1154 /*
1155 * Become superuser if possible, since they are the only ones who can
1156 * disable constraint triggers. If -S was not given, assume the initial
1157 * user identity is a superuser. (XXX would it be better to become the
1158 * table owner?)
1159 */
1160 _becomeUser(AH, ropt->superuser);
1161
1162 /*
1163 * Disable them.
1164 */
1165 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1166 fmtQualifiedId(te->namespace, te->tag));
1167}
1168
1169static void
/* NOTE(review): extraction dropped source line 1170 (the name/parameter line,
 * presumably "_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)").
 * Mirror of _disableTriggersIfNecessary: re-enables triggers after a table's
 * data has been loaded in a data-only restore with --disable-triggers. */
1171{
1172 RestoreOptions *ropt = AH->public.ropt;
1173
1174 /* This hack is only needed in a data-only restore */
1175 if (ropt->dumpSchema || !ropt->disable_triggers)
1176 return;
1177
1178 pg_log_info("enabling triggers for %s", te->tag);
1179
1180 /*
1181 * Become superuser if possible, since they are the only ones who can
1182 * disable constraint triggers. If -S was not given, assume the initial
1183 * user identity is a superuser. (XXX would it be better to become the
1184 * table owner?)
1185 */
1186 _becomeUser(AH, ropt->superuser);
1187
1188 /*
1189 * Enable them.
1190 */
1191 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1192 fmtQualifiedId(te->namespace, te->tag));
1193}
1194
1195/*
1196 * Detect whether a TABLE DATA TOC item is performing "load via partition
1197 * root", that is the target table is an ancestor partition rather than the
1198 * table the TOC item is nominally for.
1199 *
1200 * In newer archive files this can be detected by checking for a special
1201 * comment placed in te->defn. In older files we have to fall back to seeing
1202 * if the COPY statement targets the named table or some other one. This
1203 * will not work for data dumped as INSERT commands, so we could give a false
1204 * negative in that case; fortunately, that's a rarely-used option.
1205 */
1206static bool
/* NOTE(review): extraction dropped source line 1207 (the name/parameter line,
 * presumably "is_load_via_partition_root(TocEntry *te)") — confirm upstream. */
1208{
/* Fast path: newer archives tag the defn with a marker comment. */
1209 if (te->defn &&
1210 strncmp(te->defn, "-- load via partition root ", 27) == 0)
1211 return true;
1212 if (te->copyStmt && *te->copyStmt)
1213 {
1214 PQExpBuffer copyStmt = createPQExpBuffer();
1215 bool result;
1216
1217 /*
1218 * Build the initial part of the COPY as it would appear if the
1219 * nominal target table is the actual target. If we see anything
1220 * else, it must be a load-via-partition-root case.
1221 */
1222 appendPQExpBuffer(copyStmt, "COPY %s ",
1223 fmtQualifiedId(te->namespace, te->tag));
/* Prefix mismatch means the COPY targets some other (ancestor) table. */
1224 result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1225 destroyPQExpBuffer(copyStmt);
1226 return result;
1227 }
1228 /* Assume it's not load-via-partition-root */
1229 return false;
1230}
1231
1232/*
1233 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1234 */
1235
1236/* Public */
1237void
1238WriteData(Archive *AHX, const void *data, size_t dLen)
1239{
1240 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1241
1242 if (!AH->currToc)
1243 pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1244
1245 AH->WriteDataPtr(AH, data, dLen);
1246}
1247
1248/*
1249 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1250 * repository for all metadata. But the name has stuck.
1251 *
1252 * The new entry is added to the Archive's TOC list. Most callers can ignore
1253 * the result value because nothing else need be done, but a few want to
1254 * manipulate the TOC entry further.
1255 */
1256
1257/* Public */
1258TocEntry *
1259ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1261{
1262 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1264
1266
1267 AH->tocCount++;
1268 if (dumpId > AH->maxDumpId)
1269 AH->maxDumpId = dumpId;
1270
1271 newToc->prev = AH->toc->prev;
1272 newToc->next = AH->toc;
1273 AH->toc->prev->next = newToc;
1274 AH->toc->prev = newToc;
1275
1276 newToc->catalogId = catalogId;
1277 newToc->dumpId = dumpId;
1278 newToc->section = opts->section;
1279
1280 newToc->tag = pg_strdup(opts->tag);
1281 newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1282 newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1283 newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1284 newToc->relkind = opts->relkind;
1285 newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1286 newToc->desc = pg_strdup(opts->description);
1287 newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1288 newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1289 newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1290
1291 if (opts->nDeps > 0)
1292 {
1293 newToc->dependencies = pg_malloc_array(DumpId, opts->nDeps);
1294 memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1295 newToc->nDeps = opts->nDeps;
1296 }
1297 else
1298 {
1299 newToc->dependencies = NULL;
1300 newToc->nDeps = 0;
1301 }
1302
1303 newToc->dataDumper = opts->dumpFn;
1304 newToc->dataDumperArg = opts->dumpArg;
1305 newToc->hadDumper = opts->dumpFn ? true : false;
1306
1307 newToc->defnDumper = opts->defnFn;
1308 newToc->defnDumperArg = opts->defnArg;
1309
1310 newToc->formatData = NULL;
1311 newToc->dataLength = 0;
1312
1313 if (AH->ArchiveEntryPtr != NULL)
1314 AH->ArchiveEntryPtr(AH, newToc);
1315
1316 return newToc;
1317}
1318
1319/* Public */
1320void
1322{
1323 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1324 RestoreOptions *ropt = AH->public.ropt;
1325 TocEntry *te;
1329 const char *fmtName;
1330 char stamp_str[64];
1331
1332 /* TOC is always uncompressed */
1334
1335 sav = SaveOutput(AH);
1336 if (ropt->filename)
1337 SetOutput(AH, ropt->filename, out_compression_spec, false);
1338
1340 localtime(&AH->createDate)) == 0)
1341 strcpy(stamp_str, "[unknown]");
1342
1343 ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1344 ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1345 sanitize_line(AH->archdbname, false),
1346 AH->tocCount,
1348
1349 switch (AH->format)
1350 {
1351 case archCustom:
1352 fmtName = "CUSTOM";
1353 break;
1354 case archDirectory:
1355 fmtName = "DIRECTORY";
1356 break;
1357 case archTar:
1358 fmtName = "TAR";
1359 break;
1360 default:
1361 fmtName = "UNKNOWN";
1362 }
1363
1364 ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1366 ahprintf(AH, "; Format: %s\n", fmtName);
1367 ahprintf(AH, "; Integer: %zu bytes\n", AH->intSize);
1368 ahprintf(AH, "; Offset: %zu bytes\n", AH->offSize);
1369 if (AH->archiveRemoteVersion)
1370 ahprintf(AH, "; Dumped from database version: %s\n",
1372 if (AH->archiveDumpVersion)
1373 ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1374 AH->archiveDumpVersion);
1375
1376 ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1377
1379 for (te = AH->toc->next; te != AH->toc; te = te->next)
1380 {
1381 /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1382 if (te->section != SECTION_NONE)
1383 curSection = te->section;
1384 te->reqs = _tocEntryRequired(te, curSection, AH);
1385 /* Now, should we print it? */
1386 if (ropt->verbose ||
1387 (te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
1388 {
1389 char *sanitized_name;
1390 char *sanitized_schema;
1391 char *sanitized_owner;
1392
1393 /*
1394 */
1395 sanitized_name = sanitize_line(te->tag, false);
1396 sanitized_schema = sanitize_line(te->namespace, true);
1397 sanitized_owner = sanitize_line(te->owner, false);
1398
1399 ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1403
1407 }
1408 if (ropt->verbose && te->nDeps > 0)
1409 {
1410 int i;
1411
1412 ahprintf(AH, ";\tdepends on:");
1413 for (i = 0; i < te->nDeps; i++)
1414 ahprintf(AH, " %d", te->dependencies[i]);
1415 ahprintf(AH, "\n");
1416 }
1417 }
1418
1419 /* Enforce strict names checking */
1420 if (ropt->strict_names)
1421 StrictNamesCheck(ropt);
1422
1423 if (ropt->filename)
1424 RestoreOutput(AH, sav);
1425}
1426
1427/***********
1428 * Large Object Archival
1429 ***********/
1430
1431/* Called by a dumper to signal start of a LO */
1432int
1434{
1435 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1436
1437 if (!AH->StartLOPtr)
1438 pg_fatal("large-object output not supported in chosen format");
1439
1440 AH->StartLOPtr(AH, AH->currToc, oid);
1441
1442 return 1;
1443}
1444
1445/* Called by a dumper to signal end of a LO */
1446int
1448{
1449 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1450
1451 if (AH->EndLOPtr)
1452 AH->EndLOPtr(AH, AH->currToc, oid);
1453
1454 return 1;
1455}
1456
1457/**********
1458 * Large Object Restoration
1459 **********/
1460
1461/*
1462 * Called by a format handler before a group of LOs is restored
1463 */
1464void
1466{
1467 RestoreOptions *ropt = AH->public.ropt;
1468
1469 /*
1470 * LOs must be restored within a transaction block, since we need the LO
1471 * handle to stay open while we write it. Establish a transaction unless
1472 * there's one being used globally.
1473 */
1474 if (!(ropt->single_txn || ropt->txn_size > 0))
1475 {
1476 if (AH->connection)
1478 else
1479 ahprintf(AH, "BEGIN;\n\n");
1480 }
1481
1482 AH->loCount = 0;
1483}
1484
1485/*
1486 * Called by a format handler after a group of LOs is restored
1487 */
1488void
1490{
1491 RestoreOptions *ropt = AH->public.ropt;
1492
1493 if (!(ropt->single_txn || ropt->txn_size > 0))
1494 {
1495 if (AH->connection)
1497 else
1498 ahprintf(AH, "COMMIT;\n\n");
1499 }
1500
1501 pg_log_info(ngettext("restored %d large object",
1502 "restored %d large objects",
1503 AH->loCount),
1504 AH->loCount);
1505}
1506
1507
1508/*
1509 * Called by a format handler to initiate restoration of a LO
1510 */
1511void
1513{
1514 bool old_lo_style = (AH->version < K_VERS_1_12);
1515 Oid loOid;
1516
1517 AH->loCount++;
1518
1519 /* Initialize the LO Buffer */
1520 if (AH->lo_buf == NULL)
1521 {
1522 /* First time through (in this process) so allocate the buffer */
1523 AH->lo_buf_size = LOBBUFSIZE;
1525 }
1526 AH->lo_buf_used = 0;
1527
1528 pg_log_info("restoring large object with OID %u", oid);
1529
1530 /* With an old archive we must do drop and create logic here */
1531 if (old_lo_style && drop)
1532 DropLOIfExists(AH, oid);
1533
1534 if (AH->connection)
1535 {
1536 if (old_lo_style)
1537 {
1538 loOid = lo_create(AH->connection, oid);
1539 if (loOid == 0 || loOid != oid)
1540 pg_fatal("could not create large object %u: %s",
1541 oid, PQerrorMessage(AH->connection));
1542 }
1543 AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1544 if (AH->loFd == -1)
1545 pg_fatal("could not open large object %u: %s",
1546 oid, PQerrorMessage(AH->connection));
1547 }
1548 else
1549 {
1550 if (old_lo_style)
1551 ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1552 oid, INV_WRITE);
1553 else
1554 ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1555 oid, INV_WRITE);
1556 }
1557
1558 AH->writingLO = true;
1559}
1560
1561void
1563{
1564 if (AH->lo_buf_used > 0)
1565 {
1566 /* Write remaining bytes from the LO buffer */
1567 dump_lo_buf(AH);
1568 }
1569
1570 AH->writingLO = false;
1571
1572 if (AH->connection)
1573 {
1574 lo_close(AH->connection, AH->loFd);
1575 AH->loFd = -1;
1576 }
1577 else
1578 {
1579 ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1580 }
1581}
1582
1583/***********
1584 * Sorting and Reordering
1585 ***********/
1586
1587void
1589{
1590 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1591 RestoreOptions *ropt = AH->public.ropt;
1592 FILE *fh;
1594
1595 /* Allocate space for the 'wanted' array, and init it */
1596 ropt->idWanted = pg_malloc0_array(bool, AH->maxDumpId);
1597
1598 /* Setup the file */
1599 fh = fopen(ropt->tocFile, PG_BINARY_R);
1600 if (!fh)
1601 pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1602
1604
1605 while (pg_get_line_buf(fh, &linebuf))
1606 {
1607 char *cmnt;
1608 char *endptr;
1609 DumpId id;
1610 TocEntry *te;
1611
1612 /* Truncate line at comment, if any */
1613 cmnt = strchr(linebuf.data, ';');
1614 if (cmnt != NULL)
1615 {
1616 cmnt[0] = '\0';
1617 linebuf.len = cmnt - linebuf.data;
1618 }
1619
1620 /* Ignore if all blank */
1621 if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1622 continue;
1623
1624 /* Get an ID, check it's valid and not already seen */
1625 id = strtol(linebuf.data, &endptr, 10);
1626 if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1627 ropt->idWanted[id - 1])
1628 {
1629 pg_log_warning("line ignored: %s", linebuf.data);
1630 continue;
1631 }
1632
1633 /* Find TOC entry */
1634 te = getTocEntryByDumpId(AH, id);
1635 if (!te)
1636 pg_fatal("could not find entry for ID %d",
1637 id);
1638
1639 /* Mark it wanted */
1640 ropt->idWanted[id - 1] = true;
1641
1642 /*
1643 * Move each item to the end of the list as it is selected, so that
1644 * they are placed in the desired order. Any unwanted items will end
1645 * up at the front of the list, which may seem unintuitive but it's
1646 * what we need. In an ordinary serial restore that makes no
1647 * difference, but in a parallel restore we need to mark unrestored
1648 * items' dependencies as satisfied before we start examining
1649 * restorable items. Otherwise they could have surprising
1650 * side-effects on the order in which restorable items actually get
1651 * restored.
1652 */
1653 _moveBefore(AH->toc, te);
1654 }
1655
1656 pg_free(linebuf.data);
1657
1658 if (fclose(fh) != 0)
1659 pg_fatal("could not close TOC file: %m");
1660}
1661
1662/**********************
1663 * Convenience functions that look like standard IO functions
1664 * for writing data when in dump mode.
1665 **********************/
1666
1667/* Public */
1668void
1669archputs(const char *s, Archive *AH)
1670{
1671 WriteData(AH, s, strlen(s));
1672}
1673
1674/* Public */
1675int
1676archprintf(Archive *AH, const char *fmt,...)
1677{
1678 int save_errno = errno;
1679 char *p;
1680 size_t len = 128; /* initial assumption about buffer size */
1681 size_t cnt;
1682
1683 for (;;)
1684 {
1685 va_list args;
1686
1687 /* Allocate work buffer. */
1688 p = (char *) pg_malloc(len);
1689
1690 /* Try to format the data. */
1691 errno = save_errno;
1692 va_start(args, fmt);
1693 cnt = pvsnprintf(p, len, fmt, args);
1694 va_end(args);
1695
1696 if (cnt < len)
1697 break; /* success */
1698
1699 /* Release buffer and loop around to try again with larger len. */
1700 free(p);
1701 len = cnt;
1702 }
1703
1704 WriteData(AH, p, cnt);
1705 free(p);
1706 return (int) cnt;
1707}
1708
1709
1710/*******************************
1711 * Stuff below here should be 'private' to the archiver routines
1712 *
1713 * If append_data is set, then append data into file as we are restoring dump
1714 * of multiple databases which was taken by pg_dumpall.
1715 *******************************/
1716
1717static void
1719 const pg_compress_specification compression_spec,
1720 bool append_data)
1721{
1723 const char *mode;
1724 int fn = -1;
1725
1726 if (filename)
1727 {
1728 if (strcmp(filename, "-") == 0)
1729 fn = fileno(stdout);
1730 }
1731 else if (AH->FH)
1732 fn = fileno(AH->FH);
1733 else if (AH->fSpec)
1734 {
1735 filename = AH->fSpec;
1736 }
1737 else
1738 fn = fileno(stdout);
1739
1740 if (append_data || AH->mode == archModeAppend)
1741 mode = PG_BINARY_A;
1742 else
1743 mode = PG_BINARY_W;
1744
1745 CFH = InitCompressFileHandle(compression_spec);
1746
1747 if (!CFH->open_func(filename, fn, mode, CFH))
1748 {
1749 if (filename)
1750 pg_fatal("could not open output file \"%s\": %m", filename);
1751 else
1752 pg_fatal("could not open output file: %m");
1753 }
1754
1755 AH->OF = CFH;
1756}
1757
1758static CompressFileHandle *
1760{
1761 return (CompressFileHandle *) AH->OF;
1762}
1763
1764static void
1766{
1767 errno = 0;
1768 if (!EndCompressFileHandle(AH->OF))
1769 pg_fatal("could not close output file: %m");
1770
1771 AH->OF = savedOutput;
1772}
1773
1774
1775
1776/*
1777 * Print formatted text to the output file (usually stdout).
1778 */
1779int
1780ahprintf(ArchiveHandle *AH, const char *fmt,...)
1781{
1782 int save_errno = errno;
1783 char *p;
1784 size_t len = 128; /* initial assumption about buffer size */
1785 size_t cnt;
1786
1787 for (;;)
1788 {
1789 va_list args;
1790
1791 /* Allocate work buffer. */
1792 p = (char *) pg_malloc(len);
1793
1794 /* Try to format the data. */
1795 errno = save_errno;
1796 va_start(args, fmt);
1797 cnt = pvsnprintf(p, len, fmt, args);
1798 va_end(args);
1799
1800 if (cnt < len)
1801 break; /* success */
1802
1803 /* Release buffer and loop around to try again with larger len. */
1804 free(p);
1805 len = cnt;
1806 }
1807
1808 ahwrite(p, 1, cnt, AH);
1809 free(p);
1810 return (int) cnt;
1811}
1812
1813/*
1814 * Single place for logic which says 'We are restoring to a direct DB connection'.
1815 */
1816static int
1818{
1819 RestoreOptions *ropt = AH->public.ropt;
1820
1821 return (ropt && ropt->useDB && AH->connection);
1822}
1823
1824/*
1825 * Dump the current contents of the LO data buffer while writing a LO
1826 */
1827static void
1829{
1830 if (AH->connection)
1831 {
1832 int res;
1833
1834 res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1835 pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1836 "wrote %zu bytes of large object data (result = %d)",
1837 AH->lo_buf_used),
1838 AH->lo_buf_used, res);
1839 /* We assume there are no short writes, only errors */
1840 if (res != AH->lo_buf_used)
1841 warn_or_exit_horribly(AH, "could not write to large object: %s",
1843 }
1844 else
1845 {
1847
1849 (const unsigned char *) AH->lo_buf,
1850 AH->lo_buf_used,
1851 AH);
1852
1853 /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1854 AH->writingLO = false;
1855 ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1856 AH->writingLO = true;
1857
1859 }
1860 AH->lo_buf_used = 0;
1861}
1862
1863
1864/*
1865 * Write buffer to the output file (usually stdout). This is used for
1866 * outputting 'restore' scripts etc. It is even possible for an archive
1867 * format to create a custom output routine to 'fake' a restore if it
1868 * wants to generate a script (see TAR output).
1869 */
1870void
1871ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1872{
1873 int bytes_written = 0;
1874
1875 if (AH->writingLO)
1876 {
1877 size_t remaining = size * nmemb;
1878
1879 while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1880 {
1881 size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1882
1883 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1884 ptr = (const char *) ptr + avail;
1885 remaining -= avail;
1886 AH->lo_buf_used += avail;
1887 dump_lo_buf(AH);
1888 }
1889
1890 memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1891 AH->lo_buf_used += remaining;
1892
1893 bytes_written = size * nmemb;
1894 }
1895 else if (AH->CustomOutPtr)
1896 bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1897
1898 /*
1899 * If we're doing a restore, and it's direct to DB, and we're connected
1900 * then send it to the DB.
1901 */
1902 else if (RestoringToDB(AH))
1903 bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1904 else
1905 {
1907
1908 CFH->write_func(ptr, size * nmemb, CFH);
1909 bytes_written = size * nmemb;
1910 }
1911
1912 if (bytes_written != size * nmemb)
1914}
1915
1916/* on some error, we may decide to go on... */
1917void
1919{
1920 va_list ap;
1921
1922 switch (AH->stage)
1923 {
1924
1925 case STAGE_NONE:
1926 /* Do nothing special */
1927 break;
1928
1929 case STAGE_INITIALIZING:
1930 if (AH->stage != AH->lastErrorStage)
1931 pg_log_info("while INITIALIZING:");
1932 break;
1933
1934 case STAGE_PROCESSING:
1935 if (AH->stage != AH->lastErrorStage)
1936 pg_log_info("while PROCESSING TOC:");
1937 break;
1938
1939 case STAGE_FINALIZING:
1940 if (AH->stage != AH->lastErrorStage)
1941 pg_log_info("while FINALIZING:");
1942 break;
1943 }
1944 if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1945 {
1946 pg_log_info("from TOC entry %d; %u %u %s %s %s",
1947 AH->currentTE->dumpId,
1949 AH->currentTE->catalogId.oid,
1950 AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1951 AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1952 AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1953 }
1954 AH->lastErrorStage = AH->stage;
1955 AH->lastErrorTE = AH->currentTE;
1956
1957 va_start(ap, fmt);
1959 va_end(ap);
1960
1961 if (AH->public.exit_on_error)
1962 exit_nicely(1);
1963 else
1964 AH->public.n_errors++;
1965}
1966
#ifdef NOT_USED

/* Relocate a TOC entry to immediately follow "pos" in the circular list */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
    /* Unlink te from list */
    te->prev->next = te->next;
    te->next->prev = te->prev;

    /* and insert it after "pos" */
    te->prev = pos;
    te->next = pos->next;
    pos->next->prev = te;
    pos->next = te;
}
#endif
1983
1984static void
1986{
1987 /* Unlink te from list */
1988 te->prev->next = te->next;
1989 te->next->prev = te->prev;
1990
1991 /* and insert it before "pos" */
1992 te->prev = pos->prev;
1993 te->next = pos;
1994 pos->prev->next = te;
1995 pos->prev = te;
1996}
1997
1998/*
1999 * Build index arrays for the TOC list
2000 *
2001 * This should be invoked only after we have created or read in all the TOC
2002 * items.
2003 *
2004 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
2005 * array entries run only up to maxDumpId. We might see dependency dump IDs
2006 * beyond that (if the dump was partial); so always check the array bound
2007 * before trying to touch an array entry.
2008 */
2009static void
2011{
2012 DumpId maxDumpId = AH->maxDumpId;
2013 TocEntry *te;
2014
2015 AH->tocsByDumpId = pg_malloc0_array(TocEntry *, (maxDumpId + 1));
2016 AH->tableDataId = pg_malloc0_array(DumpId, (maxDumpId + 1));
2017
2018 for (te = AH->toc->next; te != AH->toc; te = te->next)
2019 {
2020 /* this check is purely paranoia, maxDumpId should be correct */
2021 if (te->dumpId <= 0 || te->dumpId > maxDumpId)
2022 pg_fatal("bad dumpId");
2023
2024 /* tocsByDumpId indexes all TOCs by their dump ID */
2025 AH->tocsByDumpId[te->dumpId] = te;
2026
2027 /*
2028 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
2029 * TOC entry that has a DATA item. We compute this by reversing the
2030 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
2031 * just one dependency and it is the TABLE item.
2032 */
2033 if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
2034 {
2035 DumpId tableId = te->dependencies[0];
2036
2037 /*
2038 * The TABLE item might not have been in the archive, if this was
2039 * a data-only dump; but its dump ID should be less than its data
2040 * item's dump ID, so there should be a place for it in the array.
2041 */
2042 if (tableId <= 0 || tableId > maxDumpId)
2043 pg_fatal("bad table dumpId for TABLE DATA item");
2044
2045 AH->tableDataId[tableId] = te->dumpId;
2046 }
2047 }
2048}
2049
2050TocEntry *
2052{
2053 /* build index arrays if we didn't already */
2054 if (AH->tocsByDumpId == NULL)
2056
2057 if (id > 0 && id <= AH->maxDumpId)
2058 return AH->tocsByDumpId[id];
2059
2060 return NULL;
2061}
2062
2063int
2065{
2066 TocEntry *te = getTocEntryByDumpId(AH, id);
2067
2068 if (!te)
2069 return 0;
2070
2071 return te->reqs;
2072}
2073
2074size_t
2076{
2077 int off;
2078
2079 /* Save the flag */
2080 AH->WriteBytePtr(AH, wasSet);
2081
2082 /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2083 for (off = 0; off < sizeof(pgoff_t); off++)
2084 {
2085 AH->WriteBytePtr(AH, o & 0xFF);
2086 o >>= 8;
2087 }
2088 return sizeof(pgoff_t) + 1;
2089}
2090
2091int
2093{
2094 int i;
2095 int off;
2096 int offsetFlg;
2097
2098 /* Initialize to zero */
2099 *o = 0;
2100
2101 /* Check for old version */
2102 if (AH->version < K_VERS_1_7)
2103 {
2104 /* Prior versions wrote offsets using WriteInt */
2105 i = ReadInt(AH);
2106 /* -1 means not set */
2107 if (i < 0)
2108 return K_OFFSET_POS_NOT_SET;
2109 else if (i == 0)
2110 return K_OFFSET_NO_DATA;
2111
2112 /* Cast to pgoff_t because it was written as an int. */
2113 *o = (pgoff_t) i;
2114 return K_OFFSET_POS_SET;
2115 }
2116
2117 /*
2118 * Read the flag indicating the state of the data pointer. Check if valid
2119 * and die if not.
2120 *
2121 * This used to be handled by a negative or zero pointer, now we use an
2122 * extra byte specifically for the state.
2123 */
2124 offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2125
2126 switch (offsetFlg)
2127 {
2129 case K_OFFSET_NO_DATA:
2130 case K_OFFSET_POS_SET:
2131
2132 break;
2133
2134 default:
2135 pg_fatal("unexpected data offset flag %d", offsetFlg);
2136 }
2137
2138 /*
2139 * Read the bytes
2140 */
2141 for (off = 0; off < AH->offSize; off++)
2142 {
2143 if (off < sizeof(pgoff_t))
2144 *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2145 else
2146 {
2147 if (AH->ReadBytePtr(AH) != 0)
2148 pg_fatal("file offset in dump file is too large");
2149 }
2150 }
2151
2152 return offsetFlg;
2153}
2154
2155size_t
2157{
2158 int b;
2159
2160 /*
2161 * This is a bit yucky, but I don't want to make the binary format very
2162 * dependent on representation, and not knowing much about it, I write out
2163 * a sign byte. If you change this, don't forget to change the file
2164 * version #, and modify ReadInt to read the new format AS WELL AS the old
2165 * formats.
2166 */
2167
2168 /* SIGN byte */
2169 if (i < 0)
2170 {
2171 AH->WriteBytePtr(AH, 1);
2172 i = -i;
2173 }
2174 else
2175 AH->WriteBytePtr(AH, 0);
2176
2177 for (b = 0; b < AH->intSize; b++)
2178 {
2179 AH->WriteBytePtr(AH, i & 0xFF);
2180 i >>= 8;
2181 }
2182
2183 return AH->intSize + 1;
2184}
2185
2186int
2188{
2189 int res = 0;
2190 int bv,
2191 b;
2192 int sign = 0; /* Default positive */
2193 int bitShift = 0;
2194
2195 if (AH->version > K_VERS_1_0)
2196 /* Read a sign byte */
2197 sign = AH->ReadBytePtr(AH);
2198
2199 for (b = 0; b < AH->intSize; b++)
2200 {
2201 bv = AH->ReadBytePtr(AH) & 0xFF;
2202 if (bv != 0)
2203 res = res + (bv << bitShift);
2204 bitShift += 8;
2205 }
2206
2207 if (sign)
2208 res = -res;
2209
2210 return res;
2211}
2212
2213size_t
2214WriteStr(ArchiveHandle *AH, const char *c)
2215{
2216 size_t res;
2217
2218 if (c)
2219 {
2220 int len = strlen(c);
2221
2222 res = WriteInt(AH, len);
2223 AH->WriteBufPtr(AH, c, len);
2224 res += len;
2225 }
2226 else
2227 res = WriteInt(AH, -1);
2228
2229 return res;
2230}
2231
2232char *
2234{
2235 char *buf;
2236 int l;
2237
2238 l = ReadInt(AH);
2239 if (l < 0)
2240 buf = NULL;
2241 else
2242 {
2243 buf = (char *) pg_malloc(l + 1);
2244 AH->ReadBufPtr(AH, buf, l);
2245
2246 buf[l] = '\0';
2247 }
2248
2249 return buf;
2250}
2251
2252static bool
2253_fileExistsInDirectory(const char *dir, const char *filename)
2254{
2255 struct stat st;
2256 char buf[MAXPGPATH];
2257
2258 if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2259 pg_fatal("directory name too long: \"%s\"", dir);
2260
2261 return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2262}
2263
2264static int
2266{
2267 FILE *fh;
2268 char sig[6]; /* More than enough */
2269 size_t cnt;
2270 int wantClose = 0;
2271
2272 pg_log_debug("attempting to ascertain archive format");
2273
2274 free(AH->lookahead);
2275
2276 AH->readHeader = 0;
2277 AH->lookaheadSize = 512;
2278 AH->lookahead = pg_malloc0(512);
2279 AH->lookaheadLen = 0;
2280 AH->lookaheadPos = 0;
2281
2282 if (AH->fSpec)
2283 {
2284 struct stat st;
2285
2286 wantClose = 1;
2287
2288 /*
2289 * Check if the specified archive is a directory. If so, check if
2290 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2291 */
2292 if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2293 {
2294 AH->format = archDirectory;
2295 if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2296 return AH->format;
2297#ifdef HAVE_LIBZ
2298 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2299 return AH->format;
2300#endif
2301#ifdef USE_LZ4
2302 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2303 return AH->format;
2304#endif
2305#ifdef USE_ZSTD
2306 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2307 return AH->format;
2308#endif
2309 pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2310 AH->fSpec);
2311 fh = NULL; /* keep compiler quiet */
2312 }
2313 else
2314 {
2315 fh = fopen(AH->fSpec, PG_BINARY_R);
2316 if (!fh)
2317 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2318 }
2319 }
2320 else
2321 {
2322 fh = stdin;
2323 if (!fh)
2324 pg_fatal("could not open input file: %m");
2325 }
2326
2327 if ((cnt = fread(sig, 1, 5, fh)) != 5)
2328 {
2329 if (ferror(fh))
2330 pg_fatal("could not read input file: %m");
2331 else
2332 pg_fatal("input file is too short (read %zu, expected 5)", cnt);
2333 }
2334
2335 /* Save it, just in case we need it later */
2336 memcpy(&AH->lookahead[0], sig, 5);
2337 AH->lookaheadLen = 5;
2338
2339 if (strncmp(sig, "PGDMP", 5) == 0)
2340 {
2341 /* It's custom format, stop here */
2342 AH->format = archCustom;
2343 AH->readHeader = 1;
2344 }
2345 else
2346 {
2347 /*
2348 * *Maybe* we have a tar archive format file or a text dump ... So,
2349 * read first 512 byte header...
2350 */
2351 cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2352 /* read failure is checked below */
2353 AH->lookaheadLen += cnt;
2354
2358 {
2359 /*
2360 * looks like it's probably a text format dump. so suggest they
2361 * try psql
2362 */
2363 pg_fatal("input file appears to be a text format dump. Please use psql.");
2364 }
2365
2366 if (AH->lookaheadLen != 512)
2367 {
2368 if (feof(fh))
2369 pg_fatal("input file does not appear to be a valid archive (too short?)");
2370 else
2372 }
2373
2374 if (!isValidTarHeader(AH->lookahead))
2375 pg_fatal("input file does not appear to be a valid archive");
2376
2377 AH->format = archTar;
2378 }
2379
2380 /* Close the file if we opened it */
2381 if (wantClose)
2382 {
2383 if (fclose(fh) != 0)
2384 pg_fatal("could not close input file: %m");
2385 /* Forget lookahead, since we'll re-read header after re-opening */
2386 AH->readHeader = 0;
2387 AH->lookaheadLen = 0;
2388 }
2389
2390 return AH->format;
2391}
2392
2393
2394/*
2395 * Allocate an archive handle
2396 */
2397static ArchiveHandle *
2399 const pg_compress_specification compression_spec,
2400 bool dosync, ArchiveMode mode,
2402{
2403 ArchiveHandle *AH;
2406
2407 pg_log_debug("allocating AH for %s, format %d",
2408 FileSpec ? FileSpec : "(stdio)", fmt);
2409
2411
2412 AH->version = K_VERS_SELF;
2413
2414 /* initialize for backwards compatible string processing */
2415 AH->public.encoding = 0; /* PG_SQL_ASCII */
2416 AH->public.std_strings = true;
2417
2418 /* sql error handling */
2419 AH->public.exit_on_error = true;
2420 AH->public.n_errors = 0;
2421
2423
2424 AH->createDate = time(NULL);
2425
2426 AH->intSize = sizeof(int);
2427 AH->offSize = sizeof(pgoff_t);
2428 if (FileSpec)
2429 {
2430 AH->fSpec = pg_strdup(FileSpec);
2431
2432 /*
2433 * Not used; maybe later....
2434 *
2435 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2436 * i--) if (AH->workDir[i-1] == '/')
2437 */
2438 }
2439 else
2440 AH->fSpec = NULL;
2441
2442 AH->currUser = NULL; /* unknown */
2443 AH->currSchema = NULL; /* ditto */
2444 AH->currTablespace = NULL; /* ditto */
2445 AH->currTableAm = NULL; /* ditto */
2446
2448
2449 AH->toc->next = AH->toc;
2450 AH->toc->prev = AH->toc;
2451
2452 AH->mode = mode;
2453 AH->compression_spec = compression_spec;
2454 AH->dosync = dosync;
2456
2457 memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2458
2459 /* Open stdout with no compression for AH output handle */
2462 if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2463 pg_fatal("could not open stdout for appending: %m");
2464 AH->OF = CFH;
2465
2466 /*
2467 * On Windows, we need to use binary mode to read/write non-text files,
2468 * which include all archive formats as well as compressed plain text.
2469 * Force stdin/stdout into binary mode if that is what we are using.
2470 */
2471#ifdef WIN32
2472 if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2473 (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2474 {
2475 if (mode == archModeWrite)
2476 _setmode(fileno(stdout), O_BINARY);
2477 else
2478 _setmode(fileno(stdin), O_BINARY);
2479 }
2480#endif
2481
2483
2484 if (fmt == archUnknown)
2486 else
2487 AH->format = fmt;
2488
2489 switch (AH->format)
2490 {
2491 case archCustom:
2493 break;
2494
2495 case archNull:
2497 break;
2498
2499 case archDirectory:
2501 break;
2502
2503 case archTar:
2505 break;
2506
2507 default:
2508 pg_fatal("unrecognized file format \"%d\"", AH->format);
2509 }
2510
2511 return AH;
2512}
2513
2514/*
2515 * Write out all data (tables & LOs)
2516 */
2517void
2519{
2520 TocEntry *te;
2521
2522 if (pstate && pstate->numWorkers > 1)
2523 {
2524 /*
2525 * In parallel mode, this code runs in the leader process. We
2526 * construct an array of candidate TEs, then sort it into decreasing
2527 * size order, then dispatch each TE to a data-transfer worker. By
2528 * dumping larger tables first, we avoid getting into a situation
2529 * where we're down to one job and it's big, losing parallelism.
2530 */
2531 TocEntry **tes;
2532 int ntes;
2533
2535 ntes = 0;
2536 for (te = AH->toc->next; te != AH->toc; te = te->next)
2537 {
2538 /* Consider only TEs with dataDumper functions ... */
2539 if (!te->dataDumper)
2540 continue;
2541 /* ... and ignore ones not enabled for dump */
2542 if ((te->reqs & REQ_DATA) == 0)
2543 continue;
2544
2545 tes[ntes++] = te;
2546 }
2547
2548 if (ntes > 1)
2550
2551 for (int i = 0; i < ntes; i++)
2552 DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2554
2555 pg_free(tes);
2556
2557 /* Now wait for workers to finish. */
2558 WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2559 }
2560 else
2561 {
2562 /* Non-parallel mode: just dump all candidate TEs sequentially. */
2563 for (te = AH->toc->next; te != AH->toc; te = te->next)
2564 {
2565 /* Must have same filter conditions as above */
2566 if (!te->dataDumper)
2567 continue;
2568 if ((te->reqs & REQ_DATA) == 0)
2569 continue;
2570
2572 }
2573 }
2574}
2575
2576
2577/*
2578 * Callback function that's invoked in the leader process after a step has
2579 * been parallel dumped.
2580 *
2581 * We don't need to do anything except check for worker failure.
2582 */
2583static void
2585 TocEntry *te,
2586 int status,
2587 void *callback_data)
2588{
2589 pg_log_info("finished item %d %s %s",
2590 te->dumpId, te->desc, te->tag);
2591
2592 if (status != 0)
2593 pg_fatal("worker process failed: exit code %d",
2594 status);
2595}
2596
2597
2598void
/* Dump the data for one TOC entry: invoke the format's start hook, the
 * user-supplied dataDumper callback, then the end hook.
 * NOTE(review): extraction dropped the signature line (expected:
 * WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)) and the
 * declarations of startPtr/endPtr -- verify against upstream. */
2600{
2603
 /* Remember which TOC entry is being dumped, for the WriteData path. */
2604 AH->currToc = te;
2605
 /* BLOBS entries use the large-object hooks; everything else uses the
  * ordinary data hooks. */
2606 if (strcmp(te->desc, "BLOBS") == 0)
2607 {
2608 startPtr = AH->StartLOsPtr;
2609 endPtr = AH->EndLOsPtr;
2610 }
2611 else
2612 {
2613 startPtr = AH->StartDataPtr;
2614 endPtr = AH->EndDataPtr;
2615 }
2616
2617 if (startPtr != NULL)
2618 (*startPtr) (AH, te);
2619
2620 /*
2621 * The user-provided DataDumper routine needs to call AH->WriteData
2622 */
2623 te->dataDumper((Archive *) AH, te->dataDumperArg);
2624
2625 if (endPtr != NULL)
2626 (*endPtr) (AH, te);
2627
2628 AH->currToc = NULL;
2629}
2630
2631void
/* Write the archive's table of contents: a count followed by one serialized
 * record per dumpable TOC entry (IDs, OIDs as strings, tag/desc/section,
 * definition, statements, ownership, dependency list, format extras).
 * NOTE(review): extraction dropped the signature line (expected:
 * WriteToc(ArchiveHandle *AH)) -- verify against upstream. */
2633{
2634 TocEntry *te;
2635 char workbuf[32];
2636 int tocCount;
2637 int i;
2638
2639 /* count entries that will actually be dumped */
2640 tocCount = 0;
2641 for (te = AH->toc->next; te != AH->toc; te = te->next)
2642 {
2643 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2644 tocCount++;
2645 }
2646
2647 /* printf("%d TOC Entries to save\n", tocCount); */
2648
2649 WriteInt(AH, tocCount);
2650
2651 for (te = AH->toc->next; te != AH->toc; te = te->next)
2652 {
 /* Must match the filter used when counting above. */
2653 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2654 continue;
2655
2656 WriteInt(AH, te->dumpId);
2657 WriteInt(AH, te->dataDumper ? 1 : 0);
2658
2659 /* OID is recorded as a string for historical reasons */
2660 sprintf(workbuf, "%u", te->catalogId.tableoid);
2661 WriteStr(AH, workbuf);
2662 sprintf(workbuf, "%u", te->catalogId.oid);
2663 WriteStr(AH, workbuf);
2664
2665 WriteStr(AH, te->tag);
2666 WriteStr(AH, te->desc);
2667 WriteInt(AH, te->section);
2668
2669 if (te->defnLen)
2670 {
2671 /*
2672 * defnLen should only be set for custom format's second call to
2673 * WriteToc(), which rewrites the TOC in place to update data
2674 * offsets. Instead of calling the defnDumper a second time
2675 * (which could involve re-executing queries), just skip writing
2676 * the entry. While regenerating the definition should
2677 * theoretically produce the same result as before, it's expensive
2678 * and feels risky.
2679 *
2680 * The custom format only calls WriteToc() a second time if
2681 * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
2682 * so we can safely use it without checking. For other formats,
2683 * we fail because one of our assumptions must no longer hold
2684 * true.
2685 *
2686 * XXX This is a layering violation, but the alternative is an
2687 * awkward and complicated callback infrastructure for this
2688 * special case. This might be worth revisiting in the future.
2689 */
2690 if (AH->format != archCustom)
2691 pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
2692 te->dumpId, te->desc, te->tag);
2693
2694 if (fseeko(AH->FH, te->defnLen, SEEK_CUR) != 0)
2695 pg_fatal("error during file seek: %m");
2696 }
2697 else if (te->defnDumper)
2698 {
 /* Generate the definition on demand; remember its serialized length
  * so the second WriteToc() pass can seek past it (see above). */
2699 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
2700
2701 te->defnLen = WriteStr(AH, defn);
2702 pg_free(defn);
2703 }
2704 else
2705 WriteStr(AH, te->defn);
2706
2707 WriteStr(AH, te->dropStmt);
2708 WriteStr(AH, te->copyStmt);
2709 WriteStr(AH, te->namespace);
2710 WriteStr(AH, te->tablespace);
2711 WriteStr(AH, te->tableam);
2712 WriteInt(AH, te->relkind);
2713 WriteStr(AH, te->owner);
 /* Legacy "with OIDs" flag; always written as "false" nowadays. */
2714 WriteStr(AH, "false");
2715
2716 /* Dump list of dependencies */
2717 for (i = 0; i < te->nDeps; i++)
2718 {
2719 sprintf(workbuf, "%d", te->dependencies[i]);
2720 WriteStr(AH, workbuf);
2721 }
2722 WriteStr(AH, NULL); /* Terminate List */
2723
 /* Let the archive format append its own per-entry data (e.g. offsets). */
2724 if (AH->WriteExtraTocPtr)
2725 AH->WriteExtraTocPtr(AH, te);
2726 }
2727}
2728
2729void
/* Read the table of contents back from an archive, reconstructing the
 * circular TOC list, with per-archive-version compatibility handling.
 * NOTE(review): extraction dropped the signature line (expected:
 * ReadToc(ArchiveHandle *AH)) -- verify against upstream. */
2731{
2732 int i;
2733 char *tmp;
2734 DumpId *deps;
2735 int depIdx;
2736 int depSize;
2737 TocEntry *te;
2738 bool is_supported;
2739
2740 AH->tocCount = ReadInt(AH);
2741 AH->maxDumpId = 0;
2742
2743 for (i = 0; i < AH->tocCount; i++)
2744 {
 /* NOTE(review): extraction dropped the line allocating 'te' (original
  * line 2745) -- verify against upstream. */
2746 te->dumpId = ReadInt(AH);
2747
2748 if (te->dumpId > AH->maxDumpId)
2749 AH->maxDumpId = te->dumpId;
2750
2751 /* Sanity check */
2752 if (te->dumpId <= 0)
2753 pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2754 te->dumpId);
2755
2756 te->hadDumper = ReadInt(AH);
2757
 /* tableoid was added to the format in archive version 1.8. */
2758 if (AH->version >= K_VERS_1_8)
2759 {
2760 tmp = ReadStr(AH);
2761 sscanf(tmp, "%u", &te->catalogId.tableoid);
2762 free(tmp);
2763 }
2764 else
 /* NOTE(review): extraction dropped the else-branch line (original line
  * 2765, presumably setting tableoid to InvalidOid) -- verify. */
2766 tmp = ReadStr(AH);
2767 sscanf(tmp, "%u", &te->catalogId.oid);
2768 free(tmp);
2769
2770 te->tag = ReadStr(AH);
2771 te->desc = ReadStr(AH);
2772
2773 if (AH->version >= K_VERS_1_11)
2774 {
2775 te->section = ReadInt(AH);
2776 }
2777 else
2778 {
2779 /*
2780 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2781 * the entries into sections. This list need not cover entry
2782 * types added later than 8.4.
2783 */
2784 if (strcmp(te->desc, "COMMENT") == 0 ||
2785 strcmp(te->desc, "ACL") == 0 ||
2786 strcmp(te->desc, "ACL LANGUAGE") == 0)
2787 te->section = SECTION_NONE;
2788 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2789 strcmp(te->desc, "BLOBS") == 0 ||
2790 strcmp(te->desc, "BLOB COMMENTS") == 0)
2791 te->section = SECTION_DATA;
2792 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2793 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2794 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2795 strcmp(te->desc, "INDEX") == 0 ||
2796 strcmp(te->desc, "RULE") == 0 ||
2797 strcmp(te->desc, "TRIGGER") == 0)
 /* NOTE(review): extraction dropped the assignments at original lines
  * 2798/2800 (presumably SECTION_POST_DATA / SECTION_PRE_DATA) --
  * verify against upstream. */
2799 else
2801 }
2802
2803 te->defn = ReadStr(AH);
2804 te->dropStmt = ReadStr(AH);
2805
 /* Each of the following fields exists only from the named archive
  * version onward; older archives simply lack them. */
2806 if (AH->version >= K_VERS_1_3)
2807 te->copyStmt = ReadStr(AH);
2808
2809 if (AH->version >= K_VERS_1_6)
2810 te->namespace = ReadStr(AH);
2811
2812 if (AH->version >= K_VERS_1_10)
2813 te->tablespace = ReadStr(AH);
2814
2815 if (AH->version >= K_VERS_1_14)
2816 te->tableam = ReadStr(AH);
2817
2818 if (AH->version >= K_VERS_1_16)
2819 te->relkind = ReadInt(AH);
2820
2821 te->owner = ReadStr(AH);
 /* "WITH OIDS" tables are no longer restorable; detect and warn. */
2822 is_supported = true;
2823 if (AH->version < K_VERS_1_9)
2824 is_supported = false;
2825 else
2826 {
2827 tmp = ReadStr(AH);
2828
2829 if (strcmp(tmp, "true") == 0)
2830 is_supported = false;
2831
2832 free(tmp);
2833 }
2834
2835 if (!is_supported)
2836 pg_log_warning("restoring tables WITH OIDS is not supported anymore")
2837
2838 /* Read TOC entry dependencies */
2839 if (AH->version >= K_VERS_1_5)
2840 {
 /* Start with room for 100 deps; double as needed, then shrink-fit. */
2841 depSize = 100;
 /* NOTE(review): extraction dropped the line allocating 'deps'
  * (original line 2842) -- verify against upstream. */
2843 depIdx = 0;
2844 for (;;)
2845 {
2846 tmp = ReadStr(AH);
2847 if (!tmp)
2848 break; /* end of list */
2849 if (depIdx >= depSize)
2850 {
2851 depSize *= 2;
2852 deps = pg_realloc_array(deps, DumpId, depSize);
2853 }
2854 sscanf(tmp, "%d", &deps[depIdx]);
2855 free(tmp);
2856 depIdx++;
2857 }
2858
2859 if (depIdx > 0) /* We have a non-null entry */
2860 {
2861 deps = pg_realloc_array(deps, DumpId, depIdx);
2862 te->dependencies = deps;
2863 te->nDeps = depIdx;
2864 }
2865 else
2866 {
2867 free(deps);
2868 te->dependencies = NULL;
2869 te->nDeps = 0;
2870 }
2871 }
2872 else
2873 {
2874 te->dependencies = NULL;
2875 te->nDeps = 0;
2876 }
2877 te->dataLength = 0;
2878
 /* Let the archive format read back its own per-entry data. */
2879 if (AH->ReadExtraTocPtr)
2880 AH->ReadExtraTocPtr(AH, te);
2881
2882 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2883 i, te->dumpId, te->desc, te->tag);
2884
2885 /* link completed entry into TOC circular list */
2886 te->prev = AH->toc->prev;
2887 AH->toc->prev->next = te;
2888 AH->toc->prev = te;
2889 te->next = AH->toc;
2890
2891 /* special processing immediately upon read for some items */
2892 if (strcmp(te->desc, "ENCODING") == 0)
2893 processEncodingEntry(AH, te);
2894 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2895 processStdStringsEntry(AH, te);
2896 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2897 processSearchPathEntry(AH, te);
2898 }
2899}
2900
2901static void
/* Parse the ENCODING TOC entry and record the archive's client encoding.
 * NOTE(review): extraction dropped the signature line (expected:
 * processEncodingEntry(ArchiveHandle *AH, TocEntry *te)) -- verify. */
2903{
2904 /* te->defn should have the form SET client_encoding = 'foo'; */
2905 char *defn = pg_strdup(te->defn);
2906 char *ptr1;
2907 char *ptr2 = NULL;
2908 int encoding;
2909
 /* Locate the quoted encoding name between the two single quotes. */
2910 ptr1 = strchr(defn, '\'');
2911 if (ptr1)
2912 ptr2 = strchr(++ptr1, '\'');
2913 if (ptr2)
2914 {
2915 *ptr2 = '\0';
 /* NOTE(review): extraction dropped the line converting the name to an
  * encoding ID (original line 2916, presumably via pg_char_to_encoding)
  * -- verify against upstream. */
2917 if (encoding < 0)
2918 pg_fatal("unrecognized encoding \"%s\"",
2919 ptr1);
2920 AH->public.encoding = encoding;
 /* NOTE(review): extraction dropped original line 2921 (a follow-up
  * call using 'encoding') -- verify against upstream. */
2922 }
2923 else
2924 pg_fatal("invalid ENCODING item: %s",
2925 te->defn);
2926
2927 free(defn);
2928}
2929
2930static void
/* Parse the STDSTRINGS TOC entry and record standard_conforming_strings.
 * NOTE(review): extraction dropped the signature line (expected:
 * processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)) -- verify. */
2932{
2933 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2934 char *ptr1;
2935
2936 ptr1 = strchr(te->defn, '\'');
2937 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2938 AH->public.std_strings = true;
2939 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2940 AH->public.std_strings = false;
2941 else
2942 pg_fatal("invalid STDSTRINGS item: %s",
2943 te->defn);
2944}
2945
2946static void
/* Save the SEARCHPATH TOC entry's SET command for later verbatim replay.
 * NOTE(review): extraction dropped the signature line (expected:
 * processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)) -- verify. */
2948{
2949 /*
2950 * te->defn should contain a command to set search_path. We just copy it
2951 * verbatim for use later.
2952 */
2953 AH->public.searchpath = pg_strdup(te->defn);
2954}
2955
2956static void
/* With --strict-names, fail if any user-specified schema/table/index/
 * function/trigger name matched nothing in the archive.
 * NOTE(review): extraction dropped the signature line (expected:
 * StrictNamesCheck(RestoreOptions *ropt)) and the five lines that assign
 * 'missing_name' from each name list -- verify against upstream. */
2958{
2959 const char *missing_name;
2960
2961 Assert(ropt->strict_names);
2962
2963 if (ropt->schemaNames.head != NULL)
2964 {
2966 if (missing_name != NULL)
2967 pg_fatal("schema \"%s\" not found", missing_name);
2968 }
2969
2970 if (ropt->tableNames.head != NULL)
2971 {
2973 if (missing_name != NULL)
2974 pg_fatal("table \"%s\" not found", missing_name);
2975 }
2976
2977 if (ropt->indexNames.head != NULL)
2978 {
2980 if (missing_name != NULL)
2981 pg_fatal("index \"%s\" not found", missing_name);
2982 }
2983
2984 if (ropt->functionNames.head != NULL)
2985 {
2987 if (missing_name != NULL)
2988 pg_fatal("function \"%s\" not found", missing_name);
2989 }
2990
2991 if (ropt->triggerNames.head != NULL)
2992 {
2994 if (missing_name != NULL)
2995 pg_fatal("trigger \"%s\" not found", missing_name);
2996 }
2997}
2998
2999/*
3000 * Determine whether we want to restore this TOC entry.
3001 *
3002 * Returns 0 if entry should be skipped, or some combination of the
3003 * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
3004 * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
3005 * special entry.
3006 */
3007static int
/* Decide which portions (schema/data/stats/special) of a TOC entry should be
 * restored, applying all selective dump/restore option filters in turn.
 * NOTE(review): extraction dropped the signature line (expected:
 * _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH))
 * -- verify against upstream. */
3009{
3010 int res = REQ_SCHEMA | REQ_DATA;
3011 RestoreOptions *ropt = AH->public.ropt;
3012
3013 /*
3014 * For binary upgrade mode, dump pg_largeobject_metadata and the
3015 * associated pg_shdepend rows. This is faster to restore than the
3016 * equivalent set of large object commands.
3017 */
 /* NOTE(review): extraction dropped the catalog-OID comparison lines
  * (original lines 3019-3020) from this condition -- verify. */
3018 if (ropt->binary_upgrade && strcmp(te->desc, "TABLE DATA") == 0 &&
3021 return REQ_DATA;
3022
3023 /* These items are treated specially */
3024 if (strcmp(te->desc, "ENCODING") == 0 ||
3025 strcmp(te->desc, "STDSTRINGS") == 0 ||
3026 strcmp(te->desc, "SEARCHPATH") == 0)
3027 return REQ_SPECIAL;
3028
3029 if ((strcmp(te->desc, "STATISTICS DATA") == 0) ||
3030 (strcmp(te->desc, "EXTENDED STATISTICS DATA") == 0))
3031 {
3032 if (!ropt->dumpStatistics)
3033 return 0;
3034
3035 res = REQ_STATS;
3036 }
3037
3038 /*
3039 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
3040 * restored in createDB mode, and not restored otherwise, independently of
3041 * all else.
3042 */
3043 if (strcmp(te->desc, "DATABASE") == 0 ||
3044 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
3045 {
3046 if (ropt->createDB)
3047 return REQ_SCHEMA;
3048 else
3049 return 0;
3050 }
3051
3052 /*
3053 * Global object TOC entries (e.g., ROLEs or TABLESPACEs) must not be
3054 * ignored.
3055 */
3056 if (strcmp(te->desc, "ROLE") == 0 ||
3057 strcmp(te->desc, "ROLE PROPERTIES") == 0 ||
3058 strcmp(te->desc, "TABLESPACE") == 0 ||
3059 strcmp(te->desc, "DROP_GLOBAL") == 0)
3060 return REQ_SCHEMA;
3061
3062 /*
3063 * Process exclusions that affect certain classes of TOC entries.
3064 */
3065
3066 /* If it's an ACL, maybe ignore it */
3067 if (ropt->aclsSkip && _tocEntryIsACL(te))
3068 return 0;
3069
3070 /* If it's a comment, maybe ignore it */
3071 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3072 return 0;
3073
3074 /* If it's a policy, maybe ignore it */
3075 if (ropt->no_policies &&
3076 (strcmp(te->desc, "POLICY") == 0 ||
3077 strcmp(te->desc, "ROW SECURITY") == 0))
3078 return 0;
3079
3080 /*
3081 * If it's a comment on a policy, a publication, or a subscription, maybe
3082 * ignore it.
3083 */
3084 if (strcmp(te->desc, "COMMENT") == 0)
3085 {
3086 if (ropt->no_policies &&
3087 strncmp(te->tag, "POLICY", strlen("POLICY")) == 0)
3088 return 0;
3089
3090 if (ropt->no_publications &&
3091 strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3092 return 0;
3093
3094 if (ropt->no_subscriptions &&
3095 strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3096 return 0;
3097
3098 /*
3099 * Comments on global objects (ROLEs or TABLESPACEs) should not be
3100 * skipped, since global objects themselves are never skipped.
3101 */
3102 if (strncmp(te->tag, "ROLE", strlen("ROLE")) == 0 ||
3103 strncmp(te->tag, "TABLESPACE", strlen("TABLESPACE")) == 0)
3104 return REQ_SCHEMA;
3105 }
3106
3107 /*
3108 * If it's a publication or a table part of a publication, maybe ignore
3109 * it.
3110 */
3111 if (ropt->no_publications &&
3112 (strcmp(te->desc, "PUBLICATION") == 0 ||
3113 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3114 strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3115 return 0;
3116
3117 /* If it's a security label, maybe ignore it */
3118 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3119 return 0;
3120
3121 /*
3122 * If it's a security label on a publication or a subscription, maybe
3123 * ignore it.
3124 */
3125 if (strcmp(te->desc, "SECURITY LABEL") == 0)
3126 {
3127 if (ropt->no_publications &&
3128 strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3129 return 0;
3130
3131 if (ropt->no_subscriptions &&
3132 strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3133 return 0;
3134
3135 /*
3136 * Security labels on global objects (ROLEs or TABLESPACEs) should not
3137 * be skipped, since global objects themselves are never skipped.
3138 */
3139 if (strncmp(te->tag, "ROLE", strlen("ROLE")) == 0 ||
3140 strncmp(te->tag, "TABLESPACE", strlen("TABLESPACE")) == 0)
3141 return REQ_SCHEMA;
3142 }
3143
3144 /* If it's a subscription, maybe ignore it */
3145 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3146 return 0;
3147
3148 /* Ignore it if section is not to be dumped/restored */
3149 switch (curSection)
3150 {
3151 case SECTION_PRE_DATA:
3152 if (!(ropt->dumpSections & DUMP_PRE_DATA))
3153 return 0;
3154 break;
3155 case SECTION_DATA:
3156 if (!(ropt->dumpSections & DUMP_DATA))
3157 return 0;
3158 break;
3159 case SECTION_POST_DATA:
3160 if (!(ropt->dumpSections & DUMP_POST_DATA))
3161 return 0;
3162 break;
3163 default:
3164 /* shouldn't get here, really, but ignore it */
3165 return 0;
3166 }
3167
3168 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3169 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3170 return 0;
3171
3172 /*
3173 * Check options for selective dump/restore.
3174 */
3175 if (strcmp(te->desc, "ACL") == 0 ||
3176 strcmp(te->desc, "COMMENT") == 0 ||
3177 strcmp(te->desc, "STATISTICS DATA") == 0 ||
3178 strcmp(te->desc, "SECURITY LABEL") == 0)
3179 {
3180 /* Database properties react to createDB, not selectivity options. */
3181 if (strncmp(te->tag, "DATABASE ", 9) == 0)
3182 {
3183 if (!ropt->createDB)
3184 return 0;
3185 }
3186 else if (ropt->schemaNames.head != NULL ||
3187 ropt->schemaExcludeNames.head != NULL ||
3188 ropt->selTypes)
3189 {
3190 /*
3191 * In a selective dump/restore, we want to restore these dependent
3192 * TOC entry types only if their parent object is being restored.
3193 * Without selectivity options, we let through everything in the
3194 * archive. Note there may be such entries with no parent, eg
3195 * non-default ACLs for built-in objects. Also, we make
3196 * per-column ACLs additionally depend on the table's ACL if any
3197 * to ensure correct restore order, so those dependencies should
3198 * be ignored in this check.
3199 *
3200 * This code depends on the parent having been marked already,
3201 * which should be the case; if it isn't, perhaps due to
3202 * SortTocFromFile rearrangement, skipping the dependent entry
3203 * seems prudent anyway.
3204 *
3205 * Ideally we'd handle, eg, table CHECK constraints this way too.
3206 * But it's hard to tell which of their dependencies is the one to
3207 * consult.
3208 */
3209 bool dumpthis = false;
3210
3211 for (int i = 0; i < te->nDeps; i++)
3212 {
 /* NOTE(review): extraction dropped the line looking up the parent
  * TOC entry 'pte' by dependency ID (original line 3213) --
  * verify against upstream. */
3214
3215 if (!pte)
3216 continue; /* probably shouldn't happen */
3217 if (strcmp(pte->desc, "ACL") == 0)
3218 continue; /* ignore dependency on another ACL */
3219 if (pte->reqs == 0)
3220 continue; /* this object isn't marked, so ignore it */
3221 /* Found a parent to be dumped, so we want to dump this too */
3222 dumpthis = true;
3223 break;
3224 }
3225 if (!dumpthis)
3226 return 0;
3227 }
3228 }
3229 else
3230 {
3231 /* Apply selective-restore rules for standalone TOC entries. */
3232 if (ropt->schemaNames.head != NULL)
3233 {
3234 /* If no namespace is specified, it means all. */
3235 if (!te->namespace)
3236 return 0;
3237 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3238 return 0;
3239 }
3240
3241 if (ropt->schemaExcludeNames.head != NULL &&
3242 te->namespace &&
3243 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3244 return 0;
3245
3246 if (ropt->selTypes)
3247 {
 /* NOTE(review): extraction dropped the simple_string_list_member
  * membership-test lines (original lines 3260, 3268, 3278, 3286)
  * inside the four branches below -- verify against upstream. */
3248 if (strcmp(te->desc, "TABLE") == 0 ||
3249 strcmp(te->desc, "TABLE DATA") == 0 ||
3250 strcmp(te->desc, "VIEW") == 0 ||
3251 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3252 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3253 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3254 strcmp(te->desc, "SEQUENCE") == 0 ||
3255 strcmp(te->desc, "SEQUENCE SET") == 0)
3256 {
3257 if (!ropt->selTable)
3258 return 0;
3259 if (ropt->tableNames.head != NULL &&
3261 return 0;
3262 }
3263 else if (strcmp(te->desc, "INDEX") == 0)
3264 {
3265 if (!ropt->selIndex)
3266 return 0;
3267 if (ropt->indexNames.head != NULL &&
3269 return 0;
3270 }
3271 else if (strcmp(te->desc, "FUNCTION") == 0 ||
3272 strcmp(te->desc, "AGGREGATE") == 0 ||
3273 strcmp(te->desc, "PROCEDURE") == 0)
3274 {
3275 if (!ropt->selFunction)
3276 return 0;
3277 if (ropt->functionNames.head != NULL &&
3279 return 0;
3280 }
3281 else if (strcmp(te->desc, "TRIGGER") == 0)
3282 {
3283 if (!ropt->selTrigger)
3284 return 0;
3285 if (ropt->triggerNames.head != NULL &&
3287 return 0;
3288 }
3289 else
3290 return 0;
3291 }
3292 }
3293
3294
3295 /*
3296 * Determine whether the TOC entry contains schema and/or data components,
3297 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3298 * it's both schema and data. Otherwise it's probably schema-only, but
3299 * there are exceptions.
3300 */
3301 if (!te->hadDumper)
3302 {
3303 /*
3304 * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3305 * is considered a data entry. We don't need to check for BLOBS or
3306 * old-style BLOB COMMENTS entries, because they will have hadDumper =
3307 * true ... but we do need to check new-style BLOB ACLs, comments,
3308 * etc.
3309 */
3310 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3311 strcmp(te->desc, "BLOB") == 0 ||
3312 strcmp(te->desc, "BLOB METADATA") == 0 ||
3313 (strcmp(te->desc, "ACL") == 0 &&
3314 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3315 (strcmp(te->desc, "COMMENT") == 0 &&
3316 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3317 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3318 strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3319 res = res & REQ_DATA;
3320 else
3321 res = res & ~REQ_DATA;
3322 }
3323
3324 /*
3325 * If there's no definition command, there's no schema component. Treat
3326 * "load via partition root" comments as not schema.
3327 */
3328 if (!te->defn || !te->defn[0] ||
3329 strncmp(te->defn, "-- load via partition root ", 27) == 0)
3330 res = res & ~REQ_SCHEMA;
3331
3332 /*
3333 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3334 * always ignore it.
3335 */
3336 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3337 return 0;
3338
3339 /* Mask it if we don't want data */
3340 if (!ropt->dumpData)
3341 {
3342 /*
3343 * The sequence_data option overrides dumpData for SEQUENCE SET.
3344 *
3345 * In binary-upgrade mode, even with dumpData unset, we do not mask
3346 * out large objects. (Only large object definitions, comments and
3347 * other metadata should be generated in binary-upgrade mode, not the
3348 * actual data, but that need not concern us here.)
3349 */
3350 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3351 !(ropt->binary_upgrade &&
3352 (strcmp(te->desc, "BLOB") == 0 ||
3353 strcmp(te->desc, "BLOB METADATA") == 0 ||
3354 (strcmp(te->desc, "ACL") == 0 &&
3355 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3356 (strcmp(te->desc, "COMMENT") == 0 &&
3357 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3358 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3359 strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3360 res = res & (REQ_SCHEMA | REQ_STATS);
3361 }
3362
3363 /* Mask it if we don't want schema */
3364 if (!ropt->dumpSchema)
3365 res = res & (REQ_DATA | REQ_STATS);
3366
3367 return res;
3368}
3369
3370/*
3371 * Identify which pass we should restore this TOC entry in.
3372 *
3373 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3374 */
3375static RestorePass
/* Classify a TOC entry into a restore pass (MAIN, ACL, or POST_ACL).
 * NOTE(review): extraction dropped the signature line (expected:
 * _tocEntryRestorePass(TocEntry *te)) -- verify against upstream. */
3377{
3378 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3379 if (strcmp(te->desc, "ACL") == 0 ||
3380 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3381 strcmp(te->desc, "DEFAULT ACL") == 0)
3382 return RESTORE_PASS_ACL;
3383 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3384 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3385 return RESTORE_PASS_POST_ACL;
3386
3387 /*
3388 * Comments and security labels need to be emitted in the same pass as
3389 * their parent objects. ACLs haven't got comments and security labels,
3390 * and neither do matview data objects, but event triggers do.
3391 * (Fortunately, event triggers haven't got ACLs, or we'd need yet another
3392 * weird special case.)
3393 */
3394 if ((strcmp(te->desc, "COMMENT") == 0 ||
3395 strcmp(te->desc, "SECURITY LABEL") == 0) &&
3396 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3397 return RESTORE_PASS_POST_ACL;
3398
3399 /*
3400 * If statistics data is dependent on materialized view data, it must be
3401 * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3402 * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3403 * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3404 * code in fetchAttributeStats() assumes that we dump all statistics data
3405 * entries in TOC order. To ensure this assumption holds, we move all
3406 * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3407 */
 /* NOTE(review): extraction dropped the second half of this condition
  * (original line 3409, presumably te->section == SECTION_POST_DATA) --
  * verify against upstream. */
3408 if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
3410 return RESTORE_PASS_POST_ACL;
3411
3412 /* All else can be handled in the main pass. */
3413 return RESTORE_PASS_MAIN;
3414}
3415
3416/*
3417 * Identify TOC entries that are ACLs.
3418 *
3419 * Note: it seems worth duplicating some code here to avoid a hard-wired
3420 * assumption that these are exactly the same entries that we restore during
3421 * the RESTORE_PASS_ACL phase.
3422 */
3423static bool
/* Return true if the TOC entry is an ACL of any flavor.
 * NOTE(review): extraction dropped the signature line (expected:
 * _tocEntryIsACL(TocEntry *te)) -- verify against upstream. */
3425{
3426 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3427 if (strcmp(te->desc, "ACL") == 0 ||
3428 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3429 strcmp(te->desc, "DEFAULT ACL") == 0)
3430 return true;
3431 return false;
3432}
3433
3434/*
3435 * Issue SET commands for parameters that we want to have set the same way
3436 * at all times during execution of a restore script.
3437 */
3438static void
/* Emit the SET commands establishing the fixed session state for a restore
 * script (timeouts off, encoding, string syntax, role, search_path, etc.).
 * NOTE(review): extraction dropped the signature line (expected:
 * _doSetFixedOutputState(ArchiveHandle *AH)) -- verify against upstream. */
3440{
3441 RestoreOptions *ropt = AH->public.ropt;
3442
3443 /*
3444 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3445 */
3446 ahprintf(AH, "SET statement_timeout = 0;\n");
3447 ahprintf(AH, "SET lock_timeout = 0;\n");
3448 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3449 ahprintf(AH, "SET transaction_timeout = 0;\n");
3450
3451 /* Select the correct character set encoding */
3452 ahprintf(AH, "SET client_encoding = '%s';\n",
 /* NOTE(review): extraction dropped the argument line (original line 3453,
  * presumably the encoding name for AH->public.encoding) -- verify. */
3454
3455 /* Select the correct string literal syntax */
3456 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3457 AH->public.std_strings ? "on" : "off");
3458
3459 /* Select the role to be used during restore */
3460 if (ropt && ropt->use_role)
3461 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3462
3463 /* Select the dump-time search_path */
3464 if (AH->public.searchpath)
3465 ahprintf(AH, "%s", AH->public.searchpath);
3466
3467 /* Make sure function checking is disabled */
3468 ahprintf(AH, "SET check_function_bodies = false;\n");
3469
3470 /* Ensure that all valid XML data will be accepted */
3471 ahprintf(AH, "SET xmloption = content;\n");
3472
3473 /* Avoid annoying notices etc */
3474 ahprintf(AH, "SET client_min_messages = warning;\n");
3475
3476 /* Adjust row-security state */
3477 if (ropt && ropt->enable_row_security)
3478 ahprintf(AH, "SET row_security = on;\n");
3479 else
3480 ahprintf(AH, "SET row_security = off;\n");
3481
3482 /*
3483 * In --transaction-size mode, we should always be in a transaction when
3484 * we begin to restore objects.
3485 */
3486 if (ropt && ropt->txn_size > 0)
3487 {
 /* NOTE(review): extraction dropped the direct-to-DB branch body
  * (original line 3489, presumably issuing BEGIN via the connection)
  * -- verify against upstream. */
3488 if (AH->connection)
3490 else
3491 ahprintf(AH, "\nBEGIN;\n");
3492 AH->txnCount = 0;
3493 }
3494
3495 ahprintf(AH, "\n");
3496}
3497
3498/*
3499 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3500 * for updating state if appropriate. If user is NULL or an empty string,
3501 * the specification DEFAULT will be used.
3502 */
3503static void
/* Issue SET SESSION AUTHORIZATION for 'user' (or DEFAULT if NULL/empty),
 * either directly on the live connection or into the script output.
 * NOTE(review): extraction dropped the signature line (original 3504) and
 * the line creating the 'cmd' PQExpBuffer (original 3506) -- verify. */
3505{
3507
3508 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3509
3510 /*
3511 * SQL requires a string literal here. Might as well be correct.
3512 */
3513 if (user && *user)
3514 appendStringLiteralAHX(cmd, user, AH);
3515 else
3516 appendPQExpBufferStr(cmd, "DEFAULT");
3517 appendPQExpBufferChar(cmd, ';');
3518
3519 if (RestoringToDB(AH))
3520 {
3521 PGresult *res;
3522
3523 res = PQexec(AH->connection, cmd->data);
3524
3525 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3526 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3527 pg_fatal("could not set session user to \"%s\": %s",
 /* NOTE(review): extraction dropped the argument line (original 3528,
  * presumably user plus the connection error message) -- verify. */
3529
3530 PQclear(res);
3531 }
3532 else
3533 ahprintf(AH, "%s\n\n", cmd->data);
3534
3535 destroyPQExpBuffer(cmd);
3536}
3537
3538
3539/*
3540 * Issue the commands to connect to the specified database.
3541 *
3542 * If we're currently restoring right into a database, this will
3543 * actually establish a connection. Otherwise it puts a \connect into
3544 * the script output.
3545 */
3546static void
/* Connect (or emit \connect) to the named database, then reset all cached
 * session state (user, schema, table AM, tablespace) and re-establish the
 * fixed output state.
 * NOTE(review): extraction dropped the signature line (original 3547) and
 * several lines in both branches (original 3550, 3553, 3563-3566) --
 * verify against upstream. */
3548{
3549 if (RestoringToDB(AH))
3551 else
3552 {
3554 RestoreOptions *ropt = AH->public.ropt;
3555
3556 /*
3557 * We must temporarily exit restricted mode for \connect, etc.
3558 * Anything added between this line and the following \restrict must
3559 * be careful to avoid any possible meta-command injection vectors.
3560 */
3561 ahprintf(AH, "\\unrestrict %s\n", ropt->restrict_key);
3562
3565 ahprintf(AH, "%s", connectbuf.data);
3567
3568 ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
3569 }
3570
3571 /*
3572 * NOTE: currUser keeps track of what the imaginary session user in our
3573 * script is. It's now effectively reset to the original userID.
3574 */
3575 free(AH->currUser);
3576 AH->currUser = NULL;
3577
3578 /* don't assume we still know the output schema, tablespace, etc either */
3579 free(AH->currSchema);
3580 AH->currSchema = NULL;
3581
3582 free(AH->currTableAm);
3583 AH->currTableAm = NULL;
3584
3585 free(AH->currTablespace);
3586 AH->currTablespace = NULL;
3587
3588 /* re-establish fixed state */
 /* NOTE(review): extraction dropped the call line here (original 3589,
  * presumably _doSetFixedOutputState(AH)) -- verify against upstream. */
3590}
3591
3592/*
3593 * Become the specified user, and update state to avoid redundant commands
3594 *
3595 * NULL or empty argument is taken to mean restoring the session default
3596 */
3597static void
/* Switch the effective session user, skipping redundant changes; NULL or
 * empty means the session default.
 * NOTE(review): extraction dropped the signature line (original 3598,
 * expected: _becomeUser(ArchiveHandle *AH, const char *user)) and the call
 * line (original 3606, presumably _doSetSessionAuth(AH, user)) -- verify. */
3599{
3600 if (!user)
3601 user = ""; /* avoid null pointers */
3602
3603 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3604 return; /* no need to do anything */
3605
3607
3608 /*
3609 * NOTE: currUser keeps track of what the imaginary session user in our
3610 * script is
3611 */
3612 free(AH->currUser);
3613 AH->currUser = pg_strdup(user);
3614}
3615
3616/*
3617 * Become the owner of the given TOC entry object. If
3618 * changes in ownership are not allowed, this doesn't do anything.
3619 */
3620static void
/* Become the owner of the TOC entry's object, unless ownership switching is
 * disabled (--no-owner) or not done via SET SESSION AUTHORIZATION.
 * NOTE(review): extraction dropped the signature line (expected:
 * _becomeOwner(ArchiveHandle *AH, TocEntry *te)) -- verify. */
3622{
3623 RestoreOptions *ropt = AH->public.ropt;
3624
3625 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3626 return;
3627
3628 _becomeUser(AH, te->owner);
3629}
3630
3631
3632/*
3633 * Issue the commands to select the specified schema as the current schema
3634 * in the target database.
3635 */
3636static void
/* Make the given schema current via SET search_path, caching the current
 * value in AH->currSchema to avoid redundant commands.
 * NOTE(review): extraction dropped the signature line (original 3637,
 * expected: _selectOutputSchema(ArchiveHandle *AH, const char *schemaName))
 * -- verify against upstream. */
3638{
3639 PQExpBuffer qry;
3640
3641 /*
3642 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3643 * that search_path rather than switching to entry-specific paths.
3644 * Otherwise, it's an old archive that will not restore correctly unless
3645 * we set the search_path as it's expecting.
3646 */
3647 if (AH->public.searchpath)
3648 return;
3649
3650 if (!schemaName || *schemaName == '\0' ||
3651 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3652 return; /* no need to do anything */
3653
3654 qry = createPQExpBuffer();
3655
3656 appendPQExpBuffer(qry, "SET search_path = %s",
3657 fmtId(schemaName));
 /* Keep pg_catalog visible unless it is itself the target schema. */
3658 if (strcmp(schemaName, "pg_catalog") != 0)
3659 appendPQExpBufferStr(qry, ", pg_catalog");
3660
3661 if (RestoringToDB(AH))
3662 {
3663 PGresult *res;
3664
3665 res = PQexec(AH->connection, qry->data);
3666
3667 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 /* NOTE(review): extraction dropped the warn call and its trailing
  * arguments (original lines 3668/3670) -- verify against upstream. */
3669 "could not set \"search_path\" to \"%s\": %s",
3671
3672 PQclear(res);
3673 }
3674 else
3675 ahprintf(AH, "%s;\n\n", qry->data);
3676
3677 free(AH->currSchema);
 /* NOTE(review): extraction dropped the line updating AH->currSchema
  * (original 3678, presumably pg_strdup(schemaName)) -- verify. */
3679
3680 destroyPQExpBuffer(qry);
3681}
3682
3683/*
3684 * Issue the commands to select the specified tablespace as the current one
3685 * in the target database.
3686 */
3687static void
/* Make the given tablespace current via SET default_tablespace, caching the
 * current value in AH->currTablespace to avoid redundant commands.
 * NOTE(review): extraction dropped the signature line (original 3688,
 * expected: _selectTablespace(ArchiveHandle *AH, const char *tablespace))
 * -- verify against upstream. */
3689{
3690 RestoreOptions *ropt = AH->public.ropt;
3691 PQExpBuffer qry;
3692 const char *want,
3693 *have;
3694
3695 /* do nothing in --no-tablespaces mode */
3696 if (ropt->noTablespace)
3697 return;
3698
3699 have = AH->currTablespace;
3700 want = tablespace;
3701
3702 /* no need to do anything for non-tablespace object */
3703 if (!want)
3704 return;
3705
3706 if (have && strcmp(want, have) == 0)
3707 return; /* no need to do anything */
3708
3709 qry = createPQExpBuffer();
3710
3711 if (strcmp(want, "") == 0)
3712 {
3713 /* We want the tablespace to be the database's default */
3714 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3715 }
3716 else
3717 {
3718 /* We want an explicit tablespace */
3719 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3720 }
3721
3722 if (RestoringToDB(AH))
3723 {
3724 PGresult *res;
3725
3726 res = PQexec(AH->connection, qry->data);
3727
3728 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 /* NOTE(review): extraction dropped the warn call and its trailing
  * arguments (original lines 3729/3731) -- verify against upstream. */
3730 "could not set \"default_tablespace\" to %s: %s",
3732
3733 PQclear(res);
3734 }
3735 else
3736 ahprintf(AH, "%s;\n\n", qry->data);
3737
3738 free(AH->currTablespace);
 /* NOTE(review): extraction dropped the line updating AH->currTablespace
  * (original 3739, presumably pg_strdup(want)) -- verify. */
3740
3741 destroyPQExpBuffer(qry);
3742}
3743
3744/*
3745 * Set the proper default_table_access_method value for the table.
3746 */
3747static void
/* Set default_table_access_method for the table, caching the current value
 * in AH->currTableAm to avoid redundant commands.
 * NOTE(review): extraction dropped the signature line (original 3748,
 * expected: _setTableAccessMethod(ArchiveHandle *AH, const char *tableam))
 * -- verify against upstream. */
3749{
3750 RestoreOptions *ropt = AH->public.ropt;
3751 PQExpBuffer cmd;
3752 const char *want,
3753 *have;
3754
3755 /* do nothing in --no-table-access-method mode */
3756 if (ropt->noTableAm)
3757 return;
3758
3759 have = AH->currTableAm;
3760 want = tableam;
3761
3762 if (!want)
3763 return;
3764
3765 if (have && strcmp(want, have) == 0)
3766 return;
3767
3768 cmd = createPQExpBuffer();
3769 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3770
3771 if (RestoringToDB(AH))
3772 {
3773 PGresult *res;
3774
3775 res = PQexec(AH->connection, cmd->data);
3776
3777 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 /* NOTE(review): extraction dropped the warn call and its trailing
  * arguments (original lines 3778/3780) -- verify against upstream. */
3779 "could not set \"default_table_access_method\": %s",
3781
3782 PQclear(res);
3783 }
3784 else
3785 ahprintf(AH, "%s\n\n", cmd->data);
3786
3787 destroyPQExpBuffer(cmd);
3788
3789 free(AH->currTableAm);
3790 AH->currTableAm = pg_strdup(want);
3791}
3792
3793/*
3794 * Set the proper default table access method for a table without storage.
3795 * Currently, this is required only for partitioned tables with a table AM.
3796 */
3797static void
/* Issue ALTER TABLE ... SET ACCESS METHOD for a storage-less table (i.e. a
 * partitioned table with a table AM), either on the live connection or into
 * the script output.
 * NOTE(review): extraction dropped the signature line (original 3798,
 * expected: _printTableAccessMethodNoStorage(ArchiveHandle *AH,
 * TocEntry *te)) and an assertion/check at original line 3811 -- verify. */
3799{
3800 RestoreOptions *ropt = AH->public.ropt;
3801 const char *tableam = te->tableam;
3802 PQExpBuffer cmd;
3803
3804 /* do nothing in --no-table-access-method mode */
3805 if (ropt->noTableAm)
3806 return;
3807
3808 if (!tableam)
3809 return;
3810
3812
3813 cmd = createPQExpBuffer();
3814
3815 appendPQExpBufferStr(cmd, "ALTER TABLE ");
3816 appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3817 appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3818 fmtId(tableam));
3819
3820 if (RestoringToDB(AH))
3821 {
3822 PGresult *res;
3823
3824 res = PQexec(AH->connection, cmd->data);
3825
3826 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 /* NOTE(review): extraction dropped the warn call and its trailing
  * arguments (original lines 3827/3829) -- verify against upstream. */
3828 "could not alter table access method: %s",
3830 PQclear(res);
3831 }
3832 else
3833 ahprintf(AH, "%s\n\n", cmd->data);
3834
3835 destroyPQExpBuffer(cmd);
3836}
3837
3838/*
 3839 * Extract an object description for a TOC entry, and append it to buf.
 3840 *
 3841 * This is used for ALTER ... OWNER TO.
 3842 *
 3843 * If the object type has no owner, do nothing.
 *
 * buf: output buffer the description ("TYPE [schema.]name") is appended to.
 * te:  TOC entry whose desc/tag/namespace/dropStmt drive the output.
 3844 */
3845static void
 3847{
 3848 const char *type = te->desc;
 3849
 3850 /* objects that don't require special decoration */
 3851 if (strcmp(type, "COLLATION") == 0 ||
 3852 strcmp(type, "CONVERSION") == 0 ||
 3853 strcmp(type, "DOMAIN") == 0 ||
 3854 strcmp(type, "FOREIGN TABLE") == 0 ||
 3855 strcmp(type, "MATERIALIZED VIEW") == 0 ||
 3856 strcmp(type, "SEQUENCE") == 0 ||
 3857 strcmp(type, "STATISTICS") == 0 ||
 3858 strcmp(type, "TABLE") == 0 ||
 3859 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
 3860 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
 3861 strcmp(type, "TYPE") == 0 ||
 3862 strcmp(type, "VIEW") == 0 ||
 3863 /* non-schema-specified objects */
 3864 strcmp(type, "DATABASE") == 0 ||
 3865 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
 3866 strcmp(type, "SCHEMA") == 0 ||
 3867 strcmp(type, "EVENT TRIGGER") == 0 ||
 3868 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
 3869 strcmp(type, "SERVER") == 0 ||
 3870 strcmp(type, "PUBLICATION") == 0 ||
 3871 strcmp(type, "SUBSCRIPTION") == 0)
 3872 {
 /* "TYPE " then optional "schema." qualifier, then the object name */
 3873 appendPQExpBuffer(buf, "%s ", type);
 3874 if (te->namespace && *te->namespace)
 3875 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
 3877 }
 3878 /* LOs just have a name, but it's numeric so must not use fmtId */
 3879 else if (strcmp(type, "BLOB") == 0)
 3880 {
 3881 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
 3882 }
 3883
 3884 /*
 3885 * These object types require additional decoration. Fortunately, the
 3886 * information needed is exactly what's in the DROP command.
 3887 */
 3888 else if (strcmp(type, "AGGREGATE") == 0 ||
 3889 strcmp(type, "FUNCTION") == 0 ||
 3890 strcmp(type, "OPERATOR") == 0 ||
 3891 strcmp(type, "OPERATOR CLASS") == 0 ||
 3892 strcmp(type, "OPERATOR FAMILY") == 0 ||
 3893 strcmp(type, "PROCEDURE") == 0)
 3894 {
 3895 /* Chop "DROP " off the front and make a modifiable copy */
 3896 char *first = pg_strdup(te->dropStmt + 5);
 3897 char *last;
 3898
 3899 /* point to last character in string */
 3900 last = first + strlen(first) - 1;
 3901
 3902 /* Strip off any ';' or '\n' at the end */
 3903 while (last >= first && (*last == '\n' || *last == ';'))
 3904 last--;
 3905 *(last + 1) = '\0';
 3906
 3907 appendPQExpBufferStr(buf, first);
 3908
 3909 free(first);
 3910 return;
 3911 }
 3912 /* these object types don't have separate owners */
 3913 else if (strcmp(type, "CAST") == 0 ||
 3914 strcmp(type, "CHECK CONSTRAINT") == 0 ||
 3915 strcmp(type, "CONSTRAINT") == 0 ||
 3916 strcmp(type, "DROP_GLOBAL") == 0 ||
 3917 strcmp(type, "ROLE PROPERTIES") == 0 ||
 3918 strcmp(type, "ROLE") == 0 ||
 3919 strcmp(type, "DATABASE PROPERTIES") == 0 ||
 3920 strcmp(type, "DEFAULT") == 0 ||
 3921 strcmp(type, "FK CONSTRAINT") == 0 ||
 3922 strcmp(type, "INDEX") == 0 ||
 3923 strcmp(type, "RULE") == 0 ||
 3924 strcmp(type, "TRIGGER") == 0 ||
 3925 strcmp(type, "ROW SECURITY") == 0 ||
 3926 strcmp(type, "POLICY") == 0 ||
 3927 strcmp(type, "USER MAPPING") == 0)
 3928 {
 /* leave buf empty; caller treats that as "no owner to set" */
 3929 /* do nothing */
 3930 }
 3931 else
 3932 pg_fatal("don't know how to set owner for object type \"%s\"", type);
 3933}
3934
3935/*
 3936 * Emit the SQL commands to create the object represented by a TOC entry
 3937 *
 3938 * This now also includes issuing an ALTER OWNER command to restore the
 3939 * object's ownership, if wanted. But note that the object's permissions
 3940 * will remain at default, until the matching ACL TOC entry is restored.
 3941 */
3942static void
 3944{
 3945 RestoreOptions *ropt = AH->public.ropt;
 3946
 3947 /*
 3948 * Select owner, schema, tablespace and default AM as necessary. The
 3949 * default access method for partitioned tables is handled after
 3950 * generating the object definition, as it requires an ALTER command
 3951 * rather than SET.
 3952 */
 3953 _becomeOwner(AH, te);
 3954 _selectOutputSchema(AH, te->namespace);
 3958
 3959 /* Emit header comment for item */
 3960 if (!AH->noTocComments)
 3961 {
 3962 char *sanitized_name;
 3963 char *sanitized_schema;
 3964 char *sanitized_owner;
 3965
 3966 ahprintf(AH, "--\n");
 3967 if (AH->public.verbose)
 3968 {
 3969 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
 3970 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
 3971 if (te->nDeps > 0)
 3972 {
 3973 int i;
 3974
 3975 ahprintf(AH, "-- Dependencies:");
 3976 for (i = 0; i < te->nDeps; i++)
 3977 ahprintf(AH, " %d", te->dependencies[i]);
 3978 ahprintf(AH, "\n");
 3979 }
 3980 }
 3981
 /* sanitize_line() protects against newlines embedded in identifiers */
 3982 sanitized_name = sanitize_line(te->tag, false);
 3983 sanitized_schema = sanitize_line(te->namespace, true);
 3984 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
 3985
 3986 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
 3989
 3993
 3994 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
 3995 {
 3997
 3999 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
 4001 }
 4002 ahprintf(AH, "\n");
 4003
 /* Format-specific extra TOC commentary, if the format provides it */
 4004 if (AH->PrintExtraTocPtr != NULL)
 4005 AH->PrintExtraTocPtr(AH, te);
 4006 ahprintf(AH, "--\n\n");
 4007 }
 4008
 4009 /*
 4010 * Actually print the definition. Normally we can just print the defn
 4011 * string if any, but we have four special cases:
 4012 *
 4013 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
 4014 * versions put into CREATE SCHEMA. Don't mutate the variant for schema
 4015 * "public" that is a comment. We have to do this when --no-owner mode is
 4016 * selected. This is ugly, but I see no other good way ...
 4017 *
 4018 * 2. BLOB METADATA entries need special processing since their defn
 4019 * strings are just lists of OIDs, not complete SQL commands.
 4020 *
 4021 * 3. ACL LARGE OBJECTS entries need special processing because they
 4022 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
 4023 * apply to each large object listed in the associated BLOB METADATA.
 4024 *
 4025 * 4. Entries with a defnDumper need to call it to generate the
 4026 * definition. This is primarily intended to provide a way to save memory
 4027 * for objects that would otherwise need a lot of it (e.g., statistics
 4028 * data).
 4029 */
 4030 if (ropt->noOwner &&
 4031 strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
 4032 {
 4033 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
 4034 }
 4035 else if (strcmp(te->desc, "BLOB METADATA") == 0)
 4036 {
 4037 IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
 4038 }
 4039 else if (strcmp(te->desc, "ACL") == 0 &&
 4040 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
 4041 {
 4042 IssueACLPerBlob(AH, te);
 4043 }
 4044 else if (te->defnLen && AH->format != archTar)
 4045 {
 4046 /*
 4047 * If defnLen is set, the defnDumper has already been called for this
 4048 * TOC entry. We don't normally expect a defnDumper to be called for
 4049 * a TOC entry a second time in _printTocEntry(), but there's an
 4050 * exception. The tar format first calls WriteToc(), which scans the
 4051 * entire TOC, and then it later calls RestoreArchive() to generate
 4052 * restore.sql, which scans the TOC again. There doesn't appear to be
 4053 * a good way to prevent a second defnDumper call in this case without
 4054 * storing the definition in memory, which defeats the purpose. This
 4055 * second defnDumper invocation should generate the same output as the
 4056 * first, but even if it doesn't, the worst-case scenario is that
 4057 * restore.sql might have different statistics data than the archive.
 4058 *
 4059 * In all other cases, encountering a TOC entry a second time in
 4060 * _printTocEntry() is unexpected, so we fail because one of our
 4061 * assumptions must no longer hold true.
 4062 *
 4063 * XXX This is a layering violation, but the alternative is an awkward
 4064 * and complicated callback infrastructure for this special case. This
 4065 * might be worth revisiting in the future.
 4066 */
 4067 pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
 4068 te->dumpId, te->desc, te->tag);
 4069 }
 4070 else if (te->defnDumper)
 4071 {
 /* Generate the definition on demand; record its length in defnLen */
 4072 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
 4073
 4074 te->defnLen = ahprintf(AH, "%s\n\n", defn);
 4075 pg_free(defn);
 4076 }
 4077 else if (te->defn && strlen(te->defn) > 0)
 4078 {
 4079 ahprintf(AH, "%s\n\n", te->defn);
 4080
 4081 /*
 4082 * If the defn string contains multiple SQL commands, txn_size mode
 4083 * should count it as N actions not one. But rather than build a full
 4084 * SQL parser, approximate this by counting semicolons. One case
 4085 * where that tends to be badly fooled is function definitions, so
 4086 * ignore them. (restore_toc_entry will count one action anyway.)
 4087 */
 4088 if (ropt->txn_size > 0 &&
 4089 strcmp(te->desc, "FUNCTION") != 0 &&
 4090 strcmp(te->desc, "PROCEDURE") != 0)
 4091 {
 4092 const char *p = te->defn;
 4093 int nsemis = 0;
 4094
 4095 while ((p = strchr(p, ';')) != NULL)
 4096 {
 4097 nsemis++;
 4098 p++;
 4099 }
 4100 if (nsemis > 1)
 4101 AH->txnCount += nsemis - 1;
 4102 }
 4103 }
 4104
 4105 /*
 4106 * If we aren't using SET SESSION AUTH to determine ownership, we must
 4107 * instead issue an ALTER OWNER command. Schema "public" is special; when
 4108 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
 4109 * when using SET SESSION for all other objects. We assume that anything
 4110 * without a DROP command is not a separately ownable object.
 4111 */
 4112 if (!ropt->noOwner &&
 4113 (!ropt->use_setsessauth ||
 4114 (strcmp(te->desc, "SCHEMA") == 0 &&
 4115 strncmp(te->defn, "--", 2) == 0)) &&
 4116 te->owner && strlen(te->owner) > 0 &&
 4117 te->dropStmt && strlen(te->dropStmt) > 0)
 4118 {
 4119 if (strcmp(te->desc, "BLOB METADATA") == 0)
 4120 {
 4121 /* BLOB METADATA needs special code to handle multiple LOs */
 4122 char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
 4123
 4124 IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
 4125 pg_free(cmdEnd);
 4126 }
 4127 else
 4128 {
 4129 /* For all other cases, we can use _getObjectDescription */
 4131
 4134
 4135 /*
 4136 * If _getObjectDescription() didn't fill the buffer, then there
 4137 * is no owner.
 4138 */
 4139 if (temp.data[0])
 4140 ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
 4141 temp.data, fmtId(te->owner));
 4143 }
 4144 }
 4145
 4146 /*
 4147 * Select a partitioned table's default AM, once the table definition has
 4148 * been generated.
 4149 */
 4152
 4153 /*
 4154 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
 4155 * commands, so we can no longer assume we know the current auth setting.
 4156 */
 4157 if (_tocEntryIsACL(te))
 4158 {
 4159 free(AH->currUser);
 4160 AH->currUser = NULL;
 4161 }
 4162}
4163
4164/*
 4165 * Write the file header for a custom-format archive
 *
 * Layout: magic "PGDMP", then one byte each for major/minor/rev version,
 * integer size, offset size and format code, followed by the creation
 * timestamp (tm fields as ints), the source database name, and the server
 * version string.  Must stay in sync with ReadHead().
 4166 */
4167void
 4169{
 4170 struct tm crtm;
 4171
 4172 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
 4173 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
 4174 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
 4175 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
 4176 AH->WriteBytePtr(AH, AH->intSize);
 4177 AH->WriteBytePtr(AH, AH->offSize);
 4178 AH->WriteBytePtr(AH, AH->format);
 /* Creation time, written field by field in struct tm layout */
 4180 crtm = *localtime(&AH->createDate);
 4181 WriteInt(AH, crtm.tm_sec);
 4182 WriteInt(AH, crtm.tm_min);
 4183 WriteInt(AH, crtm.tm_hour);
 4184 WriteInt(AH, crtm.tm_mday);
 4185 WriteInt(AH, crtm.tm_mon);
 4186 WriteInt(AH, crtm.tm_year);
 4187 WriteInt(AH, crtm.tm_isdst);
 4188 WriteStr(AH, PQdb(AH->connection));
 4190 WriteStr(AH, PG_VERSION);
 4191}
4192
/*
 * Read and validate the header of an archive file, filling in the
 * version, integer/offset sizes, format, compression settings, creation
 * date and origin-database fields of AH.  Counterpart of WriteHead();
 * also tolerates pre-1.x header layouts via version checks.
 */
4193void
 4195{
 4196 char *errmsg;
 4197 char vmaj,
 4198 vmin,
 4199 vrev;
 4200 int fmt;
 4201
 4202 /*
 4203 * If we haven't already read the header, do so.
 4204 *
 4205 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
 4206 * way to unify the cases?
 4207 */
 4208 if (!AH->readHeader)
 4209 {
 4210 char tmpMag[7];
 4211
 4212 AH->ReadBufPtr(AH, tmpMag, 5);
 4213
 4214 if (strncmp(tmpMag, "PGDMP", 5) != 0)
 4215 pg_fatal("did not find magic string in file header");
 4216 }
 4217
 4218 vmaj = AH->ReadBytePtr(AH);
 4219 vmin = AH->ReadBytePtr(AH);
 4220
 /* The revision byte only exists in archives newer than version 1.0 */
 4221 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
 4222 vrev = AH->ReadBytePtr(AH);
 4223 else
 4224 vrev = 0;
 4225
 4227
 4228 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
 4229 pg_fatal("unsupported version (%d.%d) in file header",
 4230 vmaj, vmin);
 4231
 4232 AH->intSize = AH->ReadBytePtr(AH);
 4233 if (AH->intSize > 32)
 4234 pg_fatal("sanity check on integer size (%zu) failed", AH->intSize);
 4235
 4236 if (AH->intSize > sizeof(int))
 4237 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
 4238
 /* Before 1.7 there was no separate offset size; it equaled intSize */
 4239 if (AH->version >= K_VERS_1_7)
 4240 AH->offSize = AH->ReadBytePtr(AH);
 4241 else
 4242 AH->offSize = AH->intSize;
 4243
 4244 fmt = AH->ReadBytePtr(AH);
 4245
 4246 if (AH->format != fmt)
 4247 pg_fatal("expected format (%d) differs from format found in file (%d)",
 4248 AH->format, fmt);
 4249
 /* Compression spec: explicit since 1.15, inferred from level before that */
 4250 if (AH->version >= K_VERS_1_15)
 4252 else if (AH->version >= K_VERS_1_2)
 4253 {
 4254 /* Guess the compression method based on the level */
 4255 if (AH->version < K_VERS_1_4)
 4256 AH->compression_spec.level = AH->ReadBytePtr(AH);
 4257 else
 4258 AH->compression_spec.level = ReadInt(AH);
 4259
 4260 if (AH->compression_spec.level != 0)
 4262 }
 4263 else
 4265
 4267 if (errmsg)
 4268 {
 4269 pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
 4270 errmsg);
 4271 pg_free(errmsg);
 4272 }
 4273
 4274 if (AH->version >= K_VERS_1_4)
 4275 {
 4276 struct tm crtm;
 4277
 4278 crtm.tm_sec = ReadInt(AH);
 4279 crtm.tm_min = ReadInt(AH);
 4280 crtm.tm_hour = ReadInt(AH);
 4281 crtm.tm_mday = ReadInt(AH);
 4282 crtm.tm_mon = ReadInt(AH);
 4283 crtm.tm_year = ReadInt(AH);
 4284 crtm.tm_isdst = ReadInt(AH);
 4285
 4286 /*
 4287 * Newer versions of glibc have mktime() report failure if tm_isdst is
 4288 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
 4289 * TZ=UTC. This is problematic when restoring an archive under a
 4290 * different timezone setting. If we get a failure, try again with
 4291 * tm_isdst set to -1 ("don't know").
 4292 *
 4293 * XXX with or without this hack, we reconstruct createDate
 4294 * incorrectly when the prevailing timezone is different from
 4295 * pg_dump's. Next time we bump the archive version, we should flush
 4296 * this representation and store a plain seconds-since-the-Epoch
 4297 * timestamp instead.
 4298 */
 4299 AH->createDate = mktime(&crtm);
 4300 if (AH->createDate == (time_t) -1)
 4301 {
 4302 crtm.tm_isdst = -1;
 4303 AH->createDate = mktime(&crtm);
 4304 if (AH->createDate == (time_t) -1)
 4305 pg_log_warning("invalid creation date in header");
 4306 }
 4307 }
 4308
 4309 if (AH->version >= K_VERS_1_4)
 4310 {
 4311 AH->archdbname = ReadStr(AH);
 4312 }
 4313
 4314 if (AH->version >= K_VERS_1_10)
 4315 {
 4316 AH->archiveRemoteVersion = ReadStr(AH);
 4317 AH->archiveDumpVersion = ReadStr(AH);
 4318 }
 4319}
4320
4321
4322/*
 4323 * checkSeek
 4324 * check to see if ftell/fseek can be performed.
 *
 * Returns true only if both ftello() and a round-trip fseeko(SEEK_SET)
 * succeed on the given stream; false means the stream is unseekable.
 4325 */
4326bool
 4328{
 4329 pgoff_t tpos;
 4330
 4331 /* Check that ftello works on this file */
 4332 tpos = ftello(fp);
 4333 if (tpos < 0)
 4334 return false;
 4335
 4336 /*
 4337 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
 4338 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
 4339 * successful no-op even on files that are otherwise unseekable.
 4340 */
 4341 if (fseeko(fp, tpos, SEEK_SET) != 0)
 4342 return false;
 4343
 4344 return true;
 4345}
4346
4347
4348/*
 4349 * dumpTimestamp
 *
 * Emit a "-- <msg> <formatted local time>" comment line to the output.
 * If strftime() fails (returns 0), nothing is written.
 4350 */
4351static void
 4353{
 4354 char buf[64];
 4355
 4356 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
 4357 ahprintf(AH, "-- %s %s\n\n", msg, buf);
 4358}
4359
4360/*
 4361 * Main engine for parallel restore.
 4362 *
 4363 * Parallel restore is done in three phases. In this first phase,
 4364 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
 4365 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
 4366 * PRE_DATA items other than ACLs.) Entries we can't process now are
 4367 * added to the pending_list for later phases to deal with.
 *
 * On return, the parent's database connection has been closed and the
 * per-connection cached state (user/schema/tablespace/table AM) cleared,
 * so that parallel workers can open their own connections.
 4368 */
4369static void
 4371{
 4372 bool skipped_some;
 4374
 4375 pg_log_debug("entering restore_toc_entries_prefork");
 4376
 4377 /* Adjust dependency information */
 4378 fix_dependencies(AH);
 4379
 4380 /*
 4381 * Do all the early stuff in a single connection in the parent. There's no
 4382 * great point in running it in parallel, in fact it will actually run
 4383 * faster in a single connection because we avoid all the connection and
 4384 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
 4385 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
 4386 * not risk trying to process them out-of-order.
 4387 *
 4388 * Stuff that we can't do immediately gets added to the pending_list.
 4389 * Note: we don't yet filter out entries that aren't going to be restored.
 4390 * They might participate in dependency chains connecting entries that
 4391 * should be restored, so we treat them as live until we actually process
 4392 * them.
 4393 *
 4394 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
 4395 * before DATA items, and all DATA items before POST_DATA items. That is
 4396 * not certain to be true in older archives, though, and in any case use
 4397 * of a list file would destroy that ordering (cf. SortTocFromFile). So
 4398 * this loop cannot assume that it holds.
 4399 */
 4401 skipped_some = false;
 4403 {
 4404 bool do_now = true;
 4405
 4406 if (next_work_item->section != SECTION_PRE_DATA)
 4407 {
 4408 /* DATA and POST_DATA items are just ignored for now */
 4409 if (next_work_item->section == SECTION_DATA ||
 4411 {
 4412 do_now = false;
 4413 skipped_some = true;
 4414 }
 4415 else
 4416 {
 4417 /*
 4418 * SECTION_NONE items, such as comments, can be processed now
 4419 * if we are still in the PRE_DATA part of the archive. Once
 4420 * we've skipped any items, we have to consider whether the
 4421 * comment's dependencies are satisfied, so skip it for now.
 4422 */
 4423 if (skipped_some)
 4424 do_now = false;
 4425 }
 4426 }
 4427
 4428 /*
 4429 * Also skip items that need to be forced into later passes. We need
 4430 * not set skipped_some in this case, since by assumption no main-pass
 4431 * items could depend on these.
 4432 */
 4434 do_now = false;
 4435
 4436 if (do_now)
 4437 {
 4438 /* OK, restore the item and update its dependencies */
 4439 pg_log_info("processing item %d %s %s",
 4440 next_work_item->dumpId,
 4441 next_work_item->desc, next_work_item->tag);
 4442
 4444
 4445 /* Reduce dependencies, but don't move anything to ready_heap */
 4447 }
 4448 else
 4449 {
 4450 /* Nope, so add it to pending_list */
 4452 }
 4453 }
 4454
 4455 /*
 4456 * In --transaction-size mode, we must commit the open transaction before
 4457 * dropping the database connection. This also ensures that child workers
 4458 * can see the objects we've created so far.
 4459 */
 4460 if (AH->public.ropt->txn_size > 0)
 4462
 4463 /*
 4464 * Now close parent connection in prep for parallel steps. We do this
 4465 * mainly to ensure that we don't exceed the specified number of parallel
 4466 * connections.
 4467 */
 4469
 4470 /* blow away any transient state from the old connection */
 4471 free(AH->currUser);
 4472 AH->currUser = NULL;
 4473 free(AH->currSchema);
 4474 AH->currSchema = NULL;
 4475 free(AH->currTablespace);
 4476 AH->currTablespace = NULL;
 4477 free(AH->currTableAm);
 4478 AH->currTableAm = NULL;
 4479}
4480
4481/*
 4482 * Main engine for parallel restore.
 4483 *
 4484 * Parallel restore is done in three phases. In this second phase,
 4485 * we process entries by dispatching them to parallel worker children
 4486 * (processes on Unix, threads on Windows), each of which connects
 4487 * separately to the database. Inter-entry dependencies are respected,
 4488 * and so is the RestorePass multi-pass structure. When we can no longer
 4489 * make any entries ready to process, we exit. Normally, there will be
 4490 * nothing left to do; but if there is, the third phase will mop up.
 4491 */
4492static void
 4495{
 4498
 4499 pg_log_debug("entering restore_toc_entries_parallel");
 4500
 4501 /* Set up ready_heap with enough room for all known TocEntrys */
 4504 NULL);
 4505
 4506 /*
 4507 * The pending_list contains all items that we need to restore. Move all
 4508 * items that are available to process immediately into the ready_heap.
 4509 * After this setup, the pending list is everything that needs to be done
 4510 * but is blocked by one or more dependencies, while the ready heap
 4511 * contains items that have no remaining dependencies and are OK to
 4512 * process in the current restore pass.
 4513 */
 4516
 4517 /*
 4518 * main parent loop
 4519 *
 4520 * Keep going until there is no worker still running AND there is no work
 4521 * left to be done. Note invariant: at top of loop, there should always
 4522 * be at least one worker available to dispatch a job to.
 4523 */
 4524 pg_log_info("entering main parallel loop");
 4525
 4526 for (;;)
 4527 {
 4528 /* Look for an item ready to be dispatched to a worker */
 4530 if (next_work_item != NULL)
 4531 {
 4532 /* If not to be restored, don't waste time launching a worker */
 4533 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
 4534 {
 4535 pg_log_info("skipping item %d %s %s",
 4536 next_work_item->dumpId,
 4537 next_work_item->desc, next_work_item->tag);
 4538 /* Update its dependencies as though we'd completed it */
 4540 /* Loop around to see if anything else can be dispatched */
 4541 continue;
 4542 }
 4543
 4544 pg_log_info("launching item %d %s %s",
 4545 next_work_item->dumpId,
 4546 next_work_item->desc, next_work_item->tag);
 4547
 4548 /* Dispatch to some worker */
 4551 }
 4552 else if (IsEveryWorkerIdle(pstate))
 4553 {
 4554 /*
 4555 * Nothing is ready and no worker is running, so we're done with
 4556 * the current pass or maybe with the whole process.
 4557 */
 4558 if (AH->restorePass == RESTORE_PASS_LAST)
 4559 break; /* No more parallel processing is possible */
 4560
 4561 /* Advance to next restore pass */
 4562 AH->restorePass++;
 4563 /* That probably allows some stuff to be made ready */
 4565 /* Loop around to see if anything's now ready */
 4566 continue;
 4567 }
 4568 else
 4569 {
 4570 /*
 4571 * We have nothing ready, but at least one child is working, so
 4572 * wait for some subjob to finish.
 4573 */
 4574 }
 4575
 4576 /*
 4577 * Before dispatching another job, check to see if anything has
 4578 * finished. We should check every time through the loop so as to
 4579 * reduce dependencies as soon as possible. If we were unable to
 4580 * dispatch any job this time through, wait until some worker finishes
 4581 * (and, hopefully, unblocks some pending item). If we did dispatch
 4582 * something, continue as soon as there's at least one idle worker.
 4583 * Note that in either case, there's guaranteed to be at least one
 4584 * idle worker when we return to the top of the loop. This ensures we
 4585 * won't block inside DispatchJobForTocEntry, which would be
 4586 * undesirable: we'd rather postpone dispatching until we see what's
 4587 * been unblocked by finished jobs.
 4588 */
 4589 WaitForWorkers(AH, pstate,
 4591 }
 4592
 4593 /* There should now be nothing in ready_heap. */
 4595
 4597
 4598 pg_log_info("finished main parallel loop");
 4599}
4600
4601/*
 4602 * Main engine for parallel restore.
 4603 *
 4604 * Parallel restore is done in three phases. In this third phase,
 4605 * we mop up any remaining TOC entries by processing them serially.
 4606 * This phase normally should have nothing to do, but if we've somehow
 4607 * gotten stuck due to circular dependencies or some such, this provides
 4608 * at least some chance of completing the restore successfully.
 4609 */
4610static void
 4612{
 4613 RestoreOptions *ropt = AH->public.ropt;
 4614 TocEntry *te;
 4615
 4616 pg_log_debug("entering restore_toc_entries_postfork");
 4617
 4618 /*
 4619 * Now reconnect the single parent connection.
 4620 */
 4621 ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
 4622
 4623 /* re-establish fixed state */
 4625
 4626 /*
 4627 * Make sure there is no work left due to, say, circular dependencies, or
 4628 * some other pathological condition. If so, do it in the single parent
 4629 * connection. We don't sweat about RestorePass ordering; it's likely we
 4630 * already violated that.
 4631 */
 /* Walk the circular pending list; the header node terminates the scan */
 4632 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
 4633 {
 4634 pg_log_info("processing missed item %d %s %s",
 4635 te->dumpId, te->desc, te->tag);
 /* Errors are counted by restore_toc_entry; we intentionally press on */
 4636 (void) restore_toc_entry(AH, te, false);
 4637 }
 4638}
4639
4640/*
 4641 * Check if te1 has an exclusive lock requirement for an item that te2 also
 4642 * requires, whether or not te2's requirement is for an exclusive lock.
 *
 * Returns true on the first match.  This is an O(nLockDeps * nDeps)
 * nested scan; both arrays are expected to be short.
 4643 */
4644static bool
 4646{
 4647 int j,
 4648 k;
 4649
 4650 for (j = 0; j < te1->nLockDeps; j++)
 4651 {
 4652 for (k = 0; k < te2->nDeps; k++)
 4653 {
 4654 if (te1->lockDeps[j] == te2->dependencies[k])
 4655 return true;
 4656 }
 4657 }
 4658 return false;
 4659}
4660
4661
4662/*
4663 * Initialize the header of the pending-items list.
4664 *
4665 * This is a circular list with a dummy TocEntry as header, just like the
4666 * main TOC list; but we use separate list links so that an entry can be in
4667 * the main TOC list as well as in the pending list.
4668 */
4669static void
4674
4675/* Append te to the end of the pending-list headed by l */
4676static void
 4678{
 /* Standard circular doubly-linked-list insertion just before header l */
 4679 te->pending_prev = l->pending_prev;
 4680 l->pending_prev->pending_next = te;
 4681 l->pending_prev = te;
 4682 te->pending_next = l;
 4683}
4684
4685/* Remove te from the pending-list */
4686static void
4694
4695
4696/* qsort comparator for sorting TocEntries by dataLength */
4697static int
4698TocEntrySizeCompareQsort(const void *p1, const void *p2)
4699{
4700 const TocEntry *te1 = *(const TocEntry *const *) p1;
4701 const TocEntry *te2 = *(const TocEntry *const *) p2;
4702
4703 /* Sort by decreasing dataLength */
4704 if (te1->dataLength > te2->dataLength)
4705 return -1;
4706 if (te1->dataLength < te2->dataLength)
4707 return 1;
4708
4709 /* For equal dataLengths, sort by dumpId, just to be stable */
4710 if (te1->dumpId < te2->dumpId)
4711 return -1;
4712 if (te1->dumpId > te2->dumpId)
4713 return 1;
4714
4715 return 0;
4716}
4717
4718/* binaryheap comparator for sorting TocEntries by dataLength */
4719static int
 4721{
 /* binaryheap wants a max-heap ordering, so negate the qsort comparator */
 4722 /* return opposite of qsort comparator for max-heap */
 4723 return -TocEntrySizeCompareQsort(&p1, &p2);
 4724}
4725
4726
4727/*
 4728 * Move all immediately-ready items from pending_list to ready_heap.
 4729 *
 4730 * Items are considered ready if they have no remaining dependencies and
 4731 * they belong in the current restore pass. (See also reduce_dependencies,
 4732 * which applies the same logic one-at-a-time.)
 4733 */
4734static void
 4738{
 4739 TocEntry *te;
 4741
 /* Walk the circular pending list; the header node ends the scan */
 4742 for (te = pending_list->pending_next; te != pending_list; te = next_te)
 4743 {
 4744 /* must save list link before possibly removing te from list */
 4745 next_te = te->pending_next;
 4746
 4747 if (te->depCount == 0 &&
 4749 {
 4750 /* Remove it from pending_list ... */
 4752 /* ... and add to ready_heap */
 4754 }
 4755 }
 4756}
4757
4758/*
 4759 * Find the next work item (if any) that is capable of being run now,
 4760 * and remove it from the ready_heap.
 4761 *
 4762 * Returns the item, or NULL if nothing is runnable.
 4763 *
 4764 * To qualify, the item must have no remaining dependencies
 4765 * and no requirements for locks that are incompatible with
 4766 * items currently running. Items in the ready_heap are known to have
 4767 * no remaining dependencies, but we have to check for lock conflicts.
 4768 */
4769static TocEntry *
 4771 ParallelState *pstate)
 4772{
 4773 /*
 4774 * Search the ready_heap until we find a suitable item. Note that we do a
 4775 * sequential scan through the heap nodes, so even though we will first
 4776 * try to choose the highest-priority item, we might end up picking
 4777 * something with a much lower priority. However, we expect that we will
 4778 * typically be able to pick one of the first few items, which should
 4779 * usually have a relatively high priority.
 4780 */
 4781 for (int i = 0; i < binaryheap_size(ready_heap); i++)
 4782 {
 4784 bool conflicts = false;
 4785
 4786 /*
 4787 * Check to see if the item would need exclusive lock on something
 4788 * that a currently running item also needs lock on, or vice versa. If
 4789 * so, we don't want to schedule them together.
 4790 */
 4791 for (int k = 0; k < pstate->numWorkers; k++)
 4792 {
 4793 TocEntry *running_te = pstate->te[k];
 4794
 /* NULL slot means that worker is currently idle */
 4795 if (running_te == NULL)
 4796 continue;
 /* Lock conflicts are checked in both directions */
 4797 if (has_lock_conflicts(te, running_te) ||
 4799 {
 4800 conflicts = true;
 4801 break;
 4802 }
 4803 }
 4804
 4805 if (conflicts)
 4806 continue;
 4807
 4808 /* passed all tests, so this item can run */
 4810 return te;
 4811 }
 4812
 4813 pg_log_debug("no item ready");
 4814 return NULL;
 4815}
4816
4817
4818/*
 4819 * Restore a single TOC item in parallel with others
 4820 *
 4821 * this is run in the worker, i.e. in a thread (Windows) or a separate process
 4822 * (everything else). A worker process executes several such work items during
 4823 * a parallel backup or restore. Once we terminate here and report back that
 4824 * our work is finished, the leader process will assign us a new work item.
 *
 * Returns the status code produced by restore_toc_entry(), which the
 * leader interprets in mark_restore_job_done().
 4825 */
4826int
 4828{
 4829 int status;
 4830
 4831 Assert(AH->connection != NULL);
 4832
 4833 /* Count only errors associated with this TOC entry */
 4834 AH->public.n_errors = 0;
 4835
 4836 /* Restore the TOC item */
 4837 status = restore_toc_entry(AH, te, true);
 4838
 4839 return status;
 4840}
4841
4842
4843/*
 4844 * Callback function that's invoked in the leader process after a step has
 4845 * been parallel restored.
 4846 *
 4847 * Update status and reduce the dependency count of any dependent items.
 *
 * callback_data is the ready_heap (binaryheap *) into which newly
 * unblocked items are released.
 4848 */
4849static void
 4851 TocEntry *te,
 4852 int status,
 4853 void *callback_data)
 4854{
 4855 binaryheap *ready_heap = (binaryheap *) callback_data;
 4856
 4857 pg_log_info("finished item %d %s %s",
 4858 te->dumpId, te->desc, te->tag);
 4859
 /* Translate the worker's status code into bookkeeping actions */
 4860 if (status == WORKER_CREATE_DONE)
 4861 mark_create_done(AH, te);
 4862 else if (status == WORKER_INHIBIT_DATA)
 4863 {
 4865 AH->public.n_errors++;
 4866 }
 4867 else if (status == WORKER_IGNORED_ERRORS)
 4868 AH->public.n_errors++;
 4869 else if (status != 0)
 4870 pg_fatal("worker process failed: exit code %d",
 4871 status);
 4872
 4874}
4875
4876
4877/*
4878 * Process the dependency information into a form useful for parallel restore.
4879 *
4880 * This function takes care of fixing up some missing or badly designed
4881 * dependencies, and then prepares subsidiary data structures that will be
4882 * used in the main parallel-restore logic, including:
4883 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4884 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4885 * dependencies for each TOC entry.
4886 *
4887 * We also identify locking dependencies so that we can avoid trying to
4888 * schedule conflicting items at the same time.
4889 */
4890static void
4892{
4893 TocEntry *te;
4894 int i;
4895
4896 /*
4897 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4898 * items are marked as not being in any parallel-processing list.
4899 */
4900 for (te = AH->toc->next; te != AH->toc; te = te->next)
4901 {
4902 te->depCount = te->nDeps;
4903 te->revDeps = NULL;
4904 te->nRevDeps = 0;
4905 te->pending_prev = NULL;
4906 te->pending_next = NULL;
4907 }
4908
4909 /*
4910 * POST_DATA items that are shown as depending on a table need to be
4911 * re-pointed to depend on that table's data, instead. This ensures they
4912 * won't get scheduled until the data has been loaded.
4913 */
4915
4916 /*
4917 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4918 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4919 * one BLOB COMMENTS in such files.)
4920 */
4921 if (AH->version < K_VERS_1_11)
4922 {
4923 for (te = AH->toc->next; te != AH->toc; te = te->next)
4924 {
4925 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4926 {
4927 TocEntry *te2;
4928
4929 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4930 {
4931 if (strcmp(te2->desc, "BLOBS") == 0)
4932 {
4934 te->dependencies[0] = te2->dumpId;
4935 te->nDeps++;
4936 te->depCount++;
4937 break;
4938 }
4939 }
4940 break;
4941 }
4942 }
4943 }
4944
4945 /*
4946 * At this point we start to build the revDeps reverse-dependency arrays,
4947 * so all changes of dependencies must be complete.
4948 */
4949
4950 /*
4951 * Count the incoming dependencies for each item. Also, it is possible
4952 * that the dependencies list items that are not in the archive at all
4953 * (that should not happen in 9.2 and later, but is highly likely in older
4954 * archives). Subtract such items from the depCounts.
4955 */
4956 for (te = AH->toc->next; te != AH->toc; te = te->next)
4957 {
4958 for (i = 0; i < te->nDeps; i++)
4959 {
4960 DumpId depid = te->dependencies[i];
4961
4962 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4963 AH->tocsByDumpId[depid]->nRevDeps++;
4964 else
4965 te->depCount--;
4966 }
4967 }
4968
4969 /*
4970 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4971 * it as a counter below.
4972 */
4973 for (te = AH->toc->next; te != AH->toc; te = te->next)
4974 {
4975 if (te->nRevDeps > 0)
4977 te->nRevDeps = 0;
4978 }
4979
4980 /*
4981 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4982 * better agree with the loops above.
4983 */
4984 for (te = AH->toc->next; te != AH->toc; te = te->next)
4985 {
4986 for (i = 0; i < te->nDeps; i++)
4987 {
4988 DumpId depid = te->dependencies[i];
4989
4990 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4991 {
4993
4994 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4995 }
4996 }
4997 }
4998
4999 /*
5000 * Lastly, work out the locking dependencies.
5001 */
5002 for (te = AH->toc->next; te != AH->toc; te = te->next)
5003 {
5004 te->lockDeps = NULL;
5005 te->nLockDeps = 0;
5007 }
5008}
5009
5010/*
5011 * Change dependencies on table items to depend on table data items instead,
5012 * but only in POST_DATA items.
5013 *
5014 * Also, for any item having such dependency(s), set its dataLength to the
5015 * largest dataLength of the table data items it depends on. This ensures
5016 * that parallel restore will prioritize larger jobs (index builds, FK
5017 * constraint checks, etc) over smaller ones, avoiding situations where we
5018 * end a restore with only one active job working on a large table.
5019 */
5020static void
5022{
5023 TocEntry *te;
5024 int i;
5025 DumpId olddep;
5026
5027 for (te = AH->toc->next; te != AH->toc; te = te->next)
5028 {
5029 if (te->section != SECTION_POST_DATA)
5030 continue;
5031 for (i = 0; i < te->nDeps; i++)
5032 {
5033 olddep = te->dependencies[i];
5034 if (olddep <= AH->maxDumpId &&
5035 AH->tableDataId[olddep] != 0)
5036 {
5039
5040 te->dependencies[i] = tabledataid;
5041 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
5042 pg_log_debug("transferring dependency %d -> %d to %d",
5043 te->dumpId, olddep, tabledataid);
5044 }
5045 }
5046 }
5047}
5048
5049/*
5050 * Identify which objects we'll need exclusive lock on in order to restore
5051 * the given TOC entry (*other* than the one identified by the TOC entry
5052 * itself). Record their dump IDs in the entry's lockDeps[] array.
5053 */
5054static void
5056{
5057 DumpId *lockids;
5058 int nlockids;
5059 int i;
5060
5061 /*
5062 * We only care about this for POST_DATA items. PRE_DATA items are not
5063 * run in parallel, and DATA items are all independent by assumption.
5064 */
5065 if (te->section != SECTION_POST_DATA)
5066 return;
5067
5068 /* Quick exit if no dependencies at all */
5069 if (te->nDeps == 0)
5070 return;
5071
5072 /*
5073 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
5074 * and hence require exclusive lock. However, we know that CREATE INDEX
5075 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
5076 * category too ... but today is not that day.)
5077 */
5078 if (strcmp(te->desc, "INDEX") == 0)
5079 return;
5080
5081 /*
5082 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5083 * item listed among its dependencies. Originally all of these would have
5084 * been TABLE items, but repoint_table_dependencies would have repointed
5085 * them to the TABLE DATA items if those are present (which they might not
5086 * be, eg in a schema-only dump). Note that all of the entries we are
5087 * processing here are POST_DATA; otherwise there might be a significant
5088 * difference between a dependency on a table and a dependency on its
5089 * data, so that closer analysis would be needed here.
5090 */
5092 nlockids = 0;
5093 for (i = 0; i < te->nDeps; i++)
5094 {
5095 DumpId depid = te->dependencies[i];
5096
5097 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5098 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5099 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5100 lockids[nlockids++] = depid;
5101 }
5102
5103 if (nlockids == 0)
5104 {
5105 free(lockids);
5106 return;
5107 }
5108
5110 te->nLockDeps = nlockids;
5111}
5112
5113/*
5114 * Remove the specified TOC entry from the depCounts of items that depend on
5115 * it, thereby possibly making them ready-to-run. Any pending item that
5116 * becomes ready should be moved to the ready_heap, if that's provided.
5117 */
5118static void
5121{
5122 int i;
5123
5124 pg_log_debug("reducing dependencies for %d", te->dumpId);
5125
5126 for (i = 0; i < te->nRevDeps; i++)
5127 {
5128 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5129
5130 Assert(otherte->depCount > 0);
5131 otherte->depCount--;
5132
5133 /*
5134 * It's ready if it has no remaining dependencies, and it belongs in
5135 * the current restore pass, and it is currently a member of the
5136 * pending list (that check is needed to prevent double restore in
5137 * some cases where a list-file forces out-of-order restoring).
5138 * However, if ready_heap == NULL then caller doesn't want any list
5139 * memberships changed.
5140 */
5141 if (otherte->depCount == 0 &&
5143 otherte->pending_prev != NULL &&
5144 ready_heap != NULL)
5145 {
5146 /* Remove it from pending list ... */
5148 /* ... and add to ready_heap */
5150 }
5151 }
5152}
5153
5154/*
5155 * Set the created flag on the DATA member corresponding to the given
5156 * TABLE member
5157 */
5158static void
5160{
5161 if (AH->tableDataId[te->dumpId] != 0)
5162 {
5163 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5164
5165 ted->created = true;
5166 }
5167}
5168
5169/*
5170 * Mark the DATA member corresponding to the given TABLE member
5171 * as not wanted
5172 */
5173static void
5175{
5176 pg_log_info("table \"%s\" could not be created, will not restore its data",
5177 te->tag);
5178
5179 if (AH->tableDataId[te->dumpId] != 0)
5180 {
5181 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5182
5183 ted->reqs = 0;
5184 }
5185}
5186
5187/*
5188 * Clone and de-clone routines used in parallel restoration.
5189 *
5190 * Enough of the structure is cloned to ensure that there is no
5191 * conflict between different threads each with their own clone.
5192 */
5195{
5197
5198 /* Make a "flat" copy */
5200 memcpy(clone, AH, sizeof(ArchiveHandle));
5201
5202 /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5203 clone->public.ropt = pg_malloc_object(RestoreOptions);
5204 memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5205
5206 /* Handle format-independent fields */
5207 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5208
5209 /* The clone will have its own connection, so disregard connection state */
5210 clone->connection = NULL;
5211 clone->connCancel = NULL;
5212 clone->currUser = NULL;
5213 clone->currSchema = NULL;
5214 clone->currTableAm = NULL;
5215 clone->currTablespace = NULL;
5216
5217 /* savedPassword must be local in case we change it while connecting */
5218 if (clone->savedPassword)
5219 clone->savedPassword = pg_strdup(clone->savedPassword);
5220
5221 /* clone has its own error count, too */
5222 clone->public.n_errors = 0;
5223
5224 /* clones should not share lo_buf */
5225 clone->lo_buf = NULL;
5226
5227 /*
5228 * Clone connections disregard --transaction-size; they must commit after
5229 * each command so that the results are immediately visible to other
5230 * workers.
5231 */
5232 clone->public.ropt->txn_size = 0;
5233
5234 /*
5235 * Connect our new clone object to the database, using the same connection
5236 * parameters used for the original connection.
5237 */
5238 ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5239
5240 /* re-establish fixed state */
5241 if (AH->mode == archModeRead)
5243 /* in write case, setupDumpWorker will fix up connection state */
5244
5245 /* Let the format-specific code have a chance too */
5246 clone->ClonePtr(clone);
5247
5248 Assert(clone->connection != NULL);
5249 return clone;
5250}
5251
5252/*
5253 * Release clone-local storage.
5254 *
5255 * Note: we assume any clone-local connection was already closed.
5256 */
5257void
5259{
5260 /* Should not have an open database connection */
5261 Assert(AH->connection == NULL);
5262
5263 /* Clear format-specific state */
5264 AH->DeClonePtr(AH);
5265
5266 /* Clear state allocated by CloneArchive */
5267 if (AH->sqlparse.curCmd)
5269
5270 /* Clear any connection-local state */
5271 free(AH->currUser);
5272 free(AH->currSchema);
5273 free(AH->currTablespace);
5274 free(AH->currTableAm);
5275 free(AH->savedPassword);
5276
5277 free(AH);
5278}
int lo_write(int fd, const char *buf, int len)
Definition be-fsstubs.c:182
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition parallel.c:1075
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition parallel.c:1467
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition parallel.c:913
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition parallel.c:1221
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition parallel.c:1284
@ WFW_ALL_IDLE
Definition parallel.h:35
@ WFW_GOT_STATUS
Definition parallel.h:33
@ WFW_ONE_IDLE
Definition parallel.h:34
void binaryheap_remove_node(binaryheap *heap, int n)
Definition binaryheap.c:223
void binaryheap_add(binaryheap *heap, bh_node_type d)
Definition binaryheap.c:152
void binaryheap_free(binaryheap *heap)
Definition binaryheap.c:73
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition binaryheap.c:37
#define binaryheap_size(h)
Definition binaryheap.h:66
#define binaryheap_empty(h)
Definition binaryheap.h:65
#define binaryheap_get_node(h, n)
Definition binaryheap.h:67
#define PG_BINARY_R
Definition c.h:1348
#define ngettext(s, p, n)
Definition c.h:1242
#define PG_BINARY_A
Definition c.h:1347
#define Max(x, y)
Definition c.h:1057
#define Assert(condition)
Definition c.h:915
#define PG_BINARY_W
Definition c.h:1349
bool EndCompressFileHandle(CompressFileHandle *CFH)
char * supports_compression(const pg_compress_specification compression_spec)
Definition compress_io.c:87
CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compression_spec)
const char * get_compress_algorithm_name(pg_compress_algorithm algorithm)
Definition compression.c:69
@ PG_COMPRESSION_GZIP
Definition compression.h:24
@ PG_COMPRESSION_NONE
Definition compression.h:23
char * sanitize_line(const char *str, bool want_hyphen)
Definition dumputils.c:52
#define PGDUMP_STRFTIME_FMT
Definition dumputils.h:34
Datum arg
Definition elog.c:1322
char * PQdb(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int lo_close(PGconn *conn, int fd)
Definition fe-lobj.c:96
int lo_open(PGconn *conn, Oid lobjId, int mode)
Definition fe-lobj.c:57
Oid lo_create(PGconn *conn, Oid lobjId)
Definition fe-lobj.c:474
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition fe_memutils.c:53
void pg_free(void *ptr)
#define pg_realloc_array(pointer, type, count)
Definition fe_memutils.h:63
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
#define pg_malloc0_object(type)
Definition fe_memutils.h:51
#define pg_malloc_object(type)
Definition fe_memutils.h:50
#define pg_malloc0_array(type, count)
Definition fe_memutils.h:57
DataDirSyncMethod
Definition file_utils.h:28
@ DATA_DIR_SYNC_METHOD_FSYNC
Definition file_utils.h:29
int remaining
Definition informix.c:692
char sign
Definition informix.c:693
static char * encoding
Definition initdb.c:139
static DataDirSyncMethod sync_method
Definition initdb.c:170
int b
Definition isn.c:74
return true
Definition isn.c:130
int j
Definition isn.c:78
int i
Definition isn.c:77
#define PQclear
#define PQresultStatus
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
#define INV_WRITE
Definition libpq-fs.h:21
static struct pg_tm tm
Definition localtime.c:104
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition logging.c:219
#define pg_log_info(...)
Definition logging.h:124
@ PG_LOG_PRIMARY
Definition logging.h:67
@ PG_LOG_ERROR
Definition logging.h:43
#define pg_log_debug(...)
Definition logging.h:133
static size_t append_data(char *buf, size_t size, size_t nmemb, void *userdata)
static char * errmsg
static AmcheckOptions opts
Definition pg_amcheck.c:112
@ SECTION_NONE
Definition pg_backup.h:57
@ SECTION_POST_DATA
Definition pg_backup.h:60
@ SECTION_PRE_DATA
Definition pg_backup.h:58
@ SECTION_DATA
Definition pg_backup.h:59
int DumpId
Definition pg_backup.h:285
void(* SetupWorkerPtrType)(Archive *AH)
Definition pg_backup.h:292
enum _archiveFormat ArchiveFormat
void ConnectDatabaseAhx(Archive *AHX, const ConnParams *cparams, bool isReconnect)
@ archModeWrite
Definition pg_backup.h:51
@ archModeAppend
Definition pg_backup.h:50
@ archModeRead
Definition pg_backup.h:52
void DisconnectDatabase(Archive *AHX)
enum _teSection teSection
@ archUnknown
Definition pg_backup.h:41
@ archTar
Definition pg_backup.h:43
@ archCustom
Definition pg_backup.h:42
@ archDirectory
Definition pg_backup.h:45
@ archNull
Definition pg_backup.h:44
static void fix_dependencies(ArchiveHandle *AH)
static void repoint_table_dependencies(ArchiveHandle *AH)
void DeCloneArchive(ArchiveHandle *AH)
static int _discoverArchiveFormat(ArchiveHandle *AH)
#define TEXT_DUMPALL_HEADER
int TocIDRequired(ArchiveHandle *AH, DumpId id)
bool checkSeek(FILE *fp)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
void warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te)
void WriteHead(ArchiveHandle *AH)
int EndLO(Archive *AHX, Oid oid)
static CompressFileHandle * SaveOutput(ArchiveHandle *AH)
#define TOC_PREFIX_DATA
static void _becomeUser(ArchiveHandle *AH, const char *user)
static void pending_list_append(TocEntry *l, TocEntry *te)
size_t WriteInt(ArchiveHandle *AH, int i)
void ProcessArchiveRestoreOptions(Archive *AHX)
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
static void _moveBefore(TocEntry *pos, TocEntry *te)
RestoreOptions * NewRestoreOptions(void)
static bool _tocEntryIsACL(TocEntry *te)
static void move_to_ready_heap(TocEntry *pending_list, binaryheap *ready_heap, RestorePass pass)
char * ReadStr(ArchiveHandle *AH)
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
static void buildTocEntryArrays(ArchiveHandle *AH)
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
int archprintf(Archive *AH, const char *fmt,...)
static void StrictNamesCheck(RestoreOptions *ropt)
static void mark_restore_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
static RestorePass _tocEntryRestorePass(TocEntry *te)
TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, ArchiveOpts *opts)
int StartLO(Archive *AHX, Oid oid)
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
#define TOC_PREFIX_STATS
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
static void setupRestoreWorker(Archive *AHX)
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname)
static int TocEntrySizeCompareQsort(const void *p1, const void *p2)
Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
void StartRestoreLOs(ArchiveHandle *AH)
void RestoreArchive(Archive *AHX, bool append_data)
void CloseArchive(Archive *AHX)
static void pending_list_header_init(TocEntry *l)
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
static void mark_create_done(ArchiveHandle *AH, TocEntry *te)
#define TEXT_DUMP_HEADER
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
static bool is_load_via_partition_root(TocEntry *te)
Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, DataDirSyncMethod sync_method)
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
DumpOptions * NewDumpOptions(void)
void SortTocFromFile(Archive *AHX)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
int ReadInt(ArchiveHandle *AH)
static void restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
static void _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
#define TOC_PREFIX_NONE
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
static void _doSetFixedOutputState(ArchiveHandle *AH)
void PrintTOCSummary(Archive *AHX)
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
static int RestoringToDB(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
static void pending_list_remove(TocEntry *te)
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2)
void ReadToc(ArchiveHandle *AH)
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, binaryheap *ready_heap)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace)
void WriteToc(ArchiveHandle *AH)
void archputs(const char *s, Archive *AH)
static bool _fileExistsInDirectory(const char *dir, const char *filename)
static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec, bool append_data)
DumpOptions * dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
void InitDumpOptions(DumpOptions *opts)
static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
static void dump_lo_buf(ArchiveHandle *AH)
static TocEntry * pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate)
void WriteData(Archive *AHX, const void *data, size_t dLen)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
size_t WriteStr(ArchiveHandle *AH, const char *c)
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_15
void InitArchiveFmt_Null(ArchiveHandle *AH)
#define WORKER_CREATE_DONE
#define LOBBUFSIZE
void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_14
#define appendByteaLiteralAHX(buf, str, len, AH)
void struct _archiveOpts ArchiveOpts
void(* EndDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_SELF
#define K_VERS_1_10
void(* StartDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define ARCHIVE_MAJOR(version)
#define ARCHIVE_MINOR(version)
#define K_VERS_1_2
#define RESTORE_PASS_LAST
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define K_OFFSET_NO_DATA
void InitArchiveFmt_Tar(ArchiveHandle *AH)
#define REQ_SCHEMA
#define K_VERS_1_4
#define appendStringLiteralAHX(buf, str, AH)
void DropLOIfExists(ArchiveHandle *AH, Oid oid)
#define MAKE_ARCHIVE_VERSION(major, minor, rev)
#define K_VERS_1_5
void ReconnectToServer(ArchiveHandle *AH, const char *dbname)
#define ARCHIVE_REV(version)
#define REQ_STATS
#define WRITE_ERROR_EXIT
bool isValidTarHeader(char *header)
#define WORKER_IGNORED_ERRORS
#define REQ_SPECIAL
void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, const char *cmdBegin, const char *cmdEnd)
#define K_VERS_1_6
@ STAGE_INITIALIZING
@ STAGE_PROCESSING
@ STAGE_NONE
@ STAGE_FINALIZING
@ ACT_RESTORE
#define K_VERS_1_0
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define K_VERS_MAX
#define K_VERS_1_8
#define WORKER_OK
@ OUTPUT_COPYDATA
@ OUTPUT_SQLCMDS
@ OUTPUT_OTHERDATA
#define K_VERS_1_12
#define REQ_DATA
#define READ_ERROR_EXIT(fd)
void InitArchiveFmt_Directory(ArchiveHandle *AH)
#define K_VERS_1_11
#define K_VERS_1_9
#define WORKER_INHIBIT_DATA
#define K_VERS_1_16
@ RESTORE_PASS_POST_ACL
@ RESTORE_PASS_ACL
@ RESTORE_PASS_MAIN
#define K_VERS_1_3
#define K_VERS_1_7
void EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
int ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
void exit_nicely(int code)
#define DUMP_PRE_DATA
#define DUMP_DATA
#define DUMP_UNSECTIONED
#define pg_fatal(...)
#define DUMP_POST_DATA
static PgChecksumMode mode
#define MAXPGPATH
const void size_t len
const void * data
static int sig
Definition pg_ctl.c:81
static bool dosync
Definition pg_dump.c:152
static void setupDumpWorker(Archive *AH)
Definition pg_dump.c:1580
static char * filename
Definition pg_dumpall.c:133
bool pg_get_line_buf(FILE *stream, StringInfo buf)
Definition pg_get_line.c:95
static char * user
Definition pg_regress.c:119
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define pg_encoding_to_char
Definition pg_wchar.h:630
#define pg_char_to_encoding
Definition pg_wchar.h:629
static char * tablespace
Definition pgbench.c:217
#define pg_log_warning(...)
Definition pgfnames.c:24
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
#define qsort(a, b, c, d)
Definition port.h:495
off_t pgoff_t
Definition port.h:421
#define InvalidOid
unsigned int Oid
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void termPQExpBuffer(PQExpBuffer str)
char * c
static int fb(int x)
size_t pvsnprintf(char *buf, size_t len, const char *fmt, va_list args)
Definition psprintf.c:103
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
const char * simple_string_list_not_touched(SimpleStringList *list)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
#define free(a)
char * dbname
Definition streamutil.c:49
const char * fmtQualifiedId(const char *schema, const char *id)
const char * fmtId(const char *rawid)
void setFmtEncoding(int encoding)
void appendPsqlMetaConnect(PQExpBuffer buf, const char *dbname)
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
int minRemoteVersion
Definition pg_backup.h:237
char * remoteVersionStr
Definition pg_backup.h:233
DumpOptions * dopt
Definition pg_backup.h:229
bool exit_on_error
Definition pg_backup.h:252
char * searchpath
Definition pg_backup.h:248
int maxRemoteVersion
Definition pg_backup.h:238
int n_errors
Definition pg_backup.h:253
bool std_strings
Definition pg_backup.h:245
int numWorkers
Definition pg_backup.h:240
int encoding
Definition pg_backup.h:244
int verbose
Definition pg_backup.h:232
RestoreOptions * ropt
Definition pg_backup.h:230
Oid tableoid
Definition pg_backup.h:281
TocEntry ** te
Definition parallel.h:59
SimpleStringListCell * head
Definition simple_list.h:42
ArchiverStage stage
RestorePass restorePass
ArchiveFormat format
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
DataDirSyncMethod sync_method
struct _tocEntry * lastErrorTE
ReadExtraTocPtrType ReadExtraTocPtr
struct _tocEntry * currentTE
CustomOutPtrType CustomOutPtr
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
struct _tocEntry ** tocsByDumpId
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
PrintTocDataPtrType PrintTocDataPtr
struct _tocEntry * currToc
WriteBytePtrType WriteBytePtr
sqlparseInfo sqlparse
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
ArchiverStage lastErrorStage
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
ArchiverOutput outputKind
EndDataPtrType EndDataPtr
SetupWorkerPtrType SetupWorkerPtr
ClosePtrType ClosePtr
char * pgport
Definition pg_backup.h:88
char * pghost
Definition pg_backup.h:89
trivalue promptPassword
Definition pg_backup.h:91
char * username
Definition pg_backup.h:90
char * dbname
Definition pg_backup.h:87
char * restrict_key
Definition pg_backup.h:220
int column_inserts
Definition pg_backup.h:185
int use_setsessauth
Definition pg_backup.h:198
int outputCreateDB
Definition pg_backup.h:206
bool include_everything
Definition pg_backup.h:203
int sequence_data
Definition pg_backup.h:212
int disable_dollar_quoting
Definition pg_backup.h:184
bool dumpSchema
Definition pg_backup.h:216
int outputNoTableAm
Definition pg_backup.h:196
int enable_row_security
Definition pg_backup.h:199
char * outputSuperuser
Definition pg_backup.h:210
int no_security_labels
Definition pg_backup.h:190
bool dumpStatistics
Definition pg_backup.h:218
int no_publications
Definition pg_backup.h:189
ConnParams cparams
Definition pg_backup.h:173
const char * lockWaitTimeout
Definition pg_backup.h:180
int no_subscriptions
Definition pg_backup.h:191
int outputNoTablespaces
Definition pg_backup.h:197
int disable_triggers
Definition pg_backup.h:195
int outputNoOwner
Definition pg_backup.h:209
SimpleStringList schemaExcludeNames
Definition pg_backup.h:141
int suppressDumpWarnings
Definition pg_backup.h:152
ConnParams cparams
Definition pg_backup.h:146
SimpleStringList functionNames
Definition pg_backup.h:139
SimpleStringList tableNames
Definition pg_backup.h:143
SimpleStringList indexNames
Definition pg_backup.h:138
pg_compress_specification compression_spec
Definition pg_backup.h:150
SimpleStringList triggerNames
Definition pg_backup.h:142
int disable_dollar_quoting
Definition pg_backup.h:110
SimpleStringList schemaNames
Definition pg_backup.h:140
char * restrict_key
Definition pg_backup.h:168
const char * filename
Definition pg_backup.h:121
const char * lockWaitTimeout
Definition pg_backup.h:125
int enable_row_security
Definition pg_backup.h:159
int noDataForFailedTables
Definition pg_backup.h:148
struct _tocEntry * pending_next
struct _tocEntry * prev
teSection section
struct _tocEntry * pending_prev
DefnDumperPtr defnDumper
DataDumperPtr dataDumper
CatalogId catalogId
struct _tocEntry * next
const void * dataDumperArg
const void * defnDumperArg
DumpId * dependencies
pg_compress_algorithm algorithm
Definition compression.h:34
int tm_sec
Definition pgtime.h:36
unsigned short st_mode
Definition win32_port.h:258
static void * fn(void *arg)
@ TRI_DEFAULT
Definition vacuumlo.c:36
const char * type
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define ftello(stream)
Definition win32_port.h:209
#define S_ISREG(m)
Definition win32_port.h:318
#define fseeko(stream, offset, origin)
Definition win32_port.h:206
static void StartTransaction(void)
Definition xact.c:2078
static void CommitTransaction(void)
Definition xact.c:2242
ArchiveMode
Definition xlog.h:65