PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_backup_archiver.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_backup_archiver.c
4 *
5 * Private implementation of the archiver routines.
6 *
7 * See the headers to pg_restore for more details.
8 *
9 * Copyright (c) 2000, Philip Warner
10 * Rights are granted to use this software in any way so long
11 * as this notice is not removed.
12 *
13 * The author is not responsible for loss or damages that may
14 * result from its use.
15 *
16 *
17 * IDENTIFICATION
18 * src/bin/pg_dump/pg_backup_archiver.c
19 *
20 *-------------------------------------------------------------------------
21 */
22#include "postgres_fe.h"
23
24#include <ctype.h>
25#include <fcntl.h>
26#include <unistd.h>
27#include <sys/stat.h>
28#include <sys/wait.h>
29#ifdef WIN32
30#include <io.h>
31#endif
32
33#include "catalog/pg_class_d.h"
34#include "catalog/pg_largeobject_metadata_d.h"
35#include "catalog/pg_shdepend_d.h"
36#include "common/string.h"
37#include "compress_io.h"
38#include "dumputils.h"
40#include "lib/binaryheap.h"
41#include "lib/stringinfo.h"
42#include "libpq/libpq-fs.h"
43#include "parallel.h"
44#include "pg_backup_archiver.h"
45#include "pg_backup_db.h"
46#include "pg_backup_utils.h"
47#include "pgtar.h"
48
49#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
50#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
51
52#define TOC_PREFIX_NONE ""
53#define TOC_PREFIX_DATA "Data for "
54#define TOC_PREFIX_STATS "Statistics for "
55
56static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
57 const pg_compress_specification compression_spec,
61static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
62static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
64static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
65static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
66static void _becomeUser(ArchiveHandle *AH, const char *user);
67static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
68static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
69static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
70static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
72 TocEntry *te);
73static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
78static bool _tocEntryIsACL(TocEntry *te);
82static void buildTocEntryArrays(ArchiveHandle *AH);
83static void _moveBefore(TocEntry *pos, TocEntry *te);
85
86static int RestoringToDB(ArchiveHandle *AH);
87static void dump_lo_buf(ArchiveHandle *AH);
88static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
89static void SetOutput(ArchiveHandle *AH, const char *filename,
90 const pg_compress_specification compression_spec, bool append_data);
93
94static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
98 ParallelState *pstate,
102static void pending_list_header_init(TocEntry *l);
103static void pending_list_append(TocEntry *l, TocEntry *te);
104static void pending_list_remove(TocEntry *te);
105static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
106static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
111 ParallelState *pstate);
112static void mark_dump_job_done(ArchiveHandle *AH,
113 TocEntry *te,
114 int status,
115 void *callback_data);
117 TocEntry *te,
118 int status,
119 void *callback_data);
120static void fix_dependencies(ArchiveHandle *AH);
124static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
126static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
128
129static void StrictNamesCheck(RestoreOptions *ropt);
130
131
132/*
133 * Allocate a new DumpOptions block containing all default values.
134 */
143
144/*
145 * Initialize a DumpOptions struct to all default values
146 */
147void
149{
150 memset(opts, 0, sizeof(DumpOptions));
151 /* set any fields that shouldn't default to zeroes */
152 opts->include_everything = true;
153 opts->cparams.promptPassword = TRI_DEFAULT;
154 opts->dumpSections = DUMP_UNSECTIONED;
155 opts->dumpSchema = true;
156 opts->dumpData = true;
157 opts->dumpStatistics = false;
158}
159
160/*
161 * Create a freshly allocated DumpOptions with options equivalent to those
162 * found in the given RestoreOptions.
163 */
166{
167 DumpOptions *dopt = NewDumpOptions();
168
169 /* this is the inverse of what's at the end of pg_dump.c's main() */
170 dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
171 dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
172 dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
175 dopt->outputClean = ropt->dropSchema;
176 dopt->dumpData = ropt->dumpData;
177 dopt->dumpSchema = ropt->dumpSchema;
178 dopt->dumpSections = ropt->dumpSections;
179 dopt->dumpStatistics = ropt->dumpStatistics;
180 dopt->if_exists = ropt->if_exists;
181 dopt->column_inserts = ropt->column_inserts;
182 dopt->aclsSkip = ropt->aclsSkip;
183 dopt->outputSuperuser = ropt->superuser;
184 dopt->outputCreateDB = ropt->createDB;
185 dopt->outputNoOwner = ropt->noOwner;
186 dopt->outputNoTableAm = ropt->noTableAm;
187 dopt->outputNoTablespaces = ropt->noTablespace;
189 dopt->use_setsessauth = ropt->use_setsessauth;
191 dopt->dump_inserts = ropt->dump_inserts;
192 dopt->no_comments = ropt->no_comments;
193 dopt->no_policies = ropt->no_policies;
194 dopt->no_publications = ropt->no_publications;
197 dopt->lockWaitTimeout = ropt->lockWaitTimeout;
200 dopt->sequence_data = ropt->sequence_data;
201 dopt->restrict_key = ropt->restrict_key ? pg_strdup(ropt->restrict_key) : NULL;
202
203 return dopt;
204}
205
206
207/*
208 * Wrapper functions.
209 *
210 * The objective is to make writing new formats and dumpers as simple
211 * as possible, if necessary at the expense of extra function calls etc.
212 *
213 */
214
215/*
216 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
217 * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
218 * setup doesn't need to know anything much, so it's defined here.
219 */
220static void
222{
223 ArchiveHandle *AH = (ArchiveHandle *) AHX;
224
225 AH->ReopenPtr(AH);
226}
227
228
229/* Create a new archive */
230/* Public */
231Archive *
233 const pg_compress_specification compression_spec,
234 bool dosync, ArchiveMode mode,
237
238{
239 ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
241
242 return (Archive *) AH;
243}
244
245/* Open an existing archive */
246/* Public */
247Archive *
249{
250 ArchiveHandle *AH;
251 pg_compress_specification compression_spec = {0};
252
253 compression_spec.algorithm = PG_COMPRESSION_NONE;
254 AH = _allocAH(FileSpec, fmt, compression_spec, true,
257
258 return (Archive *) AH;
259}
260
261/* Public */
262void
264{
265 ArchiveHandle *AH = (ArchiveHandle *) AHX;
266
267 AH->ClosePtr(AH);
268
269 /* Close the output */
270 errno = 0;
271 if (!EndCompressFileHandle(AH->OF))
272 pg_fatal("could not close output file: %m");
273}
274
275/* Public */
276void
278{
279 /* Caller can omit dump options, in which case we synthesize them */
280 if (dopt == NULL && ropt != NULL)
282
283 /* Save options for later access */
284 AH->dopt = dopt;
285 AH->ropt = ropt;
286}
287
288/* Public */
289void
291{
292 ArchiveHandle *AH = (ArchiveHandle *) AHX;
293 RestoreOptions *ropt = AH->public.ropt;
294 TocEntry *te;
296
297 /* Decide which TOC entries will be dumped/restored, and mark them */
299 for (te = AH->toc->next; te != AH->toc; te = te->next)
300 {
301 /*
302 * When writing an archive, we also take this opportunity to check
303 * that we have generated the entries in a sane order that respects
304 * the section divisions. When reading, don't complain, since buggy
305 * old versions of pg_dump might generate out-of-order archives.
306 */
307 if (AH->mode != archModeRead)
308 {
309 switch (te->section)
310 {
311 case SECTION_NONE:
312 /* ok to be anywhere */
313 break;
314 case SECTION_PRE_DATA:
316 pg_log_warning("archive items not in correct section order");
317 break;
318 case SECTION_DATA:
320 pg_log_warning("archive items not in correct section order");
321 break;
323 /* ok no matter which section we were in */
324 break;
325 default:
326 pg_fatal("unexpected section code %d",
327 (int) te->section);
328 break;
329 }
330 }
331
332 if (te->section != SECTION_NONE)
333 curSection = te->section;
334
335 te->reqs = _tocEntryRequired(te, curSection, AH);
336 }
337
338 /* Enforce strict names checking */
339 if (ropt->strict_names)
340 StrictNamesCheck(ropt);
341}
342
343/*
344 * RestoreArchive
345 *
346 * If append_data is set, then append data into file as we are restoring dump
347 * of multiple databases which was taken by pg_dumpall.
348 */
349void
351{
352 ArchiveHandle *AH = (ArchiveHandle *) AHX;
353 RestoreOptions *ropt = AH->public.ropt;
354 bool parallel_mode;
355 TocEntry *te;
357
359
360 /*
361 * If we're going to do parallel restore, there are some restrictions.
362 */
363 parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
364 if (parallel_mode)
365 {
366 /* We haven't got round to making this work for all archive formats */
367 if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
368 pg_fatal("parallel restore is not supported with this archive file format");
369
370 /* Doesn't work if the archive represents dependencies as OIDs */
371 if (AH->version < K_VERS_1_8)
372 pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
373
374 /*
375 * It's also not gonna work if we can't reopen the input file, so
376 * let's try that immediately.
377 */
378 AH->ReopenPtr(AH);
379 }
380
381 /*
382 * Make sure we won't need (de)compression we haven't got
383 */
384 if (AH->PrintTocDataPtr != NULL)
385 {
386 for (te = AH->toc->next; te != AH->toc; te = te->next)
387 {
388 if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
389 {
391
392 if (errmsg)
393 pg_fatal("cannot restore from compressed archive (%s)",
394 errmsg);
395 else
396 break;
397 }
398 }
399 }
400
401 /*
402 * Prepare index arrays, so we can assume we have them throughout restore.
403 * It's possible we already did this, though.
404 */
405 if (AH->tocsByDumpId == NULL)
407
408 /*
409 * If we're using a DB connection, then connect it.
410 */
411 if (ropt->useDB)
412 {
413 pg_log_info("connecting to database for restore");
414 if (AH->version < K_VERS_1_3)
415 pg_fatal("direct database connections are not supported in pre-1.3 archives");
416
417 /*
418 * We don't want to guess at whether the dump will successfully
419 * restore; allow the attempt regardless of the version of the restore
420 * target.
421 */
422 AHX->minRemoteVersion = 0;
423 AHX->maxRemoteVersion = 9999999;
424
425 ConnectDatabaseAhx(AHX, &ropt->cparams, false);
426
427 /*
428 * If we're talking to the DB directly, don't send comments since they
429 * obscure SQL when displaying errors
430 */
431 AH->noTocComments = 1;
432 }
433
434 /*
435 * Work out if we have an implied schema-less restore. This can happen if
436 * the dump excluded the schema or the user has used a toc list to exclude
437 * all of the schema data. All we do is look for schema entries - if none
438 * are found then we unset the dumpSchema flag.
439 *
440 * We could scan for wanted TABLE entries, but that is not the same as
441 * data-only. At this stage, it seems unnecessary (6-Mar-2001).
442 */
443 if (ropt->dumpSchema)
444 {
445 bool no_schema_found = true;
446
447 for (te = AH->toc->next; te != AH->toc; te = te->next)
448 {
449 if ((te->reqs & REQ_SCHEMA) != 0)
450 {
451 no_schema_found = false;
452 break;
453 }
454 }
455 if (no_schema_found)
456 {
457 ropt->dumpSchema = false;
458 pg_log_info("implied no-schema restore");
459 }
460 }
461
462 /*
463 * Setup the output file if necessary.
464 */
465 sav = SaveOutput(AH);
468
469 ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
470
471 /*
472 * If generating plain-text output, enter restricted mode to block any
473 * unexpected psql meta-commands. A malicious source might try to inject
474 * a variety of things via bogus responses to queries. While we cannot
475 * prevent such sources from affecting the destination at restore time, we
476 * can block psql meta-commands so that the client machine that runs psql
477 * with the dump output remains unaffected.
478 */
479 if (ropt->restrict_key)
480 ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
481
482 if (AH->archiveRemoteVersion)
483 ahprintf(AH, "-- Dumped from database version %s\n",
485 if (AH->archiveDumpVersion)
486 ahprintf(AH, "-- Dumped by pg_dump version %s\n",
488
489 ahprintf(AH, "\n");
490
491 if (AH->public.verbose)
492 dumpTimestamp(AH, "Started on", AH->createDate);
493
494 if (ropt->single_txn)
495 {
496 if (AH->connection)
497 StartTransaction(AHX);
498 else
499 ahprintf(AH, "BEGIN;\n\n");
500 }
501
502 /*
503 * Establish important parameter values right away.
504 */
506
508
509 /*
510 * Drop the items at the start, in reverse order
511 */
512 if (ropt->dropSchema)
513 {
514 for (te = AH->toc->prev; te != AH->toc; te = te->prev)
515 {
516 AH->currentTE = te;
517
518 /*
519 * In createDB mode, issue a DROP *only* for the database as a
520 * whole. Issuing drops against anything else would be wrong,
521 * because at this point we're connected to the wrong database.
522 * (The DATABASE PROPERTIES entry, if any, should be treated like
523 * the DATABASE entry.)
524 */
525 if (ropt->createDB)
526 {
527 if (strcmp(te->desc, "DATABASE") != 0 &&
528 strcmp(te->desc, "DATABASE PROPERTIES") != 0)
529 continue;
530 }
531
532 /* Otherwise, drop anything that's selected and has a dropStmt */
533 if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
534 {
535 bool not_allowed_in_txn = false;
536
537 pg_log_info("dropping %s %s", te->desc, te->tag);
538
539 /*
540 * In --transaction-size mode, we have to temporarily exit our
541 * transaction block to drop objects that can't be dropped
542 * within a transaction.
543 */
544 if (ropt->txn_size > 0)
545 {
546 if (strcmp(te->desc, "DATABASE") == 0 ||
547 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
548 {
549 not_allowed_in_txn = true;
550 if (AH->connection)
552 else
553 ahprintf(AH, "COMMIT;\n");
554 }
555 }
556
557 /* Select owner and schema as necessary */
558 _becomeOwner(AH, te);
559 _selectOutputSchema(AH, te->namespace);
560
561 /*
562 * Now emit the DROP command, if the object has one. Note we
563 * don't necessarily emit it verbatim; at this point we add an
564 * appropriate IF EXISTS clause, if the user requested it.
565 */
566 if (strcmp(te->desc, "BLOB METADATA") == 0)
567 {
568 /* We must generate the per-blob commands */
569 if (ropt->if_exists)
570 IssueCommandPerBlob(AH, te,
571 "SELECT pg_catalog.lo_unlink(oid) "
572 "FROM pg_catalog.pg_largeobject_metadata "
573 "WHERE oid = '", "'");
574 else
575 IssueCommandPerBlob(AH, te,
576 "SELECT pg_catalog.lo_unlink('",
577 "')");
578 }
579 else if (*te->dropStmt != '\0')
580 {
581 if (!ropt->if_exists ||
582 strncmp(te->dropStmt, "--", 2) == 0)
583 {
584 /*
585 * Without --if-exists, or if it's just a comment (as
586 * happens for the public schema), print the dropStmt
587 * as-is.
588 */
589 ahprintf(AH, "%s", te->dropStmt);
590 }
591 else
592 {
593 /*
594 * Inject an appropriate spelling of "if exists". For
595 * old-style large objects, we have a routine that
596 * knows how to do it, without depending on
597 * te->dropStmt; use that. For other objects we need
598 * to parse the command.
599 */
600 if (strcmp(te->desc, "BLOB") == 0)
601 {
603 }
604 else
605 {
606 char *dropStmt = pg_strdup(te->dropStmt);
607 char *dropStmtOrig = dropStmt;
609
610 /*
611 * Need to inject IF EXISTS clause after ALTER
612 * TABLE part in ALTER TABLE .. DROP statement
613 */
614 if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
615 {
617 "ALTER TABLE IF EXISTS");
618 dropStmt = dropStmt + 11;
619 }
620
621 /*
622 * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
623 * not support the IF EXISTS clause, and therefore
624 * we simply emit the original command for DEFAULT
625 * objects (modulo the adjustment made above).
626 *
627 * Likewise, don't mess with DATABASE PROPERTIES.
628 *
629 * If we used CREATE OR REPLACE VIEW as a means of
630 * quasi-dropping an ON SELECT rule, that should
631 * be emitted unchanged as well.
632 *
633 * For other object types, we need to extract the
634 * first part of the DROP which includes the
635 * object type. Most of the time this matches
636 * te->desc, so search for that; however for the
637 * different kinds of CONSTRAINTs, we know to
638 * search for hardcoded "DROP CONSTRAINT" instead.
639 */
640 if (strcmp(te->desc, "DEFAULT") == 0 ||
641 strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
642 strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
643 appendPQExpBufferStr(ftStmt, dropStmt);
644 else
645 {
646 char buffer[40];
647 char *mark;
648
649 if (strcmp(te->desc, "CONSTRAINT") == 0 ||
650 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
651 strcmp(te->desc, "FK CONSTRAINT") == 0)
652 strcpy(buffer, "DROP CONSTRAINT");
653 else
654 snprintf(buffer, sizeof(buffer), "DROP %s",
655 te->desc);
656
657 mark = strstr(dropStmt, buffer);
658
659 if (mark)
660 {
661 *mark = '\0';
662 appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
663 dropStmt, buffer,
664 mark + strlen(buffer));
665 }
666 else
667 {
668 /* complain and emit unmodified command */
669 pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
671 appendPQExpBufferStr(ftStmt, dropStmt);
672 }
673 }
674
675 ahprintf(AH, "%s", ftStmt->data);
676
679 }
680 }
681 }
682
683 /*
684 * In --transaction-size mode, re-establish the transaction
685 * block if needed; otherwise, commit after every N drops.
686 */
687 if (ropt->txn_size > 0)
688 {
690 {
691 if (AH->connection)
692 StartTransaction(AHX);
693 else
694 ahprintf(AH, "BEGIN;\n");
695 AH->txnCount = 0;
696 }
697 else if (++AH->txnCount >= ropt->txn_size)
698 {
699 if (AH->connection)
700 {
702 StartTransaction(AHX);
703 }
704 else
705 ahprintf(AH, "COMMIT;\nBEGIN;\n");
706 AH->txnCount = 0;
707 }
708 }
709 }
710 }
711
712 /*
713 * _selectOutputSchema may have set currSchema to reflect the effect
714 * of a "SET search_path" command it emitted. However, by now we may
715 * have dropped that schema; or it might not have existed in the first
716 * place. In either case the effective value of search_path will not
717 * be what we think. Forcibly reset currSchema so that we will
718 * re-establish the search_path setting when needed (after creating
719 * the schema).
720 *
721 * If we treated users as pg_dump'able objects then we'd need to reset
722 * currUser here too.
723 */
724 free(AH->currSchema);
725 AH->currSchema = NULL;
726 }
727
728 if (parallel_mode)
729 {
730 /*
731 * In parallel mode, turn control over to the parallel-restore logic.
732 */
733 ParallelState *pstate;
735
736 /* The archive format module may need some setup for this */
739
741
742 /* This runs PRE_DATA items and then disconnects from the database */
744 Assert(AH->connection == NULL);
745
746 /* ParallelBackupStart() will actually fork the processes */
747 pstate = ParallelBackupStart(AH);
749 ParallelBackupEnd(AH, pstate);
750
751 /* reconnect the leader and see if we missed something */
753 Assert(AH->connection != NULL);
754 }
755 else
756 {
757 /*
758 * In serial mode, process everything in three phases: normal items,
759 * then ACLs, then post-ACL items. We might be able to skip one or
760 * both extra phases in some cases, eg data-only restores.
761 */
762 bool haveACL = false;
763 bool havePostACL = false;
764
765 for (te = AH->toc->next; te != AH->toc; te = te->next)
766 {
767 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
768 continue; /* ignore if not to be dumped at all */
769
770 /* Skip if no-tablespace is given. */
771 if (ropt->noTablespace && te && te->desc &&
772 (strcmp(te->desc, "TABLESPACE") == 0))
773 continue;
774
775 /*
776 * Skip DROP DATABASE/ROLES/TABLESPACE if we didn't specify
777 * --clean
778 */
779 if (!ropt->dropSchema && te && te->desc &&
780 strcmp(te->desc, "DROP_GLOBAL") == 0)
781 continue;
782
783 switch (_tocEntryRestorePass(te))
784 {
786 (void) restore_toc_entry(AH, te, false);
787 break;
788 case RESTORE_PASS_ACL:
789 haveACL = true;
790 break;
792 havePostACL = true;
793 break;
794 }
795 }
796
797 if (haveACL)
798 {
799 for (te = AH->toc->next; te != AH->toc; te = te->next)
800 {
801 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
803 (void) restore_toc_entry(AH, te, false);
804 }
805 }
806
807 if (havePostACL)
808 {
809 for (te = AH->toc->next; te != AH->toc; te = te->next)
810 {
811 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
813 (void) restore_toc_entry(AH, te, false);
814 }
815 }
816 }
817
818 /*
819 * Close out any persistent transaction we may have. While these two
820 * cases are started in different places, we can end both cases here.
821 */
822 if (ropt->single_txn || ropt->txn_size > 0)
823 {
824 if (AH->connection)
826 else
827 ahprintf(AH, "COMMIT;\n\n");
828 }
829
830 if (AH->public.verbose)
831 dumpTimestamp(AH, "Completed on", time(NULL));
832
833 ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
834
835 /*
836 * If generating plain-text output, exit restricted mode at the very end
837 * of the script. This is not pro forma; in particular, pg_dumpall
838 * requires this when transitioning from one database to another.
839 */
840 if (ropt->restrict_key)
841 ahprintf(AH, "\\unrestrict %s\n\n", ropt->restrict_key);
842
843 /*
844 * Clean up & we're done.
845 */
847
849 RestoreOutput(AH, sav);
850
851 if (ropt->useDB)
853}
854
855/*
856 * Restore a single TOC item. Used in both parallel and non-parallel restore;
857 * is_parallel is true if we are in a worker child process.
858 *
859 * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
860 * the parallel parent has to make the corresponding status update.
861 */
862static int
864{
865 RestoreOptions *ropt = AH->public.ropt;
866 int status = WORKER_OK;
867 int reqs;
868 bool defnDumped;
869
870 AH->currentTE = te;
871
872 /* Dump any relevant dump warnings to stderr */
873 if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
874 {
875 if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
876 pg_log_warning("warning from original dump file: %s", te->defn);
877 else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
878 pg_log_warning("warning from original dump file: %s", te->copyStmt);
879 }
880
881 /* Work out what, if anything, we want from this entry */
882 reqs = te->reqs;
883
884 defnDumped = false;
885
886 /*
887 * If it has a schema component that we want, then process that
888 */
889 if ((reqs & REQ_SCHEMA) != 0)
890 {
891 bool object_is_db = false;
892
893 /*
894 * In --transaction-size mode, must exit our transaction block to
895 * create a database or set its properties.
896 */
897 if (strcmp(te->desc, "DATABASE") == 0 ||
898 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
899 {
900 object_is_db = true;
901 if (ropt->txn_size > 0)
902 {
903 if (AH->connection)
905 else
906 ahprintf(AH, "COMMIT;\n\n");
907 }
908 }
909
910 /* Show namespace in log message if available */
911 if (te->namespace)
912 pg_log_info("creating %s \"%s.%s\"",
913 te->desc, te->namespace, te->tag);
914 else
915 pg_log_info("creating %s \"%s\"",
916 te->desc, te->tag);
917
919 defnDumped = true;
920
921 if (strcmp(te->desc, "TABLE") == 0)
922 {
923 if (AH->lastErrorTE == te)
924 {
925 /*
926 * We failed to create the table. If
927 * --no-data-for-failed-tables was given, mark the
928 * corresponding TABLE DATA to be ignored.
929 *
930 * In the parallel case this must be done in the parent, so we
931 * just set the return value.
932 */
933 if (ropt->noDataForFailedTables)
934 {
935 if (is_parallel)
936 status = WORKER_INHIBIT_DATA;
937 else
939 }
940 }
941 else
942 {
943 /*
944 * We created the table successfully. Mark the corresponding
945 * TABLE DATA for possible truncation.
946 *
947 * In the parallel case this must be done in the parent, so we
948 * just set the return value.
949 */
950 if (is_parallel)
951 status = WORKER_CREATE_DONE;
952 else
953 mark_create_done(AH, te);
954 }
955 }
956
957 /*
958 * If we created a DB, connect to it. Also, if we changed DB
959 * properties, reconnect to ensure that relevant GUC settings are
960 * applied to our session. (That also restarts the transaction block
961 * in --transaction-size mode.)
962 */
963 if (object_is_db)
964 {
965 pg_log_info("connecting to new database \"%s\"", te->tag);
966 _reconnectToDB(AH, te->tag);
967 }
968 }
969
970 /*
971 * If it has a data component that we want, then process that
972 */
973 if ((reqs & REQ_DATA) != 0)
974 {
975 /*
976 * hadDumper will be set if there is genuine data component for this
977 * node. Otherwise, we need to check the defn field for statements
978 * that need to be executed in data-only restores.
979 */
980 if (te->hadDumper)
981 {
982 /*
983 * If we can output the data, then restore it.
984 */
985 if (AH->PrintTocDataPtr != NULL)
986 {
988
989 if (strcmp(te->desc, "BLOBS") == 0 ||
990 strcmp(te->desc, "BLOB COMMENTS") == 0)
991 {
992 pg_log_info("processing %s", te->desc);
993
994 _selectOutputSchema(AH, "pg_catalog");
995
996 /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
997 if (strcmp(te->desc, "BLOB COMMENTS") == 0)
999
1000 AH->PrintTocDataPtr(AH, te);
1001
1003 }
1004 else
1005 {
1006 bool use_truncate;
1007
1009
1010 /* Select owner and schema as necessary */
1011 _becomeOwner(AH, te);
1012 _selectOutputSchema(AH, te->namespace);
1013
1014 pg_log_info("processing data for table \"%s.%s\"",
1015 te->namespace, te->tag);
1016
1017 /*
1018 * In parallel restore, if we created the table earlier in
1019 * this run (so that we know it is empty) and we are not
1020 * restoring a load-via-partition-root data item then we
1021 * wrap the COPY in a transaction and precede it with a
1022 * TRUNCATE. If wal_level is set to minimal this prevents
1023 * WAL-logging the COPY. This obtains a speedup similar
1024 * to that from using single_txn mode in non-parallel
1025 * restores.
1026 *
1027 * We mustn't do this for load-via-partition-root cases
1028 * because some data might get moved across partition
1029 * boundaries, risking deadlock and/or loss of previously
1030 * loaded data. (We assume that all partitions of a
1031 * partitioned table will be treated the same way.)
1032 */
1033 use_truncate = is_parallel && te->created &&
1035
1036 if (use_truncate)
1037 {
1038 /*
1039 * Parallel restore is always talking directly to a
1040 * server, so no need to see if we should issue BEGIN.
1041 */
1043
1044 /*
1045 * Issue TRUNCATE with ONLY so that child tables are
1046 * not wiped.
1047 */
1048 ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1049 fmtQualifiedId(te->namespace, te->tag));
1050 }
1051
1052 /*
1053 * If we have a copy statement, use it.
1054 */
1055 if (te->copyStmt && strlen(te->copyStmt) > 0)
1056 {
1057 ahprintf(AH, "%s", te->copyStmt);
1059 }
1060 else
1062
1063 AH->PrintTocDataPtr(AH, te);
1064
1065 /*
1066 * Terminate COPY if needed.
1067 */
1068 if (AH->outputKind == OUTPUT_COPYDATA &&
1069 RestoringToDB(AH))
1070 EndDBCopyMode(&AH->public, te->tag);
1072
1073 /* close out the transaction started above */
1074 if (use_truncate)
1076
1078 }
1079 }
1080 }
1081 else if (!defnDumped)
1082 {
1083 /* If we haven't already dumped the defn part, do so now */
1084 pg_log_info("executing %s %s", te->desc, te->tag);
1086 }
1087 }
1088
1089 /*
1090 * If it has a statistics component that we want, then process that
1091 */
1092 if ((reqs & REQ_STATS) != 0)
1094
1095 /*
1096 * If we emitted anything for this TOC entry, that counts as one action
1097 * against the transaction-size limit. Commit if it's time to.
1098 */
1099 if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1100 {
1101 if (++AH->txnCount >= ropt->txn_size)
1102 {
1103 if (AH->connection)
1104 {
1107 }
1108 else
1109 ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1110 AH->txnCount = 0;
1111 }
1112 }
1113
1114 if (AH->public.n_errors > 0 && status == WORKER_OK)
1115 status = WORKER_IGNORED_ERRORS;
1116
1117 return status;
1118}
1119
1120/*
1121 * Allocate a new RestoreOptions block.
1122 * This is mainly so we can initialize it, but also for future expansion,
1123 */
1126{
1128
1130
1131 /* set any fields that shouldn't default to zeroes */
1132 opts->format = archUnknown;
1133 opts->cparams.promptPassword = TRI_DEFAULT;
1134 opts->dumpSections = DUMP_UNSECTIONED;
1135 opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1136 opts->compression_spec.level = 0;
1137 opts->dumpSchema = true;
1138 opts->dumpData = true;
1139 opts->dumpStatistics = true;
1140
1141 return opts;
1142}
1143
1144static void
1146{
1147 RestoreOptions *ropt = AH->public.ropt;
1148
1149 /* This hack is only needed in a data-only restore */
1150 if (ropt->dumpSchema || !ropt->disable_triggers)
1151 return;
1152
1153 pg_log_info("disabling triggers for %s", te->tag);
1154
1155 /*
1156 * Become superuser if possible, since they are the only ones who can
1157 * disable constraint triggers. If -S was not given, assume the initial
1158 * user identity is a superuser. (XXX would it be better to become the
1159 * table owner?)
1160 */
1161 _becomeUser(AH, ropt->superuser);
1162
1163 /*
1164 * Disable them.
1165 */
1166 ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1167 fmtQualifiedId(te->namespace, te->tag));
1168}
1169
1170static void
1172{
1173 RestoreOptions *ropt = AH->public.ropt;
1174
1175 /* This hack is only needed in a data-only restore */
1176 if (ropt->dumpSchema || !ropt->disable_triggers)
1177 return;
1178
1179 pg_log_info("enabling triggers for %s", te->tag);
1180
1181 /*
1182 * Become superuser if possible, since they are the only ones who can
1183 * disable constraint triggers. If -S was not given, assume the initial
1184 * user identity is a superuser. (XXX would it be better to become the
1185 * table owner?)
1186 */
1187 _becomeUser(AH, ropt->superuser);
1188
1189 /*
1190 * Enable them.
1191 */
1192 ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1193 fmtQualifiedId(te->namespace, te->tag));
1194}
1195
1196/*
1197 * Detect whether a TABLE DATA TOC item is performing "load via partition
1198 * root", that is the target table is an ancestor partition rather than the
1199 * table the TOC item is nominally for.
1200 *
1201 * In newer archive files this can be detected by checking for a special
1202 * comment placed in te->defn. In older files we have to fall back to seeing
1203 * if the COPY statement targets the named table or some other one. This
1204 * will not work for data dumped as INSERT commands, so we could give a false
1205 * negative in that case; fortunately, that's a rarely-used option.
1206 */
1207static bool
1209{
1210 if (te->defn &&
1211 strncmp(te->defn, "-- load via partition root ", 27) == 0)
1212 return true;
1213 if (te->copyStmt && *te->copyStmt)
1214 {
1215 PQExpBuffer copyStmt = createPQExpBuffer();
1216 bool result;
1217
1218 /*
1219 * Build the initial part of the COPY as it would appear if the
1220 * nominal target table is the actual target. If we see anything
1221 * else, it must be a load-via-partition-root case.
1222 */
1223 appendPQExpBuffer(copyStmt, "COPY %s ",
1224 fmtQualifiedId(te->namespace, te->tag));
1225 result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1226 destroyPQExpBuffer(copyStmt);
1227 return result;
1228 }
1229 /* Assume it's not load-via-partition-root */
1230 return false;
1231}
1232
1233/*
1234 * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1235 */
1236
1237/* Public */
1238void
1239WriteData(Archive *AHX, const void *data, size_t dLen)
1240{
1241 ArchiveHandle *AH = (ArchiveHandle *) AHX;
1242
1243 if (!AH->currToc)
1244 pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1245
1246 AH->WriteDataPtr(AH, data, dLen);
1247}
1248
1249/*
1250 * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1251 * repository for all metadata. But the name has stuck.
1252 *
1253 * The new entry is added to the Archive's TOC list. Most callers can ignore
1254 * the result value because nothing else need be done, but a few want to
1255 * manipulate the TOC entry further.
1256 */
1257
1258/* Public */
TocEntry *
ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
/* NOTE(review): trailing parameter line (an ArchiveOpts pointer, "opts")
 * lost in HTML extraction — restore from upstream. */
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	/* NOTE(review): declaration and zeroed allocation of TocEntry *newToc
	 * lost in extraction. */

	/* Count the entry and track the largest dump ID handed out so far */
	AH->tocCount++;
	if (dumpId > AH->maxDumpId)
		AH->maxDumpId = dumpId;

	/* Link the new entry at the tail of the circular doubly-linked TOC list */
	newToc->prev = AH->toc->prev;
	newToc->next = AH->toc;
	AH->toc->prev->next = newToc;
	AH->toc->prev = newToc;

	newToc->catalogId = catalogId;
	newToc->dumpId = dumpId;
	newToc->section = opts->section;

	/* Copy all string fields so the caller's opts struct can be transient */
	newToc->tag = pg_strdup(opts->tag);
	newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
	newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
	newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
	newToc->relkind = opts->relkind;
	newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
	newToc->desc = pg_strdup(opts->description);
	newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
	newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
	newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;

	/* Copy the dependency array, if any */
	if (opts->nDeps > 0)
	{
		newToc->dependencies = pg_malloc_array(DumpId, opts->nDeps);
		memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
		newToc->nDeps = opts->nDeps;
	}
	else
	{
		newToc->dependencies = NULL;
		newToc->nDeps = 0;
	}

	newToc->dataDumper = opts->dumpFn;
	newToc->dataDumperArg = opts->dumpArg;
	newToc->hadDumper = opts->dumpFn ? true : false;

	newToc->defnDumper = opts->defnFn;
	newToc->defnDumperArg = opts->defnArg;

	newToc->formatData = NULL;
	newToc->dataLength = 0;

	/* Let the format module attach per-entry private data, if it wants */
	if (AH->ArchiveEntryPtr != NULL)
		AH->ArchiveEntryPtr(AH, newToc);

	return newToc;
}
1319
1320/* Public */
void
/* NOTE(review): function-name line lost in extraction —
 * PrintTOCSummary(Archive *AHX) per upstream. Prints a human-readable
 * summary of the archive's TOC, as used by pg_restore -l. */
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	TocEntry   *te;
	/* NOTE(review): several local declarations (saved output handle "sav",
	 * section tracker "curSection", no-compression spec) lost in extraction. */
	const char *fmtName;
	char		stamp_str[64];

	/* TOC is always uncompressed */
	/* NOTE(review): initialization of out_compression_spec lost in extraction. */

	sav = SaveOutput(AH);
	if (ropt->filename)
		SetOutput(AH, ropt->filename, out_compression_spec, false);

	/* NOTE(review): opening of the strftime(...) call lost in extraction. */
		localtime(&AH->createDate)) == 0)
		strcpy(stamp_str, "[unknown]");

	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
	ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
			 sanitize_line(AH->archdbname, false),
			 AH->tocCount,
	/* NOTE(review): compression-name argument line lost in extraction. */

	switch (AH->format)
	{
		case archCustom:
			fmtName = "CUSTOM";
			break;
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}

	ahprintf(AH, "; Dump Version: %d.%d-%d\n",
	/* NOTE(review): version-number argument line lost in extraction. */
	ahprintf(AH, "; Format: %s\n", fmtName);
	ahprintf(AH, "; Integer: %zu bytes\n", AH->intSize);
	ahprintf(AH, "; Offset: %zu bytes\n", AH->offSize);
	if (AH->archiveRemoteVersion)
		ahprintf(AH, "; Dumped from database version: %s\n",
	/* NOTE(review): argument line (AH->archiveRemoteVersion) lost in extraction. */
	if (AH->archiveDumpVersion)
		ahprintf(AH, "; Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);

	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

	/* NOTE(review): initialization of curSection lost in extraction. */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* This bit must match ProcessArchiveRestoreOptions' marking logic */
		if (te->section != SECTION_NONE)
			curSection = te->section;
		te->reqs = _tocEntryRequired(te, curSection, AH);
		/* Now, should we print it? */
		if (ropt->verbose ||
			(te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
		{
			char	   *sanitized_name;
			char	   *sanitized_schema;
			char	   *sanitized_owner;

			/*
			 * Sanitize fields for one-line output; body of this comment was
			 * lost in extraction.
			 */
			sanitized_name = sanitize_line(te->tag, false);
			sanitized_schema = sanitize_line(te->namespace, true);
			sanitized_owner = sanitize_line(te->owner, false);

			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
			/* NOTE(review): remaining argument lines and the free() calls
			 * for the sanitized strings were lost in extraction. */
		}
		if (ropt->verbose && te->nDeps > 0)
		{
			int			i;

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
	}

	/* Enforce strict names checking */
	if (ropt->strict_names)
		StrictNamesCheck(ropt);

	if (ropt->filename)
		RestoreOutput(AH, sav);
}
1427
1428/***********
1429 * Large Object Archival
1430 ***********/
1431
1432/* Called by a dumper to signal start of a LO */
int
/* NOTE(review): signature line lost in extraction — presumably
 * StartLO(Archive *AHX, Oid oid). */
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	/* Not every format can store large objects; fail loudly if not */
	if (!AH->StartLOPtr)
		pg_fatal("large-object output not supported in chosen format");

	AH->StartLOPtr(AH, AH->currToc, oid);

	return 1;
}
1445
1446/* Called by a dumper to signal end of a LO */
int
/* NOTE(review): signature line lost in extraction — presumably
 * EndLO(Archive *AHX, Oid oid). */
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	/* EndLOPtr is optional; silently skip if the format has none */
	if (AH->EndLOPtr)
		AH->EndLOPtr(AH, AH->currToc, oid);

	return 1;
}
1457
1458/**********
1459 * Large Object Restoration
1460 **********/
1461
1462/*
1463 * Called by a format handler before a group of LOs is restored
1464 */
void
/* NOTE(review): name line lost in extraction — StartRestoreLOs(ArchiveHandle *AH). */
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * LOs must be restored within a transaction block, since we need the LO
	 * handle to stay open while we write it. Establish a transaction unless
	 * there's one being used globally.
	 */
	if (!(ropt->single_txn || ropt->txn_size > 0))
	{
		if (AH->connection)
			/* NOTE(review): direct-to-DB transaction-start call lost in extraction */
		else
			ahprintf(AH, "BEGIN;\n\n");
	}

	/* Reset the counter that EndRestoreLOs reports */
	AH->loCount = 0;
}
1485
1486/*
1487 * Called by a format handler after a group of LOs is restored
1488 */
void
/* NOTE(review): name line lost in extraction — EndRestoreLOs(ArchiveHandle *AH). */
{
	RestoreOptions *ropt = AH->public.ropt;

	/* Close the transaction opened by StartRestoreLOs, if we opened one */
	if (!(ropt->single_txn || ropt->txn_size > 0))
	{
		if (AH->connection)
			/* NOTE(review): direct-to-DB commit call lost in extraction */
		else
			ahprintf(AH, "COMMIT;\n\n");
	}

	pg_log_info(ngettext("restored %d large object",
						 "restored %d large objects",
						 AH->loCount),
				AH->loCount);
}
1507
1508
1509/*
1510 * Called by a format handler to initiate restoration of a LO
1511 */
void
/* NOTE(review): signature line lost in extraction — presumably
 * StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop). */
{
	/* Pre-1.12 archives require us to create the LO ourselves here */
	bool		old_lo_style = (AH->version < K_VERS_1_12);
	Oid			loOid;

	AH->loCount++;

	/* Initialize the LO Buffer */
	if (AH->lo_buf == NULL)
	{
		/* First time through (in this process) so allocate the buffer */
		AH->lo_buf_size = LOBBUFSIZE;
		/* NOTE(review): allocation of AH->lo_buf lost in extraction */
	}
	AH->lo_buf_used = 0;

	pg_log_info("restoring large object with OID %u", oid);

	/* With an old archive we must do drop and create logic here */
	if (old_lo_style && drop)
		DropLOIfExists(AH, oid);

	if (AH->connection)
	{
		if (old_lo_style)
		{
			loOid = lo_create(AH->connection, oid);
			if (loOid == 0 || loOid != oid)
				pg_fatal("could not create large object %u: %s",
						 oid, PQerrorMessage(AH->connection));
		}
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
			pg_fatal("could not open large object %u: %s",
					 oid, PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit the equivalent SQL instead */
		if (old_lo_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
	}

	/* ahwrite now routes data through the LO buffer */
	AH->writingLO = true;
}
1561
void
/* NOTE(review): signature line lost in extraction — presumably
 * EndRestoreLO(ArchiveHandle *AH, Oid oid). */
{
	if (AH->lo_buf_used > 0)
	{
		/* Write remaining bytes from the LO buffer */
		dump_lo_buf(AH);
	}

	AH->writingLO = false;

	if (AH->connection)
	{
		lo_close(AH->connection, AH->loFd);
		AH->loFd = -1;
	}
	else
	{
		/* Script output: fd 0 matches the lo_open emitted by StartRestoreLO */
		ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
	}
}
1583
1584/***********
1585 * Sorting and Reordering
1586 ***********/
1587
void
/* NOTE(review): name line lost in extraction — SortTocFromFile(Archive *AHX).
 * Reads a pg_restore -L list file and reorders/selects TOC entries to match. */
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	FILE	   *fh;
	/* NOTE(review): StringInfoData linebuf declaration lost in extraction */

	/* Allocate space for the 'wanted' array, and init it */
	ropt->idWanted = pg_malloc0_array(bool, AH->maxDumpId);

	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
		pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);

	/* NOTE(review): initStringInfo(&linebuf) call lost in extraction */

	while (pg_get_line_buf(fh, &linebuf))
	{
		char	   *cmnt;
		char	   *endptr;
		DumpId		id;
		TocEntry   *te;

		/* Truncate line at comment, if any */
		cmnt = strchr(linebuf.data, ';');
		if (cmnt != NULL)
		{
			cmnt[0] = '\0';
			linebuf.len = cmnt - linebuf.data;
		}

		/* Ignore if all blank */
		if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
			continue;

		/* Get an ID, check it's valid and not already seen */
		id = strtol(linebuf.data, &endptr, 10);
		if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
			ropt->idWanted[id - 1])
		{
			pg_log_warning("line ignored: %s", linebuf.data);
			continue;
		}

		/* Find TOC entry */
		te = getTocEntryByDumpId(AH, id);
		if (!te)
			pg_fatal("could not find entry for ID %d",
					 id);

		/* Mark it wanted */
		ropt->idWanted[id - 1] = true;

		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH->toc, te);
	}

	pg_free(linebuf.data);

	if (fclose(fh) != 0)
		pg_fatal("could not close TOC file: %m");
}
1662
1663/**********************
1664 * Convenience functions that look like standard IO functions
1665 * for writing data when in dump mode.
1666 **********************/
1667
1668/* Public */
1669void
1670archputs(const char *s, Archive *AH)
1671{
1672 WriteData(AH, s, strlen(s));
1673}
1674
1675/* Public */
1676int
1677archprintf(Archive *AH, const char *fmt,...)
1678{
1679 int save_errno = errno;
1680 char *p;
1681 size_t len = 128; /* initial assumption about buffer size */
1682 size_t cnt;
1683
1684 for (;;)
1685 {
1686 va_list args;
1687
1688 /* Allocate work buffer. */
1689 p = (char *) pg_malloc(len);
1690
1691 /* Try to format the data. */
1692 errno = save_errno;
1693 va_start(args, fmt);
1694 cnt = pvsnprintf(p, len, fmt, args);
1695 va_end(args);
1696
1697 if (cnt < len)
1698 break; /* success */
1699
1700 /* Release buffer and loop around to try again with larger len. */
1701 free(p);
1702 len = cnt;
1703 }
1704
1705 WriteData(AH, p, cnt);
1706 free(p);
1707 return (int) cnt;
1708}
1709
1710
1711/*******************************
1712 * Stuff below here should be 'private' to the archiver routines
1713 *
1714 * If append_data is set, then append data into file as we are restoring dump
1715 * of multiple databases which was taken by pg_dumpall.
1716 *******************************/
1717
static void
/* NOTE(review): first signature line lost in extraction — presumably
 * SetOutput(ArchiveHandle *AH, const char *filename, ... */
		  const pg_compress_specification compression_spec,
		  bool append_data)
{
	/* NOTE(review): CompressFileHandle *CFH declaration lost in extraction */
	const char *mode;
	int			fn = -1;

	/* Choose between explicit filename, existing FH, fSpec, or stdout */
	if (filename)
	{
		if (strcmp(filename, "-") == 0)
			fn = fileno(stdout);
	}
	else if (AH->FH)
		fn = fileno(AH->FH);
	else if (AH->fSpec)
	{
		filename = AH->fSpec;
	}
	else
		fn = fileno(stdout);

	/* Append when restoring a pg_dumpall-style multi-database dump */
	if (append_data || AH->mode == archModeAppend)
		mode = PG_BINARY_A;
	else
		mode = PG_BINARY_W;

	CFH = InitCompressFileHandle(compression_spec);

	if (!CFH->open_func(filename, fn, mode, CFH))
	{
		if (filename)
			pg_fatal("could not open output file \"%s\": %m", filename);
		else
			pg_fatal("could not open output file: %m");
	}

	AH->OF = CFH;
}
1758
static CompressFileHandle *
/* NOTE(review): name line lost in extraction — SaveOutput(ArchiveHandle *AH).
 * Returns the current output handle so it can be restored later. */
{
	return (CompressFileHandle *) AH->OF;
}
1764
static void
/* NOTE(review): signature line lost in extraction — presumably
 * RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput). */
{
	/* Clear errno so %m reports only a failure from the close itself */
	errno = 0;
	if (!EndCompressFileHandle(AH->OF))
		pg_fatal("could not close output file: %m");

	AH->OF = savedOutput;
}
1774
1775
1776
1777/*
1778 * Print formatted text to the output file (usually stdout).
1779 */
1780int
1781ahprintf(ArchiveHandle *AH, const char *fmt,...)
1782{
1783 int save_errno = errno;
1784 char *p;
1785 size_t len = 128; /* initial assumption about buffer size */
1786 size_t cnt;
1787
1788 for (;;)
1789 {
1790 va_list args;
1791
1792 /* Allocate work buffer. */
1793 p = (char *) pg_malloc(len);
1794
1795 /* Try to format the data. */
1796 errno = save_errno;
1797 va_start(args, fmt);
1798 cnt = pvsnprintf(p, len, fmt, args);
1799 va_end(args);
1800
1801 if (cnt < len)
1802 break; /* success */
1803
1804 /* Release buffer and loop around to try again with larger len. */
1805 free(p);
1806 len = cnt;
1807 }
1808
1809 ahwrite(p, 1, cnt, AH);
1810 free(p);
1811 return (int) cnt;
1812}
1813
1814/*
1815 * Single place for logic which says 'We are restoring to a direct DB connection'.
1816 */
static int
/* NOTE(review): name line lost in extraction — RestoringToDB(ArchiveHandle *AH). */
{
	RestoreOptions *ropt = AH->public.ropt;

	/* True only when a --dbname restore actually has an open connection */
	return (ropt && ropt->useDB && AH->connection);
}
1824
1825/*
1826 * Dump the current contents of the LO data buffer while writing a LO
1827 */
static void
/* NOTE(review): name line lost in extraction — dump_lo_buf(ArchiveHandle *AH). */
{
	if (AH->connection)
	{
		/* Direct-to-DB: push the buffer through lo_write */
		int			res;

		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
		pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
							  "wrote %zu bytes of large object data (result = %d)",
							  AH->lo_buf_used),
					 AH->lo_buf_used, res);
		/* We assume there are no short writes, only errors */
		if (res != AH->lo_buf_used)
			warn_or_exit_horribly(AH, "could not write to large object: %s",
			/* NOTE(review): PQerrorMessage(...) argument line lost in extraction */
	}
	else
	{
		/* Script output: emit a lowrite() call with a bytea literal */
		/* NOTE(review): creation of a PQExpBuffer and the opening of the
		 * bytea-literal append call were lost in extraction */
							 (const unsigned char *) AH->lo_buf,
							 AH->lo_buf_used,
							 AH);

		/* Hack: turn off writingLO so ahwrite doesn't recurse to here */
		AH->writingLO = false;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
		AH->writingLO = true;

		/* NOTE(review): destroyPQExpBuffer(buf) call lost in extraction */
	}
	AH->lo_buf_used = 0;
}
1863
1864
1865/*
1866 * Write buffer to the output file (usually stdout). This is used for
1867 * outputting 'restore' scripts etc. It is even possible for an archive
1868 * format to create a custom output routine to 'fake' a restore if it
1869 * wants to generate a script (see TAR output).
1870 */
void
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
	int			bytes_written = 0;

	if (AH->writingLO)
	{
		size_t		remaining = size * nmemb;

		/* Fill and flush the LO buffer as many times as needed */
		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
		{
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const char *) ptr + avail;
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
		}

		/* Buffer whatever is left; flushed later by EndRestoreLO */
		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

		bytes_written = size * nmemb;
	}
	else if (AH->CustomOutPtr)
		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);

	/*
	 * If we're doing a restore, and it's direct to DB, and we're connected
	 * then send it to the DB.
	 */
	else if (RestoringToDB(AH))
		bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
	else
	{
		/* NOTE(review): CompressFileHandle *CFH assignment from AH->OF lost
		 * in extraction */

		CFH->write_func(ptr, size * nmemb, CFH);
		bytes_written = size * nmemb;
	}

	if (bytes_written != size * nmemb)
		/* NOTE(review): write-error exit line lost in extraction */
}
1916
1917/* on some error, we may decide to go on... */
void
/* NOTE(review): signature line lost in extraction — presumably
 * warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...). */
{
	va_list		ap;

	/* Report the restore stage, but only on the first error within it */
	switch (AH->stage)
	{

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while INITIALIZING:");
			break;

		case STAGE_PROCESSING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while PROCESSING TOC:");
			break;

		case STAGE_FINALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while FINALIZING:");
			break;
	}
	/* Identify the TOC entry, but only once per entry */
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
		pg_log_info("from TOC entry %d; %u %u %s %s %s",
					AH->currentTE->dumpId,
					/* NOTE(review): tableoid argument line lost in extraction */
					AH->currentTE->catalogId.oid,
					AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
					AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
					AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
	}
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

	va_start(ap, fmt);
	/* NOTE(review): varargs logging call lost in extraction */
	va_end(ap);

	/* Either die now, or count the error and keep going */
	if (AH->public.exit_on_error)
		exit_nicely(1);
	else
		AH->public.n_errors++;
}
1967
1968#ifdef NOT_USED
1969
static void
/* NOTE(review): signature line lost in extraction — presumably
 * _moveAfter(...TocEntry *pos, TocEntry *te); this is inside #ifdef NOT_USED. */
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it after "pos" */
	te->prev = pos;
	te->next = pos->next;
	pos->next->prev = te;
	pos->next = te;
}
1983#endif
1984
static void
/* NOTE(review): signature line lost in extraction — presumably
 * _moveBefore(TocEntry *pos, TocEntry *te), per the call in SortTocFromFile. */
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it before "pos" */
	te->prev = pos->prev;
	te->next = pos;
	pos->prev->next = te;
	pos->prev = te;
}
1998
1999/*
2000 * Build index arrays for the TOC list
2001 *
2002 * This should be invoked only after we have created or read in all the TOC
2003 * items.
2004 *
2005 * The arrays are indexed by dump ID (so entry zero is unused). Note that the
2006 * array entries run only up to maxDumpId. We might see dependency dump IDs
2007 * beyond that (if the dump was partial); so always check the array bound
2008 * before trying to touch an array entry.
2009 */
static void
/* NOTE(review): name line lost in extraction — buildTocEntryArrays(ArchiveHandle *AH). */
{
	DumpId		maxDumpId = AH->maxDumpId;
	TocEntry   *te;

	/* Entry zero is unused; arrays are indexed directly by dump ID */
	AH->tocsByDumpId = pg_malloc0_array(TocEntry *, (maxDumpId + 1));
	AH->tableDataId = pg_malloc0_array(DumpId, (maxDumpId + 1));

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* this check is purely paranoia, maxDumpId should be correct */
		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
			pg_fatal("bad dumpId");

		/* tocsByDumpId indexes all TOCs by their dump ID */
		AH->tocsByDumpId[te->dumpId] = te;

		/*
		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
		 * TOC entry that has a DATA item.  We compute this by reversing the
		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
		 * just one dependency and it is the TABLE item.
		 */
		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
		{
			DumpId		tableId = te->dependencies[0];

			/*
			 * The TABLE item might not have been in the archive, if this was
			 * a data-only dump; but its dump ID should be less than its data
			 * item's dump ID, so there should be a place for it in the array.
			 */
			if (tableId <= 0 || tableId > maxDumpId)
				pg_fatal("bad table dumpId for TABLE DATA item");

			AH->tableDataId[tableId] = te->dumpId;
		}
	}
}
2050
TocEntry *
/* NOTE(review): signature line lost in extraction — presumably
 * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id). */
{
	/* build index arrays if we didn't already */
	if (AH->tocsByDumpId == NULL)
		/* NOTE(review): buildTocEntryArrays(AH) call lost in extraction */

	if (id > 0 && id <= AH->maxDumpId)
		return AH->tocsByDumpId[id];

	/* Out-of-range IDs simply have no entry */
	return NULL;
}
2063
int
/* NOTE(review): signature line lost in extraction — presumably
 * TocIDRequired(ArchiveHandle *AH, DumpId id). */
{
	TocEntry   *te = getTocEntryByDumpId(AH, id);

	/* Unknown IDs are simply "not required" */
	if (!te)
		return 0;

	return te->reqs;
}
2074
size_t
/* NOTE(review): signature line lost in extraction — presumably
 * WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet). */
{
	int			off;

	/* Save the flag */
	AH->WriteBytePtr(AH, wasSet);

	/* Write out pgoff_t smallest byte first, prevents endian mismatch */
	for (off = 0; off < sizeof(pgoff_t); off++)
	{
		AH->WriteBytePtr(AH, o & 0xFF);
		o >>= 8;
	}
	/* Bytes emitted: the offset itself plus the one flag byte */
	return sizeof(pgoff_t) + 1;
}
2091
int
/* NOTE(review): signature line lost in extraction — presumably
 * ReadOffset(ArchiveHandle *AH, pgoff_t *o). */
{
	int			i;
	int			off;
	int			offsetFlg;

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
			return K_OFFSET_POS_NOT_SET;
		else if (i == 0)
			return K_OFFSET_NO_DATA;

		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
		return K_OFFSET_POS_SET;
	}

	/*
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
	 *
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
	 */
	offsetFlg = AH->ReadBytePtr(AH) & 0xFF;

	switch (offsetFlg)
	{
			/* NOTE(review): a case label (K_OFFSET_POS_NOT_SET, per the
			 * constants used above) lost in extraction */
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:

			break;

		default:
			pg_fatal("unexpected data offset flag %d", offsetFlg);
	}

	/*
	 * Read the bytes
	 */
	for (off = 0; off < AH->offSize; off++)
	{
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
		else
		{
			/* Archive's offSize exceeds ours: any nonzero high byte overflows */
			if (AH->ReadBytePtr(AH) != 0)
				pg_fatal("file offset in dump file is too large");
		}
	}

	return offsetFlg;
}
2155
size_t
/* NOTE(review): signature line lost in extraction — presumably
 * WriteInt(ArchiveHandle *AH, int i). */
{
	int			b;

	/*
	 * This is a bit yucky, but I don't want to make the binary format very
	 * dependent on representation, and not knowing much about it, I write out
	 * a sign byte. If you change this, don't forget to change the file
	 * version #, and modify ReadInt to read the new format AS WELL AS the old
	 * formats.
	 */

	/* SIGN byte */
	if (i < 0)
	{
		AH->WriteBytePtr(AH, 1);
		i = -i;
	}
	else
		AH->WriteBytePtr(AH, 0);

	/* Magnitude, smallest byte first */
	for (b = 0; b < AH->intSize; b++)
	{
		AH->WriteBytePtr(AH, i & 0xFF);
		i >>= 8;
	}

	/* Bytes emitted: intSize magnitude bytes plus the sign byte */
	return AH->intSize + 1;
}
2186
int
/* NOTE(review): signature line lost in extraction — presumably
 * ReadInt(ArchiveHandle *AH). */
{
	int			res = 0;
	int			bv,
				b;
	int			sign = 0;		/* Default positive */
	int			bitShift = 0;

	if (AH->version > K_VERS_1_0)
		/* Read a sign byte */
		sign = AH->ReadBytePtr(AH);

	/* Reassemble the little-endian magnitude written by WriteInt */
	for (b = 0; b < AH->intSize; b++)
	{
		bv = AH->ReadBytePtr(AH) & 0xFF;
		if (bv != 0)
			res = res + (bv << bitShift);
		bitShift += 8;
	}

	if (sign)
		res = -res;

	return res;
}
2213
2214size_t
2215WriteStr(ArchiveHandle *AH, const char *c)
2216{
2217 size_t res;
2218
2219 if (c)
2220 {
2221 int len = strlen(c);
2222
2223 res = WriteInt(AH, len);
2224 AH->WriteBufPtr(AH, c, len);
2225 res += len;
2226 }
2227 else
2228 res = WriteInt(AH, -1);
2229
2230 return res;
2231}
2232
char *
/* NOTE(review): signature line lost in extraction — presumably
 * ReadStr(ArchiveHandle *AH). Returns a malloc'd string, or NULL. */
{
	char	   *buf;
	int			l;

	/* A length of -1 denotes a NULL string (see WriteStr) */
	l = ReadInt(AH);
	if (l < 0)
		buf = NULL;
	else
	{
		buf = (char *) pg_malloc(l + 1);
		AH->ReadBufPtr(AH, buf, l);

		buf[l] = '\0';
	}

	return buf;
}
2252
2253static bool
2254_fileExistsInDirectory(const char *dir, const char *filename)
2255{
2256 struct stat st;
2257 char buf[MAXPGPATH];
2258
2259 if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2260 pg_fatal("directory name too long: \"%s\"", dir);
2261
2262 return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2263}
2264
static int
/* NOTE(review): name line lost in extraction — _discoverArchiveFormat(ArchiveHandle *AH).
 * Inspects the input (directory, custom-format signature, or tar header)
 * and returns/sets AH->format. */
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

	pg_log_debug("attempting to ascertain archive format");

	/* Reset any lookahead state from a previous attempt */
	free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			AH->format = archDirectory;
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
				return AH->format;
#ifdef HAVE_LIBZ
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
				return AH->format;
#endif
#ifdef USE_LZ4
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
				return AH->format;
#endif
#ifdef USE_ZSTD
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
				return AH->format;
#endif
			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
					 AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
		}
	}
	else
	{
		fh = stdin;
		if (!fh)
			pg_fatal("could not open input file: %m");
	}

	/* Read the 5-byte magic; distinguishes custom format from everything else */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			pg_fatal("could not read input file: %m");
		else
			pg_fatal("input file is too short (read %zu, expected 5)", cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		/* NOTE(review): the multi-line condition that compares the lookahead
		 * against plain-text dump headers was lost in extraction */
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			pg_fatal("input file appears to be a text format dump. Please use psql.");
		}

		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				pg_fatal("input file does not appear to be a valid archive (too short?)");
			else
				/* NOTE(review): read-error report line lost in extraction */
		}

		if (!isValidTarHeader(AH->lookahead))
			pg_fatal("input file does not appear to be a valid tar archive");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			pg_fatal("could not close input file: %m");
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2393
2394
2395/*
2396 * Allocate an archive handle
2397 */
static ArchiveHandle *
/* NOTE(review): name + first-parameter line lost in extraction — per the
 * forward declaration: _allocAH(const char *FileSpec, const ArchiveFormat fmt, ... */
		 const pg_compress_specification compression_spec,
		 bool dosync, ArchiveMode mode,
/* NOTE(review): final parameter line lost in extraction */
{
	ArchiveHandle *AH;
	/* NOTE(review): additional local declarations lost in extraction */

	pg_log_debug("allocating AH for %s, format %d",
				 FileSpec ? FileSpec : "(stdio)", fmt);

	/* NOTE(review): zeroed allocation of AH lost in extraction */

	AH->version = K_VERS_SELF;

	/* initialize for backwards compatible string processing */
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
	AH->public.std_strings = true;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

	/* NOTE(review): one assignment lost in extraction here */

	AH->createDate = time(NULL);

	AH->intSize = sizeof(int);
	AH->offSize = sizeof(pgoff_t);
	if (FileSpec)
	{
		AH->fSpec = pg_strdup(FileSpec);

		/*
		 * Not used; maybe later....
		 *
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
		 * i--) if (AH->workDir[i-1] == '/')
		 */
	}
	else
		AH->fSpec = NULL;

	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
	AH->currTableAm = NULL;		/* ditto */

	/* NOTE(review): allocation of the dummy TOC list header lost in extraction */

	/* Empty circular list: the header points at itself both ways */
	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression_spec = compression_spec;
	AH->dosync = dosync;
	/* NOTE(review): one assignment lost in extraction here */

	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

	/* Open stdout with no compression for AH output handle */
	/* NOTE(review): no-compression spec setup and InitCompressFileHandle
	 * call (assigning CFH) lost in extraction */
	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
		pg_fatal("could not open stdout for appending: %m");
	AH->OF = CFH;

	/*
	 * On Windows, we need to use binary mode to read/write non-text files,
	 * which include all archive formats as well as compressed plain text.
	 * Force stdin/stdout into binary mode if that is what we are using.
	 */
#ifdef WIN32
	if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
	{
		if (mode == archModeWrite)
			_setmode(fileno(stdout), O_BINARY);
		else
			_setmode(fileno(stdin), O_BINARY);
	}
#endif

	/* NOTE(review): one line lost in extraction here */

	if (fmt == archUnknown)
		/* NOTE(review): format-discovery call lost in extraction */
	else
		AH->format = fmt;

	/* Hand off to the per-format init routine */
	switch (AH->format)
	{
		case archCustom:
			/* NOTE(review): format-init call lost in extraction */
			break;

		case archNull:
			/* NOTE(review): format-init call lost in extraction */
			break;

		case archDirectory:
			/* NOTE(review): format-init call lost in extraction */
			break;

		case archTar:
			/* NOTE(review): format-init call lost in extraction */
			break;

		default:
			pg_fatal("unrecognized file format \"%d\"", AH->format);
	}

	return AH;
}
2514
2515/*
2516 * Write out all data (tables & LOs)
2517 */
void
/* NOTE(review): signature line lost in extraction — presumably
 * WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate). */
{
	TocEntry   *te;

	if (pstate && pstate->numWorkers > 1)
	{
		/*
		 * In parallel mode, this code runs in the leader process. We
		 * construct an array of candidate TEs, then sort it into decreasing
		 * size order, then dispatch each TE to a data-transfer worker. By
		 * dumping larger tables first, we avoid getting into a situation
		 * where we're down to one job and it's big, losing parallelism.
		 */
		TocEntry  **tes;
		int			ntes;

		/* NOTE(review): allocation of the tes[] array lost in extraction */
		ntes = 0;
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Consider only TEs with dataDumper functions ... */
			if (!te->dataDumper)
				continue;
			/* ... and ignore ones not enabled for dump */
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			tes[ntes++] = te;
		}

		if (ntes > 1)
			/* NOTE(review): qsort call (decreasing dataLength) lost in extraction */

		for (int i = 0; i < ntes; i++)
			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
			/* NOTE(review): completion-callback argument line lost in extraction */

		pg_free(tes);

		/* Now wait for workers to finish. */
		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
	}
	else
	{
		/* Non-parallel mode: just dump all candidate TEs sequentially. */
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Must have same filter conditions as above */
			if (!te->dataDumper)
				continue;
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			/* NOTE(review): per-entry dump call lost in extraction */
		}
	}
}
2576
2577
2578/*
2579 * Callback function that's invoked in the leader process after a step has
2580 * been parallel dumped.
2581 *
2582 * We don't need to do anything except check for worker failure.
2583 */
static void
/* NOTE(review): function-name line lost in extraction — presumably
 * mark_dump_job_done(ArchiveHandle *AH, ... */
					TocEntry *te,
					int status,
					void *callback_data)
{
	pg_log_info("finished item %d %s %s",
				te->dumpId, te->desc, te->tag);

	/* Any nonzero worker exit code is fatal for the whole dump */
	if (status != 0)
		pg_fatal("worker process failed: exit code %d",
				 status);
}
2597
2598
void
/* NOTE(review): signature line and the startPtr/endPtr function-pointer
 * declarations lost in extraction — presumably
 * WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te). */
{
	/* Make the entry visible to WriteData() calls from the dumper */
	AH->currToc = te;

	/* LO groups use the LO hooks; everything else the plain data hooks */
	if (strcmp(te->desc, "BLOBS") == 0)
	{
		startPtr = AH->StartLOsPtr;
		endPtr = AH->EndLOsPtr;
	}
	else
	{
		startPtr = AH->StartDataPtr;
		endPtr = AH->EndDataPtr;
	}

	if (startPtr != NULL)
		(*startPtr) (AH, te);

	/*
	 * The user-provided DataDumper routine needs to call AH->WriteData
	 */
	te->dataDumper((Archive *) AH, te->dataDumperArg);

	if (endPtr != NULL)
		(*endPtr) (AH, te);

	AH->currToc = NULL;
}
2631
/*
 * WriteToc: serialize the in-memory TOC (circular list hung off AH->toc) to
 * the archive.  Only entries whose reqs include schema/data/stats/special
 * bits are written.  (Function name is grounded by the pg_fatal message
 * below that mentions WriteToc().)
 */
2632void
/* NOTE(review): listing line 2633 (signature) was lost in extraction. */
2634{
2635 TocEntry *te;
2636 char workbuf[32];
2637 int tocCount;
2638 int i;
2639
2640 /* count entries that will actually be dumped */
2641 tocCount = 0;
2642 for (te = AH->toc->next; te != AH->toc; te = te->next)
2643 {
2644 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2645 tocCount++;
2646 }
2647
2648 /* printf("%d TOC Entries to save\n", tocCount); */
2649
2650 WriteInt(AH, tocCount)
2651
2652 for (te = AH->toc->next; te != AH->toc; te = te->next)
2653 {
 /* Skip entries filtered out entirely; filter must match the count above. */
2654 if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2655 continue;
2656
2657 WriteInt(AH, te->dumpId);
2658 WriteInt(AH, te->dataDumper ? 1 : 0);
2659
2660 /* OID is recorded as a string for historical reasons */
2661 sprintf(workbuf, "%u", te->catalogId.tableoid);
2662 WriteStr(AH, workbuf);
2663 sprintf(workbuf, "%u", te->catalogId.oid);
2664 WriteStr(AH, workbuf);
2665
2666 WriteStr(AH, te->tag);
2667 WriteStr(AH, te->desc);
2668 WriteInt(AH, te->section);
2669
2670 if (te->defnLen)
2671 {
2672 /*
2673 * defnLen should only be set for custom format's second call to
2674 * WriteToc(), which rewrites the TOC in place to update data
2675 * offsets. Instead of calling the defnDumper a second time
2676 * (which could involve re-executing queries), just skip writing
2677 * the entry. While regenerating the definition should
2678 * theoretically produce the same result as before, it's expensive
2679 * and feels risky.
2680 *
2681 * The custom format only calls WriteToc() a second time if
2682 * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
2683 * so we can safely use it without checking. For other formats,
2684 * we fail because one of our assumptions must no longer hold
2685 * true.
2686 *
2687 * XXX This is a layering violation, but the alternative is an
2688 * awkward and complicated callback infrastructure for this
2689 * special case. This might be worth revisiting in the future.
2690 */
2691 if (AH->format != archCustom)
2692 pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
2693 te->dumpId, te->desc, te->tag);
2694
2695 if (fseeko(AH->FH, te->defnLen, SEEK_CUR) != 0)
2696 pg_fatal("error during file seek: %m");
2697 }
2698 else if (te->defnDumper)
2699 {
 /* Lazily generate the definition text; remember its written length. */
2700 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
2701
2702 te->defnLen = WriteStr(AH, defn);
2703 pg_free(defn);
2704 }
2705 else
2706 WriteStr(AH, te->defn);
2707
2708 WriteStr(AH, te->dropStmt);
2709 WriteStr(AH, te->copyStmt);
2710 WriteStr(AH, te->namespace);
2711 WriteStr(AH, te->tablespace);
2712 WriteStr(AH, te->tableam);
2713 WriteInt(AH, te->relkind);
2714 WriteStr(AH, te->owner);
 /* Legacy "with OIDs" flag slot; always written as "false" now. */
2715 WriteStr(AH, "false");
2716
2717 /* Dump list of dependencies */
2718 for (i = 0; i < te->nDeps; i++)
2719 {
2720 sprintf(workbuf, "%d", te->dependencies[i]);
2721 WriteStr(AH, workbuf);
2722 }
2723 WriteStr(AH, NULL); /* Terminate List */
2724
2725 if (AH->WriteExtraTocPtr)
2726 AH->WriteExtraTocPtr(AH, te);
2727 }
2728}
2729
/*
 * Read the TOC back from an archive, reconstructing the circular TocEntry
 * list on AH->toc and applying version-dependent format rules.  Also performs
 * immediate special processing for ENCODING/STDSTRINGS/SEARCHPATH entries.
 */
2730void
/* NOTE(review): listing line 2731 (signature) was lost in extraction. */
2732{
2733 int i;
2734 char *tmp;
2735 DumpId *deps;
2736 int depIdx;
2737 int depSize;
2738 TocEntry *te;
2739 bool is_supported;
2740
2741 AH->tocCount = ReadInt(AH);
2742 AH->maxDumpId = 0;
2743
2744 for (i = 0; i < AH->tocCount; i++)
2745 {
 /* NOTE(review): listing line 2746 (allocation of *te) was lost in extraction. */
2747 te->dumpId = ReadInt(AH);
2748
2749 if (te->dumpId > AH->maxDumpId)
2750 AH->maxDumpId = te->dumpId;
2751
2752 /* Sanity check */
2753 if (te->dumpId <= 0)
2754 pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2755 te->dumpId);
2756
2757 te->hadDumper = ReadInt(AH);
2758
2759 if (AH->version >= K_VERS_1_8)
2760 {
2761 tmp = ReadStr(AH);
2762 sscanf(tmp, "%u", &te->catalogId.tableoid);
2763 free(tmp);
2764 }
2765 else
 /* NOTE(review): listing line 2766 (the else arm, presumably a default
  * tableoid assignment) was lost in extraction — confirm upstream. */
2767 tmp = ReadStr(AH);
2768 sscanf(tmp, "%u", &te->catalogId.oid);
2769 free(tmp);
2770
2771 te->tag = ReadStr(AH);
2772 te->desc = ReadStr(AH);
2773
2774 if (AH->version >= K_VERS_1_11)
2775 {
2776 te->section = ReadInt(AH);
2777 }
2778 else
2779 {
2780 /*
2781 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2782 * the entries into sections. This list need not cover entry
2783 * types added later than 8.4.
2784 */
2785 if (strcmp(te->desc, "COMMENT") == 0 ||
2786 strcmp(te->desc, "ACL") == 0 ||
2787 strcmp(te->desc, "ACL LANGUAGE") == 0)
2788 te->section = SECTION_NONE;
2789 else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2790 strcmp(te->desc, "BLOBS") == 0 ||
2791 strcmp(te->desc, "BLOB COMMENTS") == 0)
2792 te->section = SECTION_DATA;
2793 else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2794 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2795 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2796 strcmp(te->desc, "INDEX") == 0 ||
2797 strcmp(te->desc, "RULE") == 0 ||
2798 strcmp(te->desc, "TRIGGER") == 0)
 /* NOTE(review): listing lines 2799 and 2801 (the section assignments
  * for these two branches) were lost in extraction. */
2800 else
2802 }
2803
2804 te->defn = ReadStr(AH);
2805 te->dropStmt = ReadStr(AH);
2806
 /* Fields below were added over time; gate each on archive version. */
2807 if (AH->version >= K_VERS_1_3)
2808 te->copyStmt = ReadStr(AH);
2809
2810 if (AH->version >= K_VERS_1_6)
2811 te->namespace = ReadStr(AH);
2812
2813 if (AH->version >= K_VERS_1_10)
2814 te->tablespace = ReadStr(AH);
2815
2816 if (AH->version >= K_VERS_1_14)
2817 te->tableam = ReadStr(AH);
2818
2819 if (AH->version >= K_VERS_1_16)
2820 te->relkind = ReadInt(AH);
2821
2822 te->owner = ReadStr(AH);
 /* Legacy "with OIDs" flag: "true" means an unsupported old-style dump. */
2823 is_supported = true;
2824 if (AH->version < K_VERS_1_9)
2825 is_supported = false;
2826 else
2827 {
2828 tmp = ReadStr(AH);
2829
2830 if (strcmp(tmp, "true") == 0)
2831 is_supported = false;
2832
2833 free(tmp);
2834 }
2835
2836 if (!is_supported)
2837 pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2838
2839 /* Read TOC entry dependencies */
2840 if (AH->version >= K_VERS_1_5)
2841 {
 /* Grow-on-demand array, trimmed to exact size once the list ends. */
2842 depSize = 100;
 /* NOTE(review): listing line 2843 (initial allocation of deps) was lost
  * in extraction. */
2844 depIdx = 0;
2845 for (;;)
2846 {
2847 tmp = ReadStr(AH);
2848 if (!tmp)
2849 break; /* end of list */
2850 if (depIdx >= depSize)
2851 {
2852 depSize *= 2;
2853 deps = pg_realloc_array(deps, DumpId, depSize);
2854 }
2855 sscanf(tmp, "%d", &deps[depIdx]);
2856 free(tmp);
2857 depIdx++;
2858 }
2859
2860 if (depIdx > 0) /* We have a non-null entry */
2861 {
2862 deps = pg_realloc_array(deps, DumpId, depIdx);
2863 te->dependencies = deps;
2864 te->nDeps = depIdx;
2865 }
2866 else
2867 {
2868 free(deps);
2869 te->dependencies = NULL;
2870 te->nDeps = 0;
2871 }
2872 }
2873 else
2874 {
2875 te->dependencies = NULL;
2876 te->nDeps = 0;
2877 }
2878 te->dataLength = 0;
2879
2880 if (AH->ReadExtraTocPtr)
2881 AH->ReadExtraTocPtr(AH, te);
2882
2883 pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2884 i, te->dumpId, te->desc, te->tag);
2885
2886 /* link completed entry into TOC circular list */
2887 te->prev = AH->toc->prev;
2888 AH->toc->prev->next = te;
2889 AH->toc->prev = te;
2890 te->next = AH->toc;
2891
2892 /* special processing immediately upon read for some items */
2893 if (strcmp(te->desc, "ENCODING") == 0)
2894 processEncodingEntry(AH, te);
2895 else if (strcmp(te->desc, "STDSTRINGS") == 0)
2896 processStdStringsEntry(AH, te);
2897 else if (strcmp(te->desc, "SEARCHPATH") == 0)
2898 processSearchPathEntry(AH, te);
2899 }
2900}
2901
/*
 * processEncodingEntry: parse the ENCODING TOC entry's SET command and record
 * the dump's client encoding in AH->public.encoding.
 */
2902static void
/* NOTE(review): listing line 2903 (signature) was lost in extraction. */
2904{
2905 /* te->defn should have the form SET client_encoding = 'foo'; */
2906 char *defn = pg_strdup(te->defn);
2907 char *ptr1;
2908 char *ptr2 = NULL;
2909 int encoding;
2910
 /* Extract the text between the first pair of single quotes. */
2911 ptr1 = strchr(defn, '\'');
2912 if (ptr1)
2913 ptr2 = strchr(++ptr1, '\'');
2914 if (ptr2)
2915 {
2916 *ptr2 = '\0';
 /* NOTE(review): listing line 2917 (the encoding-name lookup that assigns
  * `encoding`) was lost in extraction. */
2918 if (encoding < 0)
2919 pg_fatal("unrecognized encoding \"%s\"",
2920 ptr1);
2921 AH->public.encoding = encoding;
 /* NOTE(review): listing line 2922 was lost in extraction. */
2923 }
2924 else
2925 pg_fatal("invalid ENCODING item: %s",
2926 te->defn);
2927
2928 free(defn);
2929}
2930
/*
 * processStdStringsEntry: parse the STDSTRINGS TOC entry and record whether
 * the dump used standard-conforming string literals.
 */
2931static void
/* NOTE(review): listing line 2932 (signature) was lost in extraction. */
2933{
2934 /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2935 char *ptr1;
2936
2937 ptr1 = strchr(te->defn, '\'');
2938 if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2939 AH->public.std_strings = true;
2940 else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2941 AH->public.std_strings = false;
2942 else
2943 pg_fatal("invalid STDSTRINGS item: %s",
2944 te->defn);
2945}
2946
/*
 * processSearchPathEntry: remember the dump-time search_path command so it
 * can be replayed verbatim during restore.
 */
2947static void
/* NOTE(review): listing line 2948 (signature) was lost in extraction. */
2949{
2950 /*
2951 * te->defn should contain a command to set search_path. We just copy it
2952 * verbatim for use later.
2953 */
2954 AH->public.searchpath = pg_strdup(te->defn);
2955}
2956
/*
 * Strict-names enforcement: with --strict-names, every name pattern the user
 * supplied (schema/table/index/function/trigger) must have matched at least
 * one object; otherwise fail with the first unmatched name.
 */
2957static void
/* NOTE(review): listing line 2958 (signature) was lost in extraction; the
 * lines computing `missing_name` before each check (2966, 2973, 2980, 2987,
 * 2994) were lost as well. */
2959{
2960 const char *missing_name;
2961
2962 Assert(ropt->strict_names);
2963
2964 if (ropt->schemaNames.head != NULL)
2965 {
2967 if (missing_name != NULL)
2968 pg_fatal("schema \"%s\" not found", missing_name);
2969 }
2970
2971 if (ropt->tableNames.head != NULL)
2972 {
2974 if (missing_name != NULL)
2975 pg_fatal("table \"%s\" not found", missing_name);
2976 }
2977
2978 if (ropt->indexNames.head != NULL)
2979 {
2981 if (missing_name != NULL)
2982 pg_fatal("index \"%s\" not found", missing_name);
2983 }
2984
2985 if (ropt->functionNames.head != NULL)
2986 {
2988 if (missing_name != NULL)
2989 pg_fatal("function \"%s\" not found", missing_name);
2990 }
2991
2992 if (ropt->triggerNames.head != NULL)
2993 {
2995 if (missing_name != NULL)
2996 pg_fatal("trigger \"%s\" not found", missing_name);
2997 }
2998}
2999
3000/*
3001 * Determine whether we want to restore this TOC entry.
3002 *
3003 * Returns 0 if entry should be skipped, or some combination of the
3004 * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
3005 * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
3006 * special entry.
3007 */
3008static int
/* NOTE(review): listing line 3009 (signature) was lost in extraction. */
3010{
3011 int res = REQ_SCHEMA | REQ_DATA;
3012 RestoreOptions *ropt = AH->public.ropt;
3013
3014 /*
3015 * For binary upgrade mode, dump pg_largeobject_metadata and the
3016 * associated pg_shdepend rows. This is faster to restore than the
3017 * equivalent set of large object commands.
3018 */
 /* NOTE(review): listing lines 3020-3021 (the catalog-OID comparisons that
  * complete this condition) were lost in extraction. */
3019 if (ropt->binary_upgrade && strcmp(te->desc, "TABLE DATA") == 0 &&
3022 return REQ_DATA;
3023
3024 /* These items are treated specially */
3025 if (strcmp(te->desc, "ENCODING") == 0 ||
3026 strcmp(te->desc, "STDSTRINGS") == 0 ||
3027 strcmp(te->desc, "SEARCHPATH") == 0)
3028 return REQ_SPECIAL;
3029
3030 if ((strcmp(te->desc, "STATISTICS DATA") == 0) ||
3031 (strcmp(te->desc, "EXTENDED STATISTICS DATA") == 0))
3032 {
3033 if (!ropt->dumpStatistics)
3034 return 0;
3035
3036 res = REQ_STATS;
3037 }
3038
3039 /*
3040 * DATABASE and DATABASE PROPERTIES also have a special rule: they are
3041 * restored in createDB mode, and not restored otherwise, independently of
3042 * all else.
3043 */
3044 if (strcmp(te->desc, "DATABASE") == 0 ||
3045 strcmp(te->desc, "DATABASE PROPERTIES") == 0)
3046 {
3047 if (ropt->createDB)
3048 return REQ_SCHEMA;
3049 else
3050 return 0;
3051 }
3052
3053 /*
3054 * Global object TOC entries (e.g., ROLEs or TABLESPACEs) must not be
3055 * ignored.
3056 */
3057 if (strcmp(te->desc, "ROLE") == 0 ||
3058 strcmp(te->desc, "ROLE PROPERTIES") == 0 ||
3059 strcmp(te->desc, "TABLESPACE") == 0 ||
3060 strcmp(te->desc, "DROP_GLOBAL") == 0)
3061 return REQ_SCHEMA;
3062
3063 /*
3064 * Process exclusions that affect certain classes of TOC entries.
3065 */
3066
3067 /* If it's an ACL, maybe ignore it */
3068 if (ropt->aclsSkip && _tocEntryIsACL(te))
3069 return 0;
3070
3071 /* If it's a comment, maybe ignore it */
3072 if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3073 return 0;
3074
3075 /* If it's a policy, maybe ignore it */
3076 if (ropt->no_policies &&
3077 (strcmp(te->desc, "POLICY") == 0 ||
3078 strcmp(te->desc, "ROW SECURITY") == 0))
3079 return 0;
3080
3081 /*
3082 * If it's a comment on a policy, a publication, or a subscription, maybe
3083 * ignore it.
3084 */
3085 if (strcmp(te->desc, "COMMENT") == 0)
3086 {
3087 if (ropt->no_policies &&
3088 strncmp(te->tag, "POLICY", strlen("POLICY")) == 0)
3089 return 0;
3090
3091 if (ropt->no_publications &&
3092 strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3093 return 0;
3094
3095 if (ropt->no_subscriptions &&
3096 strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3097 return 0;
3098
3099 /*
3100 * Comments on global objects (ROLEs or TABLESPACEs) should not be
3101 * skipped, since global objects themselves are never skipped.
3102 */
3103 if (strncmp(te->tag, "ROLE", strlen("ROLE")) == 0 ||
3104 strncmp(te->tag, "TABLESPACE", strlen("TABLESPACE")) == 0)
3105 return REQ_SCHEMA;
3106 }
3107
3108 /*
3109 * If it's a publication or a table part of a publication, maybe ignore
3110 * it.
3111 */
3112 if (ropt->no_publications &&
3113 (strcmp(te->desc, "PUBLICATION") == 0 ||
3114 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3115 strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3116 return 0;
3117
3118 /* If it's a security label, maybe ignore it */
3119 if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3120 return 0;
3121
3122 /*
3123 * If it's a security label on a publication or a subscription, maybe
3124 * ignore it.
3125 */
3126 if (strcmp(te->desc, "SECURITY LABEL") == 0)
3127 {
3128 if (ropt->no_publications &&
3129 strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3130 return 0;
3131
3132 if (ropt->no_subscriptions &&
3133 strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3134 return 0;
3135
3136 /*
3137 * Security labels on global objects (ROLEs or TABLESPACEs) should not
3138 * be skipped, since global objects themselves are never skipped.
3139 */
3140 if (strncmp(te->tag, "ROLE", strlen("ROLE")) == 0 ||
3141 strncmp(te->tag, "TABLESPACE", strlen("TABLESPACE")) == 0)
3142 return REQ_SCHEMA;
3143 }
3144
3145 /* If it's a subscription, maybe ignore it */
3146 if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3147 return 0;
3148
3149 /* Ignore it if section is not to be dumped/restored */
3150 switch (curSection)
3151 {
3152 case SECTION_PRE_DATA:
3153 if (!(ropt->dumpSections & DUMP_PRE_DATA))
3154 return 0;
3155 break;
3156 case SECTION_DATA:
3157 if (!(ropt->dumpSections & DUMP_DATA))
3158 return 0;
3159 break;
3160 case SECTION_POST_DATA:
3161 if (!(ropt->dumpSections & DUMP_POST_DATA))
3162 return 0;
3163 break;
3164 default:
3165 /* shouldn't get here, really, but ignore it */
3166 return 0;
3167 }
3168
3169 /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3170 if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3171 return 0;
3172
3173 /*
3174 * Check options for selective dump/restore.
3175 */
3176 if (strcmp(te->desc, "ACL") == 0 ||
3177 strcmp(te->desc, "COMMENT") == 0 ||
3178 strcmp(te->desc, "STATISTICS DATA") == 0 ||
3179 strcmp(te->desc, "SECURITY LABEL") == 0)
3180 {
3181 /* Database properties react to createDB, not selectivity options. */
3182 if (strncmp(te->tag, "DATABASE ", 9) == 0)
3183 {
3184 if (!ropt->createDB)
3185 return 0;
3186 }
3187 else if (ropt->schemaNames.head != NULL ||
3188 ropt->schemaExcludeNames.head != NULL ||
3189 ropt->selTypes)
3190 {
3191 /*
3192 * In a selective dump/restore, we want to restore these dependent
3193 * TOC entry types only if their parent object is being restored.
3194 * Without selectivity options, we let through everything in the
3195 * archive. Note there may be such entries with no parent, eg
3196 * non-default ACLs for built-in objects. Also, we make
3197 * per-column ACLs additionally depend on the table's ACL if any
3198 * to ensure correct restore order, so those dependencies should
3199 * be ignored in this check.
3200 *
3201 * This code depends on the parent having been marked already,
3202 * which should be the case; if it isn't, perhaps due to
3203 * SortTocFromFile rearrangement, skipping the dependent entry
3204 * seems prudent anyway.
3205 *
3206 * Ideally we'd handle, eg, table CHECK constraints this way too.
3207 * But it's hard to tell which of their dependencies is the one to
3208 * consult.
3209 */
3210 bool dumpthis = false;
3211
3212 for (int i = 0; i < te->nDeps; i++)
3213 {
 /* NOTE(review): listing lines 3213-3214 (the lookup that assigns
  * `pte` from the dependency's dump ID) were lost in extraction. */
3215
3216 if (!pte)
3217 continue; /* probably shouldn't happen */
3218 if (strcmp(pte->desc, "ACL") == 0)
3219 continue; /* ignore dependency on another ACL */
3220 if (pte->reqs == 0)
3221 continue; /* this object isn't marked, so ignore it */
3222 /* Found a parent to be dumped, so we want to dump this too */
3223 dumpthis = true;
3224 break;
3225 }
3226 if (!dumpthis)
3227 return 0;
3228 }
3229 }
3230 else
3231 {
3232 /* Apply selective-restore rules for standalone TOC entries. */
3233 if (ropt->schemaNames.head != NULL)
3234 {
3235 /* If no namespace is specified, it means all. */
3236 if (!te->namespace)
3237 return 0;
3238 if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3239 return 0;
3240 }
3241
3242 if (ropt->schemaExcludeNames.head != NULL &&
3243 te->namespace &&
3244 simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3245 return 0;
3246
3247 if (ropt->selTypes)
3248 {
 /* NOTE(review): listing lines 3261, 3269, 3279 and 3287 (the
  * name-list membership tests completing the conditions below)
  * were lost in extraction. */
3249 if (strcmp(te->desc, "TABLE") == 0 ||
3250 strcmp(te->desc, "TABLE DATA") == 0 ||
3251 strcmp(te->desc, "VIEW") == 0 ||
3252 strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3253 strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3254 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3255 strcmp(te->desc, "SEQUENCE") == 0 ||
3256 strcmp(te->desc, "SEQUENCE SET") == 0)
3257 {
3258 if (!ropt->selTable)
3259 return 0;
3260 if (ropt->tableNames.head != NULL &&
3262 return 0;
3263 }
3264 else if (strcmp(te->desc, "INDEX") == 0)
3265 {
3266 if (!ropt->selIndex)
3267 return 0;
3268 if (ropt->indexNames.head != NULL &&
3270 return 0;
3271 }
3272 else if (strcmp(te->desc, "FUNCTION") == 0 ||
3273 strcmp(te->desc, "AGGREGATE") == 0 ||
3274 strcmp(te->desc, "PROCEDURE") == 0)
3275 {
3276 if (!ropt->selFunction)
3277 return 0;
3278 if (ropt->functionNames.head != NULL &&
3280 return 0;
3281 }
3282 else if (strcmp(te->desc, "TRIGGER") == 0)
3283 {
3284 if (!ropt->selTrigger)
3285 return 0;
3286 if (ropt->triggerNames.head != NULL &&
3288 return 0;
3289 }
3290 else
3291 return 0;
3292 }
3293 }
3294
3295
3296 /*
3297 * Determine whether the TOC entry contains schema and/or data components,
3298 * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3299 * it's both schema and data. Otherwise it's probably schema-only, but
3300 * there are exceptions.
3301 */
3302 if (!te->hadDumper)
3303 {
3304 /*
3305 * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3306 * is considered a data entry. We don't need to check for BLOBS or
3307 * old-style BLOB COMMENTS entries, because they will have hadDumper =
3308 * true ... but we do need to check new-style BLOB ACLs, comments,
3309 * etc.
3310 */
3311 if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3312 strcmp(te->desc, "BLOB") == 0 ||
3313 strcmp(te->desc, "BLOB METADATA") == 0 ||
3314 (strcmp(te->desc, "ACL") == 0 &&
3315 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3316 (strcmp(te->desc, "COMMENT") == 0 &&
3317 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3318 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3319 strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3320 res = res & REQ_DATA;
3321 else
3322 res = res & ~REQ_DATA;
3323 }
3324
3325 /*
3326 * If there's no definition command, there's no schema component. Treat
3327 * "load via partition root" comments as not schema.
3328 */
3329 if (!te->defn || !te->defn[0] ||
3330 strncmp(te->defn, "-- load via partition root ", 27) == 0)
3331 res = res & ~REQ_SCHEMA;
3332
3333 /*
3334 * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3335 * always ignore it.
3336 */
3337 if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3338 return 0;
3339
3340 /* Mask it if we don't want data */
3341 if (!ropt->dumpData)
3342 {
3343 /*
3344 * The sequence_data option overrides dumpData for SEQUENCE SET.
3345 *
3346 * In binary-upgrade mode, even with dumpData unset, we do not mask
3347 * out large objects. (Only large object definitions, comments and
3348 * other metadata should be generated in binary-upgrade mode, not the
3349 * actual data, but that need not concern us here.)
3350 */
3351 if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3352 !(ropt->binary_upgrade &&
3353 (strcmp(te->desc, "BLOB") == 0 ||
3354 strcmp(te->desc, "BLOB METADATA") == 0 ||
3355 (strcmp(te->desc, "ACL") == 0 &&
3356 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3357 (strcmp(te->desc, "COMMENT") == 0 &&
3358 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3359 (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3360 strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3361 res = res & (REQ_SCHEMA | REQ_STATS);
3362 }
3363
3364 /* Mask it if we don't want schema */
3365 if (!ropt->dumpSchema)
3366 res = res & (REQ_DATA | REQ_STATS);
3367
3368 return res;
3369}
3370
3371/*
3372 * Identify which pass we should restore this TOC entry in.
3373 *
3374 * See notes with the RestorePass typedef in pg_backup_archiver.h.
3375 */
3376static RestorePass
/* NOTE(review): listing line 3377 (signature) was lost in extraction. */
3378{
3379 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3380 if (strcmp(te->desc, "ACL") == 0 ||
3381 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3382 strcmp(te->desc, "DEFAULT ACL") == 0)
3383 return RESTORE_PASS_ACL;
3384 if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3385 strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3386 return RESTORE_PASS_POST_ACL;
3387
3388 /*
3389 * Comments and security labels need to be emitted in the same pass as
3390 * their parent objects. ACLs haven't got comments and security labels,
3391 * and neither do matview data objects, but event triggers do.
3392 * (Fortunately, event triggers haven't got ACLs, or we'd need yet another
3393 * weird special case.)
3394 */
3395 if ((strcmp(te->desc, "COMMENT") == 0 ||
3396 strcmp(te->desc, "SECURITY LABEL") == 0) &&
3397 strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3398 return RESTORE_PASS_POST_ACL;
3399
3400 /*
3401 * If statistics data is dependent on materialized view data, it must be
3402 * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3403 * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3404 * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3405 * code in fetchAttributeStats() assumes that we dump all statistics data
3406 * entries in TOC order. To ensure this assumption holds, we move all
3407 * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3408 */
3409 if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
 /* NOTE(review): listing line 3410 (the second half of this condition,
  * per the comment a section check) was lost in extraction. */
3411 return RESTORE_PASS_POST_ACL;
3412
3413 /* All else can be handled in the main pass. */
3414 return RESTORE_PASS_MAIN;
3415}
3416
3417/*
3418 * Identify TOC entries that are ACLs.
3419 *
3420 * Note: it seems worth duplicating some code here to avoid a hard-wired
3421 * assumption that these are exactly the same entries that we restore during
3422 * the RESTORE_PASS_ACL phase.
3423 */
3424static bool
/* NOTE(review): listing line 3425 (signature) was lost in extraction. */
3426{
3427 /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3428 if (strcmp(te->desc, "ACL") == 0 ||
3429 strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3430 strcmp(te->desc, "DEFAULT ACL") == 0)
3431 return true;
3432 return false;
3433}
3434
3435/*
3436 * Issue SET commands for parameters that we want to have set the same way
3437 * at all times during execution of a restore script.
3438 */
3439static void
/* NOTE(review): listing line 3440 (signature) was lost in extraction. */
3441{
3442 RestoreOptions *ropt = AH->public.ropt;
3443
3444 /*
3445 * Disable timeouts to allow for slow commands, idle parallel workers, etc
3446 */
3447 ahprintf(AH, "SET statement_timeout = 0;\n");
3448 ahprintf(AH, "SET lock_timeout = 0;\n");
3449 ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3450 ahprintf(AH, "SET transaction_timeout = 0;\n");
3451
3452 /* Select the correct character set encoding */
3453 ahprintf(AH, "SET client_encoding = '%s';\n",
 /* NOTE(review): listing line 3454 (the encoding-name argument) was lost
  * in extraction. */
3455
3456 /* Select the correct string literal syntax */
3457 ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3458 AH->public.std_strings ? "on" : "off");
3459
3460 /* Select the role to be used during restore */
3461 if (ropt && ropt->use_role)
3462 ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3463
3464 /* Select the dump-time search_path */
3465 if (AH->public.searchpath)
3466 ahprintf(AH, "%s", AH->public.searchpath);
3467
3468 /* Make sure function checking is disabled */
3469 ahprintf(AH, "SET check_function_bodies = false;\n");
3470
3471 /* Ensure that all valid XML data will be accepted */
3472 ahprintf(AH, "SET xmloption = content;\n");
3473
3474 /* Avoid annoying notices etc */
3475 ahprintf(AH, "SET client_min_messages = warning;\n");
3476
3477 /* Adjust row-security state */
3478 if (ropt && ropt->enable_row_security)
3479 ahprintf(AH, "SET row_security = on;\n");
3480 else
3481 ahprintf(AH, "SET row_security = off;\n");
3482
3483 /*
3484 * In --transaction-size mode, we should always be in a transaction when
3485 * we begin to restore objects.
3486 */
3487 if (ropt && ropt->txn_size > 0)
3488 {
3489 if (AH->connection)
 /* NOTE(review): listing line 3490 (the direct-to-database branch body)
  * was lost in extraction. */
3491 else
3492 ahprintf(AH, "\nBEGIN;\n");
3493 AH->txnCount = 0;
3494 }
3495
3496 ahprintf(AH, "\n");
3497}
3498
3499/*
3500 * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3501 * for updating state if appropriate. If user is NULL or an empty string,
3502 * the specification DEFAULT will be used.
3503 */
3504static void
/* NOTE(review): listing lines 3505 (signature) and 3507 (creation of the
 * `cmd` buffer) were lost in extraction. */
3506{
3508
3509 appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3510
3511 /*
3512 * SQL requires a string literal here. Might as well be correct.
3513 */
3514 if (user && *user)
3515 appendStringLiteralAHX(cmd, user, AH);
3516 else
3517 appendPQExpBufferStr(cmd, "DEFAULT");
3518 appendPQExpBufferChar(cmd, ';');
3519
 /* Execute directly when connected to a database; otherwise emit script text. */
3520 if (RestoringToDB(AH))
3521 {
3522 PGresult *res;
3523
3524 res = PQexec(AH->connection, cmd->data);
3525
3526 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3527 /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3528 pg_fatal("could not set session user to \"%s\": %s",
 /* NOTE(review): listing line 3529 (the pg_fatal arguments) was lost in
  * extraction. */
3530
3531 PQclear(res);
3532 }
3533 else
3534 ahprintf(AH, "%s\n\n", cmd->data);
3535
3536 destroyPQExpBuffer(cmd);
3537}
3538
3539
3540/*
3541 * Issue the commands to connect to the specified database.
3542 *
3543 * If we're currently restoring right into a database, this will
3544 * actually establish a connection. Otherwise it puts a \connect into
3545 * the script output.
3546 */
3547static void
/* NOTE(review): listing lines 3548 (signature), 3551 (the direct-reconnect
 * call), 3554, 3564-3566 and 3568 (the connect-buffer setup/teardown) were
 * lost in extraction. */
3549{
3550 if (RestoringToDB(AH))
3552 else
3553 {
3555 RestoreOptions *ropt = AH->public.ropt;
3556
3557 /*
3558 * We must temporarily exit restricted mode for \connect, etc.
3559 * Anything added between this line and the following \restrict must
3560 * be careful to avoid any possible meta-command injection vectors.
3561 */
3562 ahprintf(AH, "\\unrestrict %s\n", ropt->restrict_key);
3563
3566 ahprintf(AH, "%s", connectbuf.data);
3568
3569 ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
3570 }
3571
3572 /*
3573 * NOTE: currUser keeps track of what the imaginary session user in our
3574 * script is. It's now effectively reset to the original userID.
3575 */
3576 free(AH->currUser);
3577 AH->currUser = NULL;
3578
3579 /* don't assume we still know the output schema, tablespace, etc either */
3580 free(AH->currSchema);
3581 AH->currSchema = NULL;
3582
3583 free(AH->currTableAm);
3584 AH->currTableAm = NULL;
3585
3586 free(AH->currTablespace);
3587 AH->currTablespace = NULL;
3588
3589 /* re-establish fixed state */
 /* NOTE(review): listing line 3590 (the call re-emitting the fixed SET
  * state after reconnect) was lost in extraction. */
3591}
3592
3593/*
3594 * Become the specified user, and update state to avoid redundant commands
3595 *
3596 * NULL or empty argument is taken to mean restoring the session default
3597 */
3598static void
/* NOTE(review): listing lines 3599 (signature) and 3607 (the call that
 * actually issues the session-authorization change) were lost in
 * extraction. */
3600{
3601 if (!user)
3602 user = ""; /* avoid null pointers */
3603
3604 if (AH->currUser && strcmp(AH->currUser, user) == 0)
3605 return; /* no need to do anything */
3606
3608
3609 /*
3610 * NOTE: currUser keeps track of what the imaginary session user in our
3611 * script is
3612 */
3613 free(AH->currUser);
3614 AH->currUser = pg_strdup(user);
3615}
3616
3617/*
3618 * Become the owner of the given TOC entry object. If
3619 * changes in ownership are not allowed, this doesn't do anything.
3620 */
3621static void
/* NOTE(review): listing line 3622 (signature) was lost in extraction. */
3623{
3624 RestoreOptions *ropt = AH->public.ropt;
3625
 /* Skip when ownership changes are suppressed or SET SESSION AUTH is off. */
3626 if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3627 return;
3628
3629 _becomeUser(AH, te->owner);
3630}
3631
3632
3633/*
3634 * Issue the commands to select the specified schema as the current schema
3635 * in the target database.
3636 */
3637static void
/* NOTE(review): listing lines 3638 (signature), 3669/3671 (the
 * warn-or-exit arguments) and 3679 (recording the new currSchema) were lost
 * in extraction. */
3639{
3640 PQExpBuffer qry;
3641
3642 /*
3643 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3644 * that search_path rather than switching to entry-specific paths.
3645 * Otherwise, it's an old archive that will not restore correctly unless
3646 * we set the search_path as it's expecting.
3647 */
3648 if (AH->public.searchpath)
3649 return;
3650
3651 if (!schemaName || *schemaName == '\0' ||
3652 (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3653 return; /* no need to do anything */
3654
3655 qry = createPQExpBuffer();
3656
3657 appendPQExpBuffer(qry, "SET search_path = %s",
3658 fmtId(schemaName));
 /* Keep pg_catalog visible unless the target schema already is pg_catalog. */
3659 if (strcmp(schemaName, "pg_catalog") != 0)
3660 appendPQExpBufferStr(qry, ", pg_catalog");
3661
3662 if (RestoringToDB(AH))
3663 {
3664 PGresult *res;
3665
3666 res = PQexec(AH->connection, qry->data);
3667
3668 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3670 "could not set \"search_path\" to \"%s\": %s",
3672
3673 PQclear(res);
3674 }
3675 else
3676 ahprintf(AH, "%s;\n\n", qry->data);
3677
3678 free(AH->currSchema);
3680
3681 destroyPQExpBuffer(qry);
3682}
3683
3684/*
3685 * Issue the commands to select the specified tablespace as the current one
3686 * in the target database.
3687 */
3688static void
/* NOTE(review): listing lines 3689 (signature), 3730/3732 (the
 * warn-or-exit arguments) and 3740 (recording the new currTablespace) were
 * lost in extraction. */
3690{
3691 RestoreOptions *ropt = AH->public.ropt;
3692 PQExpBuffer qry;
3693 const char *want,
3694 *have;
3695
3696 /* do nothing in --no-tablespaces mode */
3697 if (ropt->noTablespace)
3698 return;
3699
3700 have = AH->currTablespace;
3701 want = tablespace;
3702
3703 /* no need to do anything for non-tablespace object */
3704 if (!want)
3705 return;
3706
3707 if (have && strcmp(want, have) == 0)
3708 return; /* no need to do anything */
3709
3710 qry = createPQExpBuffer();
3711
3712 if (strcmp(want, "") == 0)
3713 {
3714 /* We want the tablespace to be the database's default */
3715 appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3716 }
3717 else
3718 {
3719 /* We want an explicit tablespace */
3720 appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3721 }
3722
3723 if (RestoringToDB(AH))
3724 {
3725 PGresult *res;
3726
3727 res = PQexec(AH->connection, qry->data);
3728
3729 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3731 "could not set \"default_tablespace\" to %s: %s",
3733
3734 PQclear(res);
3735 }
3736 else
3737 ahprintf(AH, "%s;\n\n", qry->data);
3738
3739 free(AH->currTablespace);
3741
3742 destroyPQExpBuffer(qry);
3743}
3744
3745/*
3746 * Set the proper default_table_access_method value for the table.
3747 */
3748static void
/* NOTE(review): listing lines 3749 (signature) and 3779/3781 (the
 * warn-or-exit arguments) were lost in extraction. */
3750{
3751 RestoreOptions *ropt = AH->public.ropt;
3752 PQExpBuffer cmd;
3753 const char *want,
3754 *have;
3755
3756 /* do nothing in --no-table-access-method mode */
3757 if (ropt->noTableAm)
3758 return;
3759
3760 have = AH->currTableAm;
3761 want = tableam;
3762
3763 if (!want)
3764 return;
3765
 /* Skip the SET if the desired AM is already current. */
3766 if (have && strcmp(want, have) == 0)
3767 return;
3768
3769 cmd = createPQExpBuffer();
3770 appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3771
3772 if (RestoringToDB(AH))
3773 {
3774 PGresult *res;
3775
3776 res = PQexec(AH->connection, cmd->data);
3777
3778 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3780 "could not set \"default_table_access_method\": %s",
3782
3783 PQclear(res);
3784 }
3785 else
3786 ahprintf(AH, "%s\n\n", cmd->data);
3787
3788 destroyPQExpBuffer(cmd);
3789
3790 free(AH->currTableAm);
3791 AH->currTableAm = pg_strdup(want);
3792}
3793
3794/*
3795 * Set the proper default table access method for a table without storage.
3796 * Currently, this is required only for partitioned tables with a table AM.
3797 */
3798static void
/* NOTE(review): listing lines 3799 (signature), 3812, and 3828/3830 (the
 * warn-or-exit arguments) were lost in extraction. */
3800{
3801 RestoreOptions *ropt = AH->public.ropt;
3802 const char *tableam = te->tableam;
3803 PQExpBuffer cmd;
3804
3805 /* do nothing in --no-table-access-method mode */
3806 if (ropt->noTableAm)
3807 return;
3808
3809 if (!tableam)
3810 return;
3811
3813
 /* Emit ALTER TABLE ... SET ACCESS METHOD rather than a session default,
  * since a storage-less relation takes its AM directly. */
3814 cmd = createPQExpBuffer();
3815
3816 appendPQExpBufferStr(cmd, "ALTER TABLE ");
3817 appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3818 appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3819 fmtId(tableam));
3820
3821 if (RestoringToDB(AH))
3822 {
3823 PGresult *res;
3824
3825 res = PQexec(AH->connection, cmd->data);
3826
3827 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3829 "could not alter table access method: %s",
3831 PQclear(res);
3832 }
3833 else
3834 ahprintf(AH, "%s\n\n", cmd->data);
3835
3836 destroyPQExpBuffer(cmd);
3837}
3838
3839/*
3840 * Extract an object description for a TOC entry, and append it to buf.
3841 *
3842 * This is used for ALTER ... OWNER TO.
3843 *
3844 * If the object type has no owner, do nothing.
3845 */
3846static void
3848{
3849 const char *type = te->desc;
3850
3851 /* objects that don't require special decoration */
3852 if (strcmp(type, "COLLATION") == 0 ||
3853 strcmp(type, "CONVERSION") == 0 ||
3854 strcmp(type, "DOMAIN") == 0 ||
3855 strcmp(type, "FOREIGN TABLE") == 0 ||
3856 strcmp(type, "MATERIALIZED VIEW") == 0 ||
3857 strcmp(type, "PROPERTY GRAPH") == 0 ||
3858 strcmp(type, "SEQUENCE") == 0 ||
3859 strcmp(type, "STATISTICS") == 0 ||
3860 strcmp(type, "TABLE") == 0 ||
3861 strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3862 strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3863 strcmp(type, "TYPE") == 0 ||
3864 strcmp(type, "VIEW") == 0 ||
3865 /* non-schema-specified objects */
3866 strcmp(type, "DATABASE") == 0 ||
3867 strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3868 strcmp(type, "SCHEMA") == 0 ||
3869 strcmp(type, "EVENT TRIGGER") == 0 ||
3870 strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3871 strcmp(type, "SERVER") == 0 ||
3872 strcmp(type, "PUBLICATION") == 0 ||
3873 strcmp(type, "SUBSCRIPTION") == 0)
3874 {
3875 appendPQExpBuffer(buf, "%s ", type);
3876 if (te->namespace && *te->namespace)
3877 appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3879 }
3880 /* LOs just have a name, but it's numeric so must not use fmtId */
3881 else if (strcmp(type, "BLOB") == 0)
3882 {
3883 appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3884 }
3885
3886 /*
3887 * These object types require additional decoration. Fortunately, the
3888 * information needed is exactly what's in the DROP command.
3889 */
3890 else if (strcmp(type, "AGGREGATE") == 0 ||
3891 strcmp(type, "FUNCTION") == 0 ||
3892 strcmp(type, "OPERATOR") == 0 ||
3893 strcmp(type, "OPERATOR CLASS") == 0 ||
3894 strcmp(type, "OPERATOR FAMILY") == 0 ||
3895 strcmp(type, "PROCEDURE") == 0)
3896 {
3897 /* Chop "DROP " off the front and make a modifiable copy */
3898 char *first = pg_strdup(te->dropStmt + 5);
3899 char *last;
3900
3901 /* point to last character in string */
3902 last = first + strlen(first) - 1;
3903
3904 /* Strip off any ';' or '\n' at the end */
3905 while (last >= first && (*last == '\n' || *last == ';'))
3906 last--;
3907 *(last + 1) = '\0';
3908
3909 appendPQExpBufferStr(buf, first);
3910
3911 free(first);
3912 return;
3913 }
3914 /* these object types don't have separate owners */
3915 else if (strcmp(type, "CAST") == 0 ||
3916 strcmp(type, "CHECK CONSTRAINT") == 0 ||
3917 strcmp(type, "CONSTRAINT") == 0 ||
3918 strcmp(type, "DROP_GLOBAL") == 0 ||
3919 strcmp(type, "ROLE PROPERTIES") == 0 ||
3920 strcmp(type, "ROLE") == 0 ||
3921 strcmp(type, "DATABASE PROPERTIES") == 0 ||
3922 strcmp(type, "DEFAULT") == 0 ||
3923 strcmp(type, "FK CONSTRAINT") == 0 ||
3924 strcmp(type, "INDEX") == 0 ||
3925 strcmp(type, "RULE") == 0 ||
3926 strcmp(type, "TRIGGER") == 0 ||
3927 strcmp(type, "ROW SECURITY") == 0 ||
3928 strcmp(type, "POLICY") == 0 ||
3929 strcmp(type, "USER MAPPING") == 0)
3930 {
3931 /* do nothing */
3932 }
3933 else
3934 pg_fatal("don't know how to set owner for object type \"%s\"", type);
3935}
3936
3937/*
3938 * Emit the SQL commands to create the object represented by a TOC entry
3939 *
3940 * This now also includes issuing an ALTER OWNER command to restore the
3941 * object's ownership, if wanted. But note that the object's permissions
3942 * will remain at default, until the matching ACL TOC entry is restored.
3943 */
3944static void
3946{
3947 RestoreOptions *ropt = AH->public.ropt;
3948
3949 /*
3950 * Select owner, schema, tablespace and default AM as necessary. The
3951 * default access method for partitioned tables is handled after
3952 * generating the object definition, as it requires an ALTER command
3953 * rather than SET.
3954 */
3955 _becomeOwner(AH, te);
3956 _selectOutputSchema(AH, te->namespace);
3960
3961 /* Emit header comment for item */
3962 if (!AH->noTocComments)
3963 {
3964 char *sanitized_name;
3965 char *sanitized_schema;
3966 char *sanitized_owner;
3967
3968 ahprintf(AH, "--\n");
3969 if (AH->public.verbose)
3970 {
3971 ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3972 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3973 if (te->nDeps > 0)
3974 {
3975 int i;
3976
3977 ahprintf(AH, "-- Dependencies:");
3978 for (i = 0; i < te->nDeps; i++)
3979 ahprintf(AH, " %d", te->dependencies[i]);
3980 ahprintf(AH, "\n");
3981 }
3982 }
3983
3984 sanitized_name = sanitize_line(te->tag, false);
3985 sanitized_schema = sanitize_line(te->namespace, true);
3986 sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3987
3988 ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3991
3995
3996 if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3997 {
3999
4001 ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
4003 }
4004 ahprintf(AH, "\n");
4005
4006 if (AH->PrintExtraTocPtr != NULL)
4007 AH->PrintExtraTocPtr(AH, te);
4008 ahprintf(AH, "--\n\n");
4009 }
4010
4011 /*
4012 * Actually print the definition. Normally we can just print the defn
4013 * string if any, but we have four special cases:
4014 *
4015 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
4016 * versions put into CREATE SCHEMA. Don't mutate the variant for schema
4017 * "public" that is a comment. We have to do this when --no-owner mode is
4018 * selected. This is ugly, but I see no other good way ...
4019 *
4020 * 2. BLOB METADATA entries need special processing since their defn
4021 * strings are just lists of OIDs, not complete SQL commands.
4022 *
4023 * 3. ACL LARGE OBJECTS entries need special processing because they
4024 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
4025 * apply to each large object listed in the associated BLOB METADATA.
4026 *
4027 * 4. Entries with a defnDumper need to call it to generate the
4028 * definition. This is primarily intended to provide a way to save memory
4029 * for objects that would otherwise need a lot of it (e.g., statistics
4030 * data).
4031 */
4032 if (ropt->noOwner &&
4033 strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
4034 {
4035 ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
4036 }
4037 else if (strcmp(te->desc, "BLOB METADATA") == 0)
4038 {
4039 IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
4040 }
4041 else if (strcmp(te->desc, "ACL") == 0 &&
4042 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
4043 {
4044 IssueACLPerBlob(AH, te);
4045 }
4046 else if (te->defnLen && AH->format != archTar)
4047 {
4048 /*
4049 * If defnLen is set, the defnDumper has already been called for this
4050 * TOC entry. We don't normally expect a defnDumper to be called for
4051 * a TOC entry a second time in _printTocEntry(), but there's an
4052 * exception. The tar format first calls WriteToc(), which scans the
4053 * entire TOC, and then it later calls RestoreArchive() to generate
4054 * restore.sql, which scans the TOC again. There doesn't appear to be
4055 * a good way to prevent a second defnDumper call in this case without
4056 * storing the definition in memory, which defeats the purpose. This
4057 * second defnDumper invocation should generate the same output as the
4058 * first, but even if it doesn't, the worst-case scenario is that
4059 * restore.sql might have different statistics data than the archive.
4060 *
4061 * In all other cases, encountering a TOC entry a second time in
4062 * _printTocEntry() is unexpected, so we fail because one of our
4063 * assumptions must no longer hold true.
4064 *
4065 * XXX This is a layering violation, but the alternative is an awkward
4066 * and complicated callback infrastructure for this special case. This
4067 * might be worth revisiting in the future.
4068 */
4069 pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
4070 te->dumpId, te->desc, te->tag);
4071 }
4072 else if (te->defnDumper)
4073 {
4074 char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
4075
4076 te->defnLen = ahprintf(AH, "%s\n\n", defn);
4077 pg_free(defn);
4078 }
4079 else if (te->defn && strlen(te->defn) > 0)
4080 {
4081 ahprintf(AH, "%s\n\n", te->defn);
4082
4083 /*
4084 * If the defn string contains multiple SQL commands, txn_size mode
4085 * should count it as N actions not one. But rather than build a full
4086 * SQL parser, approximate this by counting semicolons. One case
4087 * where that tends to be badly fooled is function definitions, so
4088 * ignore them. (restore_toc_entry will count one action anyway.)
4089 */
4090 if (ropt->txn_size > 0 &&
4091 strcmp(te->desc, "FUNCTION") != 0 &&
4092 strcmp(te->desc, "PROCEDURE") != 0)
4093 {
4094 const char *p = te->defn;
4095 int nsemis = 0;
4096
4097 while ((p = strchr(p, ';')) != NULL)
4098 {
4099 nsemis++;
4100 p++;
4101 }
4102 if (nsemis > 1)
4103 AH->txnCount += nsemis - 1;
4104 }
4105 }
4106
4107 /*
4108 * If we aren't using SET SESSION AUTH to determine ownership, we must
4109 * instead issue an ALTER OWNER command. Schema "public" is special; when
4110 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
4111 * when using SET SESSION for all other objects. We assume that anything
4112 * without a DROP command is not a separately ownable object.
4113 */
4114 if (!ropt->noOwner &&
4115 (!ropt->use_setsessauth ||
4116 (strcmp(te->desc, "SCHEMA") == 0 &&
4117 strncmp(te->defn, "--", 2) == 0)) &&
4118 te->owner && strlen(te->owner) > 0 &&
4119 te->dropStmt && strlen(te->dropStmt) > 0)
4120 {
4121 if (strcmp(te->desc, "BLOB METADATA") == 0)
4122 {
4123 /* BLOB METADATA needs special code to handle multiple LOs */
4124 char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
4125
4126 IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
4127 pg_free(cmdEnd);
4128 }
4129 else
4130 {
4131 /* For all other cases, we can use _getObjectDescription */
4133
4136
4137 /*
4138 * If _getObjectDescription() didn't fill the buffer, then there
4139 * is no owner.
4140 */
4141 if (temp.data[0])
4142 ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
4143 temp.data, fmtId(te->owner));
4145 }
4146 }
4147
4148 /*
4149 * Select a partitioned table's default AM, once the table definition has
4150 * been generated.
4151 */
4154
4155 /*
4156 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
4157 * commands, so we can no longer assume we know the current auth setting.
4158 */
4159 if (_tocEntryIsACL(te))
4160 {
4161 free(AH->currUser);
4162 AH->currUser = NULL;
4163 }
4164}
4165
4166/*
4167 * Write the file header for a custom-format archive
4168 */
4169void
4171{
4172 struct tm crtm;
4173
4174 AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
4175 AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
4176 AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
4177 AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
4178 AH->WriteBytePtr(AH, AH->intSize);
4179 AH->WriteBytePtr(AH, AH->offSize);
4180 AH->WriteBytePtr(AH, AH->format);
4182 crtm = *localtime(&AH->createDate);
4183 WriteInt(AH, crtm.tm_sec);
4184 WriteInt(AH, crtm.tm_min);
4185 WriteInt(AH, crtm.tm_hour);
4186 WriteInt(AH, crtm.tm_mday);
4187 WriteInt(AH, crtm.tm_mon);
4188 WriteInt(AH, crtm.tm_year);
4189 WriteInt(AH, crtm.tm_isdst);
4190 WriteStr(AH, PQdb(AH->connection));
4192 WriteStr(AH, PG_VERSION);
4193}
4194
4195void
4197{
4198 char *errmsg;
4199 char vmaj,
4200 vmin,
4201 vrev;
4202 int fmt;
4203
4204 /*
4205 * If we haven't already read the header, do so.
4206 *
4207 * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
4208 * way to unify the cases?
4209 */
4210 if (!AH->readHeader)
4211 {
4212 char tmpMag[7];
4213
4214 AH->ReadBufPtr(AH, tmpMag, 5);
4215
4216 if (strncmp(tmpMag, "PGDMP", 5) != 0)
4217 pg_fatal("did not find magic string in file header");
4218 }
4219
4220 vmaj = AH->ReadBytePtr(AH);
4221 vmin = AH->ReadBytePtr(AH);
4222
4223 if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4224 vrev = AH->ReadBytePtr(AH);
4225 else
4226 vrev = 0;
4227
4229
4230 if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4231 pg_fatal("unsupported version (%d.%d) in file header",
4232 vmaj, vmin);
4233
4234 AH->intSize = AH->ReadBytePtr(AH);
4235 if (AH->intSize > 32)
4236 pg_fatal("sanity check on integer size (%zu) failed", AH->intSize);
4237
4238 if (AH->intSize > sizeof(int))
4239 pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4240
4241 if (AH->version >= K_VERS_1_7)
4242 AH->offSize = AH->ReadBytePtr(AH);
4243 else
4244 AH->offSize = AH->intSize;
4245
4246 fmt = AH->ReadBytePtr(AH);
4247
4248 if (AH->format != fmt)
4249 pg_fatal("expected format (%d) differs from format found in file (%d)",
4250 AH->format, fmt);
4251
4252 if (AH->version >= K_VERS_1_15)
4254 else if (AH->version >= K_VERS_1_2)
4255 {
4256 /* Guess the compression method based on the level */
4257 if (AH->version < K_VERS_1_4)
4258 AH->compression_spec.level = AH->ReadBytePtr(AH);
4259 else
4260 AH->compression_spec.level = ReadInt(AH);
4261
4262 if (AH->compression_spec.level != 0)
4264 }
4265 else
4267
4269 if (errmsg)
4270 {
4271 pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4272 errmsg);
4273 pg_free(errmsg);
4274 }
4275
4276 if (AH->version >= K_VERS_1_4)
4277 {
4278 struct tm crtm;
4279
4280 crtm.tm_sec = ReadInt(AH);
4281 crtm.tm_min = ReadInt(AH);
4282 crtm.tm_hour = ReadInt(AH);
4283 crtm.tm_mday = ReadInt(AH);
4284 crtm.tm_mon = ReadInt(AH);
4285 crtm.tm_year = ReadInt(AH);
4286 crtm.tm_isdst = ReadInt(AH);
4287
4288 /*
4289 * Newer versions of glibc have mktime() report failure if tm_isdst is
4290 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4291 * TZ=UTC. This is problematic when restoring an archive under a
4292 * different timezone setting. If we get a failure, try again with
4293 * tm_isdst set to -1 ("don't know").
4294 *
4295 * XXX with or without this hack, we reconstruct createDate
4296 * incorrectly when the prevailing timezone is different from
4297 * pg_dump's. Next time we bump the archive version, we should flush
4298 * this representation and store a plain seconds-since-the-Epoch
4299 * timestamp instead.
4300 */
4301 AH->createDate = mktime(&crtm);
4302 if (AH->createDate == (time_t) -1)
4303 {
4304 crtm.tm_isdst = -1;
4305 AH->createDate = mktime(&crtm);
4306 if (AH->createDate == (time_t) -1)
4307 pg_log_warning("invalid creation date in header");
4308 }
4309 }
4310
4311 if (AH->version >= K_VERS_1_4)
4312 {
4313 AH->archdbname = ReadStr(AH);
4314 }
4315
4316 if (AH->version >= K_VERS_1_10)
4317 {
4318 AH->archiveRemoteVersion = ReadStr(AH);
4319 AH->archiveDumpVersion = ReadStr(AH);
4320 }
4321}
4322
4323
4324/*
4325 * checkSeek
4326 * check to see if ftell/fseek can be performed.
4327 */
4328bool
4330{
4331 pgoff_t tpos;
4332
4333 /* Check that ftello works on this file */
4334 tpos = ftello(fp);
4335 if (tpos < 0)
4336 return false;
4337
4338 /*
4339 * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4340 * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4341 * successful no-op even on files that are otherwise unseekable.
4342 */
4343 if (fseeko(fp, tpos, SEEK_SET) != 0)
4344 return false;
4345
4346 return true;
4347}
4348
4349
4350/*
4351 * dumpTimestamp
4352 */
4353static void
4355{
4356 char buf[64];
4357
4358 if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4359 ahprintf(AH, "-- %s %s\n\n", msg, buf);
4360}
4361
4362/*
4363 * Main engine for parallel restore.
4364 *
4365 * Parallel restore is done in three phases. In this first phase,
4366 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4367 * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4368 * PRE_DATA items other than ACLs.) Entries we can't process now are
4369 * added to the pending_list for later phases to deal with.
4370 */
4371static void
4373{
4374 bool skipped_some;
4376
4377 pg_log_debug("entering restore_toc_entries_prefork");
4378
4379 /* Adjust dependency information */
4380 fix_dependencies(AH);
4381
4382 /*
4383 * Do all the early stuff in a single connection in the parent. There's no
4384 * great point in running it in parallel, in fact it will actually run
4385 * faster in a single connection because we avoid all the connection and
4386 * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4387 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4388 * not risk trying to process them out-of-order.
4389 *
4390 * Stuff that we can't do immediately gets added to the pending_list.
4391 * Note: we don't yet filter out entries that aren't going to be restored.
4392 * They might participate in dependency chains connecting entries that
4393 * should be restored, so we treat them as live until we actually process
4394 * them.
4395 *
4396 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4397 * before DATA items, and all DATA items before POST_DATA items. That is
4398 * not certain to be true in older archives, though, and in any case use
4399 * of a list file would destroy that ordering (cf. SortTocFromFile). So
4400 * this loop cannot assume that it holds.
4401 */
4403 skipped_some = false;
4405 {
4406 bool do_now = true;
4407
4408 if (next_work_item->section != SECTION_PRE_DATA)
4409 {
4410 /* DATA and POST_DATA items are just ignored for now */
4411 if (next_work_item->section == SECTION_DATA ||
4413 {
4414 do_now = false;
4415 skipped_some = true;
4416 }
4417 else
4418 {
4419 /*
4420 * SECTION_NONE items, such as comments, can be processed now
4421 * if we are still in the PRE_DATA part of the archive. Once
4422 * we've skipped any items, we have to consider whether the
4423 * comment's dependencies are satisfied, so skip it for now.
4424 */
4425 if (skipped_some)
4426 do_now = false;
4427 }
4428 }
4429
4430 /*
4431 * Also skip items that need to be forced into later passes. We need
4432 * not set skipped_some in this case, since by assumption no main-pass
4433 * items could depend on these.
4434 */
4436 do_now = false;
4437
4438 if (do_now)
4439 {
4440 /* OK, restore the item and update its dependencies */
4441 pg_log_info("processing item %d %s %s",
4442 next_work_item->dumpId,
4443 next_work_item->desc, next_work_item->tag);
4444
4446
4447 /* Reduce dependencies, but don't move anything to ready_heap */
4449 }
4450 else
4451 {
4452 /* Nope, so add it to pending_list */
4454 }
4455 }
4456
4457 /*
4458 * In --transaction-size mode, we must commit the open transaction before
4459 * dropping the database connection. This also ensures that child workers
4460 * can see the objects we've created so far.
4461 */
4462 if (AH->public.ropt->txn_size > 0)
4464
4465 /*
4466 * Now close parent connection in prep for parallel steps. We do this
4467 * mainly to ensure that we don't exceed the specified number of parallel
4468 * connections.
4469 */
4471
4472 /* blow away any transient state from the old connection */
4473 free(AH->currUser);
4474 AH->currUser = NULL;
4475 free(AH->currSchema);
4476 AH->currSchema = NULL;
4477 free(AH->currTablespace);
4478 AH->currTablespace = NULL;
4479 free(AH->currTableAm);
4480 AH->currTableAm = NULL;
4481}
4482
4483/*
4484 * Main engine for parallel restore.
4485 *
4486 * Parallel restore is done in three phases. In this second phase,
4487 * we process entries by dispatching them to parallel worker children
4488 * (processes on Unix, threads on Windows), each of which connects
4489 * separately to the database. Inter-entry dependencies are respected,
4490 * and so is the RestorePass multi-pass structure. When we can no longer
4491 * make any entries ready to process, we exit. Normally, there will be
4492 * nothing left to do; but if there is, the third phase will mop up.
4493 */
4494static void
4497{
4500
4501 pg_log_debug("entering restore_toc_entries_parallel");
4502
4503 /* Set up ready_heap with enough room for all known TocEntrys */
4506 NULL);
4507
4508 /*
4509 * The pending_list contains all items that we need to restore. Move all
4510 * items that are available to process immediately into the ready_heap.
4511 * After this setup, the pending list is everything that needs to be done
4512 * but is blocked by one or more dependencies, while the ready heap
4513 * contains items that have no remaining dependencies and are OK to
4514 * process in the current restore pass.
4515 */
4518
4519 /*
4520 * main parent loop
4521 *
4522 * Keep going until there is no worker still running AND there is no work
4523 * left to be done. Note invariant: at top of loop, there should always
4524 * be at least one worker available to dispatch a job to.
4525 */
4526 pg_log_info("entering main parallel loop");
4527
4528 for (;;)
4529 {
4530 /* Look for an item ready to be dispatched to a worker */
4532 if (next_work_item != NULL)
4533 {
4534 /* If not to be restored, don't waste time launching a worker */
4535 if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
4536 {
4537 pg_log_info("skipping item %d %s %s",
4538 next_work_item->dumpId,
4539 next_work_item->desc, next_work_item->tag);
4540 /* Update its dependencies as though we'd completed it */
4542 /* Loop around to see if anything else can be dispatched */
4543 continue;
4544 }
4545
4546 pg_log_info("launching item %d %s %s",
4547 next_work_item->dumpId,
4548 next_work_item->desc, next_work_item->tag);
4549
4550 /* Dispatch to some worker */
4553 }
4554 else if (IsEveryWorkerIdle(pstate))
4555 {
4556 /*
4557 * Nothing is ready and no worker is running, so we're done with
4558 * the current pass or maybe with the whole process.
4559 */
4560 if (AH->restorePass == RESTORE_PASS_LAST)
4561 break; /* No more parallel processing is possible */
4562
4563 /* Advance to next restore pass */
4564 AH->restorePass++;
4565 /* That probably allows some stuff to be made ready */
4567 /* Loop around to see if anything's now ready */
4568 continue;
4569 }
4570 else
4571 {
4572 /*
4573 * We have nothing ready, but at least one child is working, so
4574 * wait for some subjob to finish.
4575 */
4576 }
4577
4578 /*
4579 * Before dispatching another job, check to see if anything has
4580 * finished. We should check every time through the loop so as to
4581 * reduce dependencies as soon as possible. If we were unable to
4582 * dispatch any job this time through, wait until some worker finishes
4583 * (and, hopefully, unblocks some pending item). If we did dispatch
4584 * something, continue as soon as there's at least one idle worker.
4585 * Note that in either case, there's guaranteed to be at least one
4586 * idle worker when we return to the top of the loop. This ensures we
4587 * won't block inside DispatchJobForTocEntry, which would be
4588 * undesirable: we'd rather postpone dispatching until we see what's
4589 * been unblocked by finished jobs.
4590 */
4591 WaitForWorkers(AH, pstate,
4593 }
4594
4595 /* There should now be nothing in ready_heap. */
4597
4599
4600 pg_log_info("finished main parallel loop");
4601}
4602
4603/*
4604 * Main engine for parallel restore.
4605 *
4606 * Parallel restore is done in three phases. In this third phase,
4607 * we mop up any remaining TOC entries by processing them serially.
4608 * This phase normally should have nothing to do, but if we've somehow
4609 * gotten stuck due to circular dependencies or some such, this provides
4610 * at least some chance of completing the restore successfully.
4611 */
4612static void
4614{
4615 RestoreOptions *ropt = AH->public.ropt;
4616 TocEntry *te;
4617
4618 pg_log_debug("entering restore_toc_entries_postfork");
4619
4620 /*
4621 * Now reconnect the single parent connection.
4622 */
4623 ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
4624
4625 /* re-establish fixed state */
4627
4628 /*
4629 * Make sure there is no work left due to, say, circular dependencies, or
4630 * some other pathological condition. If so, do it in the single parent
4631 * connection. We don't sweat about RestorePass ordering; it's likely we
4632 * already violated that.
4633 */
4634 for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4635 {
4636 pg_log_info("processing missed item %d %s %s",
4637 te->dumpId, te->desc, te->tag);
4638 (void) restore_toc_entry(AH, te, false);
4639 }
4640}
4641
4642/*
4643 * Check if te1 has an exclusive lock requirement for an item that te2 also
4644 * requires, whether or not te2's requirement is for an exclusive lock.
4645 */
4646static bool
4648{
4649 int j,
4650 k;
4651
4652 for (j = 0; j < te1->nLockDeps; j++)
4653 {
4654 for (k = 0; k < te2->nDeps; k++)
4655 {
4656 if (te1->lockDeps[j] == te2->dependencies[k])
4657 return true;
4658 }
4659 }
4660 return false;
4661}
4662
4663
4664/*
4665 * Initialize the header of the pending-items list.
4666 *
4667 * This is a circular list with a dummy TocEntry as header, just like the
4668 * main TOC list; but we use separate list links so that an entry can be in
4669 * the main TOC list as well as in the pending list.
4670 */
4671static void
4676
4677/* Append te to the end of the pending-list headed by l */
4678static void
4680{
4681 te->pending_prev = l->pending_prev;
4682 l->pending_prev->pending_next = te;
4683 l->pending_prev = te;
4684 te->pending_next = l;
4685}
4686
4687/* Remove te from the pending-list */
4688static void
4696
4697
4698/* qsort comparator for sorting TocEntries by dataLength */
4699static int
4700TocEntrySizeCompareQsort(const void *p1, const void *p2)
4701{
4702 const TocEntry *te1 = *(const TocEntry *const *) p1;
4703 const TocEntry *te2 = *(const TocEntry *const *) p2;
4704
4705 /* Sort by decreasing dataLength */
4706 if (te1->dataLength > te2->dataLength)
4707 return -1;
4708 if (te1->dataLength < te2->dataLength)
4709 return 1;
4710
4711 /* For equal dataLengths, sort by dumpId, just to be stable */
4712 if (te1->dumpId < te2->dumpId)
4713 return -1;
4714 if (te1->dumpId > te2->dumpId)
4715 return 1;
4716
4717 return 0;
4718}
4719
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/* return opposite of qsort comparator for max-heap */
	return -TocEntrySizeCompareQsort(&p1, &p2);
}
4727
4728
4729/*
4730 * Move all immediately-ready items from pending_list to ready_heap.
4731 *
4732 * Items are considered ready if they have no remaining dependencies and
4733 * they belong in the current restore pass. (See also reduce_dependencies,
4734 * which applies the same logic one-at-a-time.)
4735 */
4736static void
4740{
4741 TocEntry *te;
4743
4744 for (te = pending_list->pending_next; te != pending_list; te = next_te)
4745 {
4746 /* must save list link before possibly removing te from list */
4747 next_te = te->pending_next;
4748
4749 if (te->depCount == 0 &&
4751 {
4752 /* Remove it from pending_list ... */
4754 /* ... and add to ready_heap */
4756 }
4757 }
4758}
4759
4760/*
4761 * Find the next work item (if any) that is capable of being run now,
4762 * and remove it from the ready_heap.
4763 *
4764 * Returns the item, or NULL if nothing is runnable.
4765 *
4766 * To qualify, the item must have no remaining dependencies
4767 * and no requirements for locks that are incompatible with
4768 * items currently running. Items in the ready_heap are known to have
4769 * no remaining dependencies, but we have to check for lock conflicts.
4770 */
4771static TocEntry *
4773 ParallelState *pstate)
4774{
4775 /*
4776 * Search the ready_heap until we find a suitable item. Note that we do a
4777 * sequential scan through the heap nodes, so even though we will first
4778 * try to choose the highest-priority item, we might end up picking
4779 * something with a much lower priority. However, we expect that we will
4780 * typically be able to pick one of the first few items, which should
4781 * usually have a relatively high priority.
4782 */
4783 for (int i = 0; i < binaryheap_size(ready_heap); i++)
4784 {
4786 bool conflicts = false;
4787
4788 /*
4789 * Check to see if the item would need exclusive lock on something
4790 * that a currently running item also needs lock on, or vice versa. If
4791 * so, we don't want to schedule them together.
4792 */
4793 for (int k = 0; k < pstate->numWorkers; k++)
4794 {
4795 TocEntry *running_te = pstate->te[k];
4796
4797 if (running_te == NULL)
4798 continue;
4799 if (has_lock_conflicts(te, running_te) ||
4801 {
4802 conflicts = true;
4803 break;
4804 }
4805 }
4806
4807 if (conflicts)
4808 continue;
4809
4810 /* passed all tests, so this item can run */
4812 return te;
4813 }
4814
4815 pg_log_debug("no item ready");
4816 return NULL;
4817}
4818
4819
4820/*
4821 * Restore a single TOC item in parallel with others
4822 *
4823 * this is run in the worker, i.e. in a thread (Windows) or a separate process
4824 * (everything else). A worker process executes several such work items during
4825 * a parallel backup or restore. Once we terminate here and report back that
4826 * our work is finished, the leader process will assign us a new work item.
4827 */
4828int
4830{
4831 int status;
4832
4833 Assert(AH->connection != NULL);
4834
4835 /* Count only errors associated with this TOC entry */
4836 AH->public.n_errors = 0;
4837
4838 /* Restore the TOC item */
4839 status = restore_toc_entry(AH, te, true);
4840
4841 return status;
4842}
4843
4844
4845/*
4846 * Callback function that's invoked in the leader process after a step has
4847 * been parallel restored.
4848 *
4849 * Update status and reduce the dependency count of any dependent items.
4850 */
4851static void
4853 TocEntry *te,
4854 int status,
4855 void *callback_data)
4856{
4857 binaryheap *ready_heap = (binaryheap *) callback_data;
4858
4859 pg_log_info("finished item %d %s %s",
4860 te->dumpId, te->desc, te->tag);
4861
4862 if (status == WORKER_CREATE_DONE)
4863 mark_create_done(AH, te);
4864 else if (status == WORKER_INHIBIT_DATA)
4865 {
4867 AH->public.n_errors++;
4868 }
4869 else if (status == WORKER_IGNORED_ERRORS)
4870 AH->public.n_errors++;
4871 else if (status != 0)
4872 pg_fatal("worker process failed: exit code %d",
4873 status);
4874
4876}
4877
4878
4879/*
4880 * Process the dependency information into a form useful for parallel restore.
4881 *
4882 * This function takes care of fixing up some missing or badly designed
4883 * dependencies, and then prepares subsidiary data structures that will be
4884 * used in the main parallel-restore logic, including:
4885 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4886 * 2. We set up depCount fields that are the number of as-yet-unprocessed
4887 * dependencies for each TOC entry.
4888 *
4889 * We also identify locking dependencies so that we can avoid trying to
4890 * schedule conflicting items at the same time.
4891 */
4892static void
4894{
4895 TocEntry *te;
4896 int i;
4897
4898 /*
4899 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4900 * items are marked as not being in any parallel-processing list.
4901 */
4902 for (te = AH->toc->next; te != AH->toc; te = te->next)
4903 {
4904 te->depCount = te->nDeps;
4905 te->revDeps = NULL;
4906 te->nRevDeps = 0;
4907 te->pending_prev = NULL;
4908 te->pending_next = NULL;
4909 }
4910
4911 /*
4912 * POST_DATA items that are shown as depending on a table need to be
4913 * re-pointed to depend on that table's data, instead. This ensures they
4914 * won't get scheduled until the data has been loaded.
4915 */
4917
4918 /*
4919 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4920 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4921 * one BLOB COMMENTS in such files.)
4922 */
4923 if (AH->version < K_VERS_1_11)
4924 {
4925 for (te = AH->toc->next; te != AH->toc; te = te->next)
4926 {
4927 if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4928 {
4929 TocEntry *te2;
4930
4931 for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4932 {
4933 if (strcmp(te2->desc, "BLOBS") == 0)
4934 {
4936 te->dependencies[0] = te2->dumpId;
4937 te->nDeps++;
4938 te->depCount++;
4939 break;
4940 }
4941 }
4942 break;
4943 }
4944 }
4945 }
4946
4947 /*
4948 * At this point we start to build the revDeps reverse-dependency arrays,
4949 * so all changes of dependencies must be complete.
4950 */
4951
4952 /*
4953 * Count the incoming dependencies for each item. Also, it is possible
4954 * that the dependencies list items that are not in the archive at all
4955 * (that should not happen in 9.2 and later, but is highly likely in older
4956 * archives). Subtract such items from the depCounts.
4957 */
4958 for (te = AH->toc->next; te != AH->toc; te = te->next)
4959 {
4960 for (i = 0; i < te->nDeps; i++)
4961 {
4962 DumpId depid = te->dependencies[i];
4963
4964 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4965 AH->tocsByDumpId[depid]->nRevDeps++;
4966 else
4967 te->depCount--;
4968 }
4969 }
4970
4971 /*
4972 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4973 * it as a counter below.
4974 */
4975 for (te = AH->toc->next; te != AH->toc; te = te->next)
4976 {
4977 if (te->nRevDeps > 0)
4979 te->nRevDeps = 0;
4980 }
4981
4982 /*
4983 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4984 * better agree with the loops above.
4985 */
4986 for (te = AH->toc->next; te != AH->toc; te = te->next)
4987 {
4988 for (i = 0; i < te->nDeps; i++)
4989 {
4990 DumpId depid = te->dependencies[i];
4991
4992 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4993 {
4995
4996 otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4997 }
4998 }
4999 }
5000
5001 /*
5002 * Lastly, work out the locking dependencies.
5003 */
5004 for (te = AH->toc->next; te != AH->toc; te = te->next)
5005 {
5006 te->lockDeps = NULL;
5007 te->nLockDeps = 0;
5009 }
5010}
5011
5012/*
5013 * Change dependencies on table items to depend on table data items instead,
5014 * but only in POST_DATA items.
5015 *
5016 * Also, for any item having such dependency(s), set its dataLength to the
5017 * largest dataLength of the table data items it depends on. This ensures
5018 * that parallel restore will prioritize larger jobs (index builds, FK
5019 * constraint checks, etc) over smaller ones, avoiding situations where we
5020 * end a restore with only one active job working on a large table.
5021 */
5022static void
5024{
5025 TocEntry *te;
5026 int i;
5027 DumpId olddep;
5028
5029 for (te = AH->toc->next; te != AH->toc; te = te->next)
5030 {
5031 if (te->section != SECTION_POST_DATA)
5032 continue;
5033 for (i = 0; i < te->nDeps; i++)
5034 {
5035 olddep = te->dependencies[i];
5036 if (olddep <= AH->maxDumpId &&
5037 AH->tableDataId[olddep] != 0)
5038 {
5041
5042 te->dependencies[i] = tabledataid;
5043 te->dataLength = Max(te->dataLength, tabledatate->dataLength);
5044 pg_log_debug("transferring dependency %d -> %d to %d",
5045 te->dumpId, olddep, tabledataid);
5046 }
5047 }
5048 }
5049}
5050
5051/*
5052 * Identify which objects we'll need exclusive lock on in order to restore
5053 * the given TOC entry (*other* than the one identified by the TOC entry
5054 * itself). Record their dump IDs in the entry's lockDeps[] array.
5055 */
5056static void
5058{
5059 DumpId *lockids;
5060 int nlockids;
5061 int i;
5062
5063 /*
5064 * We only care about this for POST_DATA items. PRE_DATA items are not
5065 * run in parallel, and DATA items are all independent by assumption.
5066 */
5067 if (te->section != SECTION_POST_DATA)
5068 return;
5069
5070 /* Quick exit if no dependencies at all */
5071 if (te->nDeps == 0)
5072 return;
5073
5074 /*
5075 * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
5076 * and hence require exclusive lock. However, we know that CREATE INDEX
5077 * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
5078 * category too ... but today is not that day.)
5079 */
5080 if (strcmp(te->desc, "INDEX") == 0)
5081 return;
5082
5083 /*
5084 * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5085 * item listed among its dependencies. Originally all of these would have
5086 * been TABLE items, but repoint_table_dependencies would have repointed
5087 * them to the TABLE DATA items if those are present (which they might not
5088 * be, eg in a schema-only dump). Note that all of the entries we are
5089 * processing here are POST_DATA; otherwise there might be a significant
5090 * difference between a dependency on a table and a dependency on its
5091 * data, so that closer analysis would be needed here.
5092 */
5094 nlockids = 0;
5095 for (i = 0; i < te->nDeps; i++)
5096 {
5097 DumpId depid = te->dependencies[i];
5098
5099 if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5100 ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5101 strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5102 lockids[nlockids++] = depid;
5103 }
5104
5105 if (nlockids == 0)
5106 {
5107 free(lockids);
5108 return;
5109 }
5110
5112 te->nLockDeps = nlockids;
5113}
5114
5115/*
5116 * Remove the specified TOC entry from the depCounts of items that depend on
5117 * it, thereby possibly making them ready-to-run. Any pending item that
5118 * becomes ready should be moved to the ready_heap, if that's provided.
5119 */
5120static void
5123{
5124 int i;
5125
5126 pg_log_debug("reducing dependencies for %d", te->dumpId);
5127
5128 for (i = 0; i < te->nRevDeps; i++)
5129 {
5130 TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5131
5132 Assert(otherte->depCount > 0);
5133 otherte->depCount--;
5134
5135 /*
5136 * It's ready if it has no remaining dependencies, and it belongs in
5137 * the current restore pass, and it is currently a member of the
5138 * pending list (that check is needed to prevent double restore in
5139 * some cases where a list-file forces out-of-order restoring).
5140 * However, if ready_heap == NULL then caller doesn't want any list
5141 * memberships changed.
5142 */
5143 if (otherte->depCount == 0 &&
5145 otherte->pending_prev != NULL &&
5146 ready_heap != NULL)
5147 {
5148 /* Remove it from pending list ... */
5150 /* ... and add to ready_heap */
5152 }
5153 }
5154}
5155
5156/*
5157 * Set the created flag on the DATA member corresponding to the given
5158 * TABLE member
5159 */
5160static void
5162{
5163 if (AH->tableDataId[te->dumpId] != 0)
5164 {
5165 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5166
5167 ted->created = true;
5168 }
5169}
5170
5171/*
5172 * Mark the DATA member corresponding to the given TABLE member
5173 * as not wanted
5174 */
5175static void
5177{
5178 pg_log_info("table \"%s\" could not be created, will not restore its data",
5179 te->tag);
5180
5181 if (AH->tableDataId[te->dumpId] != 0)
5182 {
5183 TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5184
5185 ted->reqs = 0;
5186 }
5187}
5188
5189/*
5190 * Clone and de-clone routines used in parallel restoration.
5191 *
5192 * Enough of the structure is cloned to ensure that there is no
5193 * conflict between different threads each with their own clone.
5194 */
5197{
5199
5200 /* Make a "flat" copy */
5202 memcpy(clone, AH, sizeof(ArchiveHandle));
5203
5204 /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5205 clone->public.ropt = pg_malloc_object(RestoreOptions);
5206 memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5207
5208 /* Handle format-independent fields */
5209 memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5210
5211 /* The clone will have its own connection, so disregard connection state */
5212 clone->connection = NULL;
5213 clone->connCancel = NULL;
5214 clone->currUser = NULL;
5215 clone->currSchema = NULL;
5216 clone->currTableAm = NULL;
5217 clone->currTablespace = NULL;
5218
5219 /* savedPassword must be local in case we change it while connecting */
5220 if (clone->savedPassword)
5221 clone->savedPassword = pg_strdup(clone->savedPassword);
5222
5223 /* clone has its own error count, too */
5224 clone->public.n_errors = 0;
5225
5226 /* clones should not share lo_buf */
5227 clone->lo_buf = NULL;
5228
5229 /*
5230 * Clone connections disregard --transaction-size; they must commit after
5231 * each command so that the results are immediately visible to other
5232 * workers.
5233 */
5234 clone->public.ropt->txn_size = 0;
5235
5236 /*
5237 * Connect our new clone object to the database, using the same connection
5238 * parameters used for the original connection.
5239 */
5240 ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5241
5242 /* re-establish fixed state */
5243 if (AH->mode == archModeRead)
5245 /* in write case, setupDumpWorker will fix up connection state */
5246
5247 /* Let the format-specific code have a chance too */
5248 clone->ClonePtr(clone);
5249
5250 Assert(clone->connection != NULL);
5251 return clone;
5252}
5253
5254/*
5255 * Release clone-local storage.
5256 *
5257 * Note: we assume any clone-local connection was already closed.
5258 */
5259void
5261{
5262 /* Should not have an open database connection */
5263 Assert(AH->connection == NULL);
5264
5265 /* Clear format-specific state */
5266 AH->DeClonePtr(AH);
5267
5268 /* Clear state allocated by CloneArchive */
5269 if (AH->sqlparse.curCmd)
5271
5272 /* Clear any connection-local state */
5273 free(AH->currUser);
5274 free(AH->currSchema);
5275 free(AH->currTablespace);
5276 free(AH->currTableAm);
5277 free(AH->savedPassword);
5278
5279 free(AH);
5280}
int lo_write(int fd, const char *buf, int len)
Definition be-fsstubs.c:182
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition parallel.c:1075
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition parallel.c:1467
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition parallel.c:913
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition parallel.c:1221
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition parallel.c:1284
@ WFW_ALL_IDLE
Definition parallel.h:35
@ WFW_GOT_STATUS
Definition parallel.h:33
@ WFW_ONE_IDLE
Definition parallel.h:34
void binaryheap_remove_node(binaryheap *heap, int n)
Definition binaryheap.c:223
void binaryheap_add(binaryheap *heap, bh_node_type d)
Definition binaryheap.c:152
void binaryheap_free(binaryheap *heap)
Definition binaryheap.c:73
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition binaryheap.c:37
#define binaryheap_size(h)
Definition binaryheap.h:66
#define binaryheap_empty(h)
Definition binaryheap.h:65
#define binaryheap_get_node(h, n)
Definition binaryheap.h:67
#define PG_BINARY_R
Definition c.h:1376
#define ngettext(s, p, n)
Definition c.h:1270
#define PG_BINARY_A
Definition c.h:1375
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
#define PG_BINARY_W
Definition c.h:1377
uint32 result
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool EndCompressFileHandle(CompressFileHandle *CFH)
char * supports_compression(const pg_compress_specification compression_spec)
Definition compress_io.c:87
CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compression_spec)
const char * get_compress_algorithm_name(pg_compress_algorithm algorithm)
Definition compression.c:99
@ PG_COMPRESSION_GZIP
Definition compression.h:24
@ PG_COMPRESSION_NONE
Definition compression.h:23
char * sanitize_line(const char *str, bool want_hyphen)
Definition dumputils.c:52
#define PGDUMP_STRFTIME_FMT
Definition dumputils.h:34
Datum arg
Definition elog.c:1323
char * PQdb(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int lo_close(PGconn *conn, int fd)
Definition fe-lobj.c:96
int lo_open(PGconn *conn, Oid lobjId, int mode)
Definition fe-lobj.c:57
Oid lo_create(PGconn *conn, Oid lobjId)
Definition fe-lobj.c:474
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition fe_memutils.c:53
void pg_free(void *ptr)
#define pg_realloc_array(pointer, type, count)
Definition fe_memutils.h:63
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
#define pg_malloc0_object(type)
Definition fe_memutils.h:51
#define pg_malloc_object(type)
Definition fe_memutils.h:50
#define pg_malloc0_array(type, count)
Definition fe_memutils.h:57
DataDirSyncMethod
Definition file_utils.h:28
@ DATA_DIR_SYNC_METHOD_FSYNC
Definition file_utils.h:29
int remaining
Definition informix.c:692
char sign
Definition informix.c:693
static char * encoding
Definition initdb.c:139
static DataDirSyncMethod sync_method
Definition initdb.c:170
int b
Definition isn.c:74
return true
Definition isn.c:130
int j
Definition isn.c:78
int i
Definition isn.c:77
#define PQclear
#define PQresultStatus
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
#define INV_WRITE
Definition libpq-fs.h:21
static struct pg_tm tm
Definition localtime.c:104
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition logging.c:233
#define pg_log_info(...)
Definition logging.h:126
@ PG_LOG_PRIMARY
Definition logging.h:67
@ PG_LOG_ERROR
Definition logging.h:43
#define pg_log_debug(...)
Definition logging.h:135
static size_t append_data(char *buf, size_t size, size_t nmemb, void *userdata)
static char * errmsg
static AmcheckOptions opts
Definition pg_amcheck.c:112
@ SECTION_NONE
Definition pg_backup.h:57
@ SECTION_POST_DATA
Definition pg_backup.h:60
@ SECTION_PRE_DATA
Definition pg_backup.h:58
@ SECTION_DATA
Definition pg_backup.h:59
int DumpId
Definition pg_backup.h:285
void(* SetupWorkerPtrType)(Archive *AH)
Definition pg_backup.h:292
enum _archiveFormat ArchiveFormat
void ConnectDatabaseAhx(Archive *AHX, const ConnParams *cparams, bool isReconnect)
@ archModeWrite
Definition pg_backup.h:51
@ archModeAppend
Definition pg_backup.h:50
@ archModeRead
Definition pg_backup.h:52
void DisconnectDatabase(Archive *AHX)
enum _teSection teSection
@ archUnknown
Definition pg_backup.h:41
@ archTar
Definition pg_backup.h:43
@ archCustom
Definition pg_backup.h:42
@ archDirectory
Definition pg_backup.h:45
@ archNull
Definition pg_backup.h:44
static void fix_dependencies(ArchiveHandle *AH)
static void repoint_table_dependencies(ArchiveHandle *AH)
void DeCloneArchive(ArchiveHandle *AH)
static int _discoverArchiveFormat(ArchiveHandle *AH)
#define TEXT_DUMPALL_HEADER
int TocIDRequired(ArchiveHandle *AH, DumpId id)
bool checkSeek(FILE *fp)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
void warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te)
void WriteHead(ArchiveHandle *AH)
int EndLO(Archive *AHX, Oid oid)
static CompressFileHandle * SaveOutput(ArchiveHandle *AH)
#define TOC_PREFIX_DATA
static void _becomeUser(ArchiveHandle *AH, const char *user)
static void pending_list_append(TocEntry *l, TocEntry *te)
size_t WriteInt(ArchiveHandle *AH, int i)
void ProcessArchiveRestoreOptions(Archive *AHX)
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
static void _moveBefore(TocEntry *pos, TocEntry *te)
RestoreOptions * NewRestoreOptions(void)
static bool _tocEntryIsACL(TocEntry *te)
static void move_to_ready_heap(TocEntry *pending_list, binaryheap *ready_heap, RestorePass pass)
char * ReadStr(ArchiveHandle *AH)
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
static void buildTocEntryArrays(ArchiveHandle *AH)
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
int archprintf(Archive *AH, const char *fmt,...)
static void StrictNamesCheck(RestoreOptions *ropt)
static void mark_restore_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
static RestorePass _tocEntryRestorePass(TocEntry *te)
TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, ArchiveOpts *opts)
int StartLO(Archive *AHX, Oid oid)
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
#define TOC_PREFIX_STATS
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
static void setupRestoreWorker(Archive *AHX)
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname)
static int TocEntrySizeCompareQsort(const void *p1, const void *p2)
Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
void StartRestoreLOs(ArchiveHandle *AH)
void RestoreArchive(Archive *AHX, bool append_data)
void CloseArchive(Archive *AHX)
static void pending_list_header_init(TocEntry *l)
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
static void mark_create_done(ArchiveHandle *AH, TocEntry *te)
#define TEXT_DUMP_HEADER
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
static bool is_load_via_partition_root(TocEntry *te)
Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, DataDirSyncMethod sync_method)
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
DumpOptions * NewDumpOptions(void)
void SortTocFromFile(Archive *AHX)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
int ReadInt(ArchiveHandle *AH)
static void restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
static void _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
#define TOC_PREFIX_NONE
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
static void _doSetFixedOutputState(ArchiveHandle *AH)
void PrintTOCSummary(Archive *AHX)
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
static int RestoringToDB(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
static void pending_list_remove(TocEntry *te)
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2)
void ReadToc(ArchiveHandle *AH)
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, binaryheap *ready_heap)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace)
void WriteToc(ArchiveHandle *AH)
void archputs(const char *s, Archive *AH)
static bool _fileExistsInDirectory(const char *dir, const char *filename)
static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec, bool append_data)
DumpOptions * dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
void InitDumpOptions(DumpOptions *opts)
static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
static void dump_lo_buf(ArchiveHandle *AH)
static TocEntry * pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate)
void WriteData(Archive *AHX, const void *data, size_t dLen)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
size_t WriteStr(ArchiveHandle *AH, const char *c)
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_15
void InitArchiveFmt_Null(ArchiveHandle *AH)
#define WORKER_CREATE_DONE
#define LOBBUFSIZE
void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_14
#define appendByteaLiteralAHX(buf, str, len, AH)
void struct _archiveOpts ArchiveOpts
void(* EndDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_SELF
#define K_VERS_1_10
void(* StartDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define ARCHIVE_MAJOR(version)
#define ARCHIVE_MINOR(version)
#define K_VERS_1_2
#define RESTORE_PASS_LAST
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define K_OFFSET_NO_DATA
void InitArchiveFmt_Tar(ArchiveHandle *AH)
#define REQ_SCHEMA
#define K_VERS_1_4
#define appendStringLiteralAHX(buf, str, AH)
void DropLOIfExists(ArchiveHandle *AH, Oid oid)
#define MAKE_ARCHIVE_VERSION(major, minor, rev)
#define K_VERS_1_5
void ReconnectToServer(ArchiveHandle *AH, const char *dbname)
#define ARCHIVE_REV(version)
#define REQ_STATS
#define WRITE_ERROR_EXIT
#define WORKER_IGNORED_ERRORS
#define REQ_SPECIAL
void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, const char *cmdBegin, const char *cmdEnd)
#define K_VERS_1_6
@ STAGE_INITIALIZING
@ STAGE_PROCESSING
@ STAGE_NONE
@ STAGE_FINALIZING
@ ACT_RESTORE
#define K_VERS_1_0
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define K_VERS_MAX
#define K_VERS_1_8
#define WORKER_OK
@ OUTPUT_COPYDATA
@ OUTPUT_SQLCMDS
@ OUTPUT_OTHERDATA
#define K_VERS_1_12
#define REQ_DATA
#define READ_ERROR_EXIT(fd)
void InitArchiveFmt_Directory(ArchiveHandle *AH)
#define K_VERS_1_11
#define K_VERS_1_9
#define WORKER_INHIBIT_DATA
#define K_VERS_1_16
@ RESTORE_PASS_POST_ACL
@ RESTORE_PASS_ACL
@ RESTORE_PASS_MAIN
#define K_VERS_1_3
#define K_VERS_1_7
void EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
int ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
void exit_nicely(int code)
#define DUMP_PRE_DATA
#define DUMP_DATA
#define DUMP_UNSECTIONED
#define pg_fatal(...)
#define DUMP_POST_DATA
static PgChecksumMode mode
#define MAXPGPATH
const void size_t len
const void * data
static int sig
Definition pg_ctl.c:81
static bool dosync
Definition pg_dump.c:152
static void setupDumpWorker(Archive *AH)
Definition pg_dump.c:1580
static char * filename
Definition pg_dumpall.c:133
bool pg_get_line_buf(FILE *stream, StringInfo buf)
Definition pg_get_line.c:95
static char * user
Definition pg_regress.c:121
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define pg_encoding_to_char
Definition pg_wchar.h:483
#define pg_char_to_encoding
Definition pg_wchar.h:482
static char * tablespace
Definition pgbench.c:217
#define pg_log_warning(...)
Definition pgfnames.c:24
bool isValidTarHeader(const char *header)
Definition tar.c:112
#define sprintf
Definition port.h:262
#define snprintf
Definition port.h:260
#define qsort(a, b, c, d)
Definition port.h:495
off_t pgoff_t
Definition port.h:421
#define InvalidOid
unsigned int Oid
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void termPQExpBuffer(PQExpBuffer str)
char * c
static int fb(int x)
size_t pvsnprintf(char *buf, size_t len, const char *fmt, va_list args)
Definition psprintf.c:103
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
const char * simple_string_list_not_touched(SimpleStringList *list)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
#define free(a)
char * dbname
Definition streamutil.c:49
const char * fmtQualifiedId(const char *schema, const char *id)
const char * fmtId(const char *rawid)
void setFmtEncoding(int encoding)
void appendPsqlMetaConnect(PQExpBuffer buf, const char *dbname)
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
int minRemoteVersion
Definition pg_backup.h:237
char * remoteVersionStr
Definition pg_backup.h:233
DumpOptions * dopt
Definition pg_backup.h:229
bool exit_on_error
Definition pg_backup.h:252
char * searchpath
Definition pg_backup.h:248
int maxRemoteVersion
Definition pg_backup.h:238
int n_errors
Definition pg_backup.h:253
bool std_strings
Definition pg_backup.h:245
int numWorkers
Definition pg_backup.h:240
int encoding
Definition pg_backup.h:244
int verbose
Definition pg_backup.h:232
RestoreOptions * ropt
Definition pg_backup.h:230
Oid tableoid
Definition pg_backup.h:281
TocEntry ** te
Definition parallel.h:59
SimpleStringListCell * head
Definition simple_list.h:42
ArchiverStage stage
RestorePass restorePass
ArchiveFormat format
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
DataDirSyncMethod sync_method
struct _tocEntry * lastErrorTE
ReadExtraTocPtrType ReadExtraTocPtr
struct _tocEntry * currentTE
CustomOutPtrType CustomOutPtr
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
struct _tocEntry ** tocsByDumpId
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
PrintTocDataPtrType PrintTocDataPtr
struct _tocEntry * currToc
WriteBytePtrType WriteBytePtr
sqlparseInfo sqlparse
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
ArchiverStage lastErrorStage
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
ArchiverOutput outputKind
EndDataPtrType EndDataPtr
SetupWorkerPtrType SetupWorkerPtr
ClosePtrType ClosePtr
char * pgport
Definition pg_backup.h:88
char * pghost
Definition pg_backup.h:89
trivalue promptPassword
Definition pg_backup.h:91
char * username
Definition pg_backup.h:90
char * dbname
Definition pg_backup.h:87
char * restrict_key
Definition pg_backup.h:220
int column_inserts
Definition pg_backup.h:185
int use_setsessauth
Definition pg_backup.h:198
int outputCreateDB
Definition pg_backup.h:206
bool include_everything
Definition pg_backup.h:203
int sequence_data
Definition pg_backup.h:212
int disable_dollar_quoting
Definition pg_backup.h:184
bool dumpSchema
Definition pg_backup.h:216
int outputNoTableAm
Definition pg_backup.h:196
int enable_row_security
Definition pg_backup.h:199
char * outputSuperuser
Definition pg_backup.h:210
int no_security_labels
Definition pg_backup.h:190
bool dumpStatistics
Definition pg_backup.h:218
int no_publications
Definition pg_backup.h:189
ConnParams cparams
Definition pg_backup.h:173
const char * lockWaitTimeout
Definition pg_backup.h:180
int no_subscriptions
Definition pg_backup.h:191
int outputNoTablespaces
Definition pg_backup.h:197
int disable_triggers
Definition pg_backup.h:195
int outputNoOwner
Definition pg_backup.h:209
SimpleStringList schemaExcludeNames
Definition pg_backup.h:141
int suppressDumpWarnings
Definition pg_backup.h:152
ConnParams cparams
Definition pg_backup.h:146
SimpleStringList functionNames
Definition pg_backup.h:139
SimpleStringList tableNames
Definition pg_backup.h:143
SimpleStringList indexNames
Definition pg_backup.h:138
pg_compress_specification compression_spec
Definition pg_backup.h:150
SimpleStringList triggerNames
Definition pg_backup.h:142
int disable_dollar_quoting
Definition pg_backup.h:110
SimpleStringList schemaNames
Definition pg_backup.h:140
char * restrict_key
Definition pg_backup.h:168
const char * filename
Definition pg_backup.h:121
const char * lockWaitTimeout
Definition pg_backup.h:125
int enable_row_security
Definition pg_backup.h:159
int noDataForFailedTables
Definition pg_backup.h:148
struct _tocEntry * pending_next
struct _tocEntry * prev
teSection section
struct _tocEntry * pending_prev
DefnDumperPtr defnDumper
DataDumperPtr dataDumper
CatalogId catalogId
struct _tocEntry * next
const void * dataDumperArg
const void * defnDumperArg
DumpId * dependencies
pg_compress_algorithm algorithm
Definition compression.h:34
int tm_sec
Definition pgtime.h:36
unsigned short st_mode
Definition win32_port.h:258
static void * fn(void *arg)
@ TRI_DEFAULT
Definition vacuumlo.c:36
const char * type
#define stat
Definition win32_port.h:74
#define S_ISDIR(m)
Definition win32_port.h:315
#define ftello(stream)
Definition win32_port.h:209
#define S_ISREG(m)
Definition win32_port.h:318
#define fseeko(stream, offset, origin)
Definition win32_port.h:206
static void StartTransaction(void)
Definition xact.c:2106
static void CommitTransaction(void)
Definition xact.c:2270
ArchiveMode
Definition xlog.h:66