PostgreSQL Source Code  git master
pg_backup_archiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  * Private implementation of the archiver routines.
6  *
7  * See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  * Rights are granted to use this software in any way so long
11  * as this notice is not removed.
12  *
13  * The author is not responsible for loss or damages that may
14  * result from its use.
15  *
16  *
17  * IDENTIFICATION
18  * src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "catalog/pg_class_d.h"
34 #include "common/string.h"
35 #include "compress_io.h"
36 #include "dumputils.h"
37 #include "fe_utils/string_utils.h"
38 #include "lib/binaryheap.h"
39 #include "lib/stringinfo.h"
40 #include "libpq/libpq-fs.h"
41 #include "parallel.h"
42 #include "pg_backup_archiver.h"
43 #include "pg_backup_db.h"
44 #include "pg_backup_utils.h"
45 
/*
 * Banner strings written as SQL comments ("--" lines followed by a blank
 * line).  The first one is byte-identical to the literal that the restore
 * code in this file prints at the top of its output; the second is the
 * "database cluster dump" variant.
 * NOTE(review): no use of either macro is visible in this listing -- confirm
 * where they are consumed.
 */
46 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 
49 
50 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
51  const pg_compress_specification compression_spec,
52  bool dosync, ArchiveMode mode,
53  SetupWorkerPtrType setupWorkerPtr,
55 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
56 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
57 static char *sanitize_line(const char *str, bool want_hyphen);
58 static void _doSetFixedOutputState(ArchiveHandle *AH);
59 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
60 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
61 static void _becomeUser(ArchiveHandle *AH, const char *user);
62 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
63 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
64 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
65 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
67  TocEntry *te);
68 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
69 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
70 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
71 static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
73 static bool _tocEntryIsACL(TocEntry *te);
76 static bool is_load_via_partition_root(TocEntry *te);
77 static void buildTocEntryArrays(ArchiveHandle *AH);
78 static void _moveBefore(TocEntry *pos, TocEntry *te);
80 
81 static int RestoringToDB(ArchiveHandle *AH);
82 static void dump_lo_buf(ArchiveHandle *AH);
83 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
84 static void SetOutput(ArchiveHandle *AH, const char *filename,
85  const pg_compress_specification compression_spec);
87 static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
88 
89 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
91  TocEntry *pending_list);
93  ParallelState *pstate,
94  TocEntry *pending_list);
96  TocEntry *pending_list);
97 static void pending_list_header_init(TocEntry *l);
98 static void pending_list_append(TocEntry *l, TocEntry *te);
99 static void pending_list_remove(TocEntry *te);
100 static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
101 static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
102 static void move_to_ready_heap(TocEntry *pending_list,
103  binaryheap *ready_heap,
104  RestorePass pass);
105 static TocEntry *pop_next_work_item(binaryheap *ready_heap,
106  ParallelState *pstate);
107 static void mark_dump_job_done(ArchiveHandle *AH,
108  TocEntry *te,
109  int status,
110  void *callback_data);
111 static void mark_restore_job_done(ArchiveHandle *AH,
112  TocEntry *te,
113  int status,
114  void *callback_data);
115 static void fix_dependencies(ArchiveHandle *AH);
116 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
119 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
120  binaryheap *ready_heap);
121 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
123 
124 static void StrictNamesCheck(RestoreOptions *ropt);
125 
126 
127 /*
128  * Allocate a new DumpOptions block containing all default values.
 *
 * Returns the pointer held in the local variable opts.
 *
 * NOTE(review): in this listing the line with the function name and the
 * statements that declare, allocate and initialize opts are missing (see the
 * gaps in the embedded line numbers); confirm the details against the
 * complete source.
129  */
130 DumpOptions *
132 {
134 
136  return opts;
137 }
138 
139 /*
140  * Initialize a DumpOptions struct to all default values
 *
 * The struct pointed to by opts is zeroed in full first; afterwards the
 * fields whose default is not zero are assigned explicitly:
 *   include_everything      = true
 *   cparams.promptPassword  = TRI_DEFAULT
 *   dumpSections            = DUMP_UNSECTIONED
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing.
141  */
142 void
144 {
145  memset(opts, 0, sizeof(DumpOptions));
146  /* set any fields that shouldn't default to zeroes */
147  opts->include_everything = true;
148  opts->cparams.promptPassword = TRI_DEFAULT;
149  opts->dumpSections = DUMP_UNSECTIONED;
150 }
151 
152 /*
153  * Create a freshly allocated DumpOptions with options equivalent to those
154  * found in the given RestoreOptions.
 *
 * The result starts out as whatever NewDumpOptions() returns and is then
 * overwritten field by field from ropt.  The four connection strings
 * (dbname, pgport, pghost, username) are duplicated with pg_strdup() when
 * they are non-NULL, so the new block does not share those pointers with
 * ropt; every other field shown is copied by plain assignment.
 *
 * Returns the new DumpOptions pointer.
 *
 * NOTE(review): the function-name line and a few assignments are missing
 * from this listing (gaps in the embedded line numbers), so the set of
 * copied fields shown here is not necessarily complete.
155  */
156 DumpOptions *
158 {
159  DumpOptions *dopt = NewDumpOptions();
160 
161  /* this is the inverse of what's at the end of pg_dump.c's main() */
 /* connection parameters: private copies of each string that is set */
162  dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
163  dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
164  dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
165  dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
 /* note that several fields carry different names on the two structs */
167  dopt->outputClean = ropt->dropSchema;
168  dopt->dataOnly = ropt->dataOnly;
169  dopt->schemaOnly = ropt->schemaOnly;
170  dopt->if_exists = ropt->if_exists;
171  dopt->column_inserts = ropt->column_inserts;
172  dopt->dumpSections = ropt->dumpSections;
173  dopt->aclsSkip = ropt->aclsSkip;
174  dopt->outputSuperuser = ropt->superuser;
175  dopt->outputCreateDB = ropt->createDB;
176  dopt->outputNoOwner = ropt->noOwner;
177  dopt->outputNoTableAm = ropt->noTableAm;
178  dopt->outputNoTablespaces = ropt->noTablespace;
179  dopt->disable_triggers = ropt->disable_triggers;
180  dopt->use_setsessauth = ropt->use_setsessauth;
182  dopt->dump_inserts = ropt->dump_inserts;
183  dopt->no_comments = ropt->no_comments;
184  dopt->no_publications = ropt->no_publications;
186  dopt->no_subscriptions = ropt->no_subscriptions;
187  dopt->lockWaitTimeout = ropt->lockWaitTimeout;
190  dopt->sequence_data = ropt->sequence_data;
191 
192  return dopt;
193 }
194 
195 
196 /*
197  * Wrapper functions.
198  *
199  * The objective is to make writing new formats and dumpers as simple
200  * as possible, if necessary at the expense of extra function calls etc.
201  *
202  */
203 
204 /*
205  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
206  * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
207  * setup doesn't need to know anything much, so it's defined here.
 *
 * The visible body only casts AHX to an ArchiveHandle and invokes the
 * handle's ReopenPtr callback on it.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
208  */
209 static void
211 {
212  ArchiveHandle *AH = (ArchiveHandle *) AHX;
213 
214  AH->ReopenPtr(AH);
215 }
216 
217 
218 /* Create a new archive */
219 /* Public */
/*
 * Builds an ArchiveHandle through _allocAH(), forwarding FileSpec, fmt and
 * compression_spec, and returns it cast to the public Archive type.
 *
 * NOTE(review): the end of the parameter list and the remaining _allocAH()
 * arguments are missing from this listing, so how dosync, mode and any
 * further parameters are used cannot be confirmed from here.
 */
220 Archive *
221 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
222  const pg_compress_specification compression_spec,
223  bool dosync, ArchiveMode mode,
226 
227 {
228  ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
230 
231  return (Archive *) AH;
232 }
233 
234 /* Open an existing archive */
235 /* Public */
/*
 * Builds an ArchiveHandle through _allocAH() for the given file and format
 * and returns it cast to the public Archive type.  The compression
 * specification handed to _allocAH() is a zero-initialized struct whose
 * algorithm is set to PG_COMPRESSION_NONE, and the fourth argument is the
 * constant true.
 *
 * NOTE(review): the remaining _allocAH() arguments are missing from this
 * listing.
 */
236 Archive *
237 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
238 {
239  ArchiveHandle *AH;
240  pg_compress_specification compression_spec = {0};
241 
242  compression_spec.algorithm = PG_COMPRESSION_NONE;
243  AH = _allocAH(FileSpec, fmt, compression_spec, true,
246 
247  return (Archive *) AH;
248 }
249 
250 /* Public */
/*
 * Finish with an archive: run the format's ClosePtr callback, then close the
 * output handle AH->OF.  A failure to close the output is fatal.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
 */
251 void
253 {
254  ArchiveHandle *AH = (ArchiveHandle *) AHX;
255 
256  AH->ClosePtr(AH);
257 
258  /* Close the output */
 /*
  * errno is cleared before the call; presumably so that the %m in the
  * message below only reports an error raised by EndCompressFileHandle()
  * -- TODO confirm.
  */
259  errno = 0;
260  if (!EndCompressFileHandle(AH->OF))
261  pg_fatal("could not close output file: %m");
262 }
263 
264 /* Public */
/*
 * Store the dump and restore option blocks on the archive (AH->dopt and
 * AH->ropt).  When dopt is NULL but ropt is not, a DumpOptions block is
 * derived from ropt via dumpOptionsFromRestoreOptions() and stored instead.
 * If both are NULL, both stored pointers end up NULL.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
 */
265 void
267 {
268  /* Caller can omit dump options, in which case we synthesize them */
269  if (dopt == NULL && ropt != NULL)
270  dopt = dumpOptionsFromRestoreOptions(ropt);
271 
272  /* Save options for later access */
273  AH->dopt = dopt;
274  AH->ropt = ropt;
275 }
276 
277 /* Public */
/*
 * Walk the whole TOC list once and fill in te->reqs for every entry with the
 * result of _tocEntryRequired().  While walking, the section of the most
 * recent entry that has a section is tracked in curSection (starting at
 * SECTION_PRE_DATA) and passed along to _tocEntryRequired().
 *
 * Unless the archive is open in archModeRead, the walk also checks the
 * section ordering: a PRE_DATA entry after any later section, or a DATA
 * entry after POST_DATA, produces a warning; an unknown section code is
 * fatal.
 *
 * Finally, StrictNamesCheck() is run when ropt->strict_names is set.  ropt
 * is dereferenced unconditionally, so AH->public.ropt must be non-NULL here.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
 */
278 void
280 {
281  ArchiveHandle *AH = (ArchiveHandle *) AHX;
282  RestoreOptions *ropt = AH->public.ropt;
283  TocEntry *te;
284  teSection curSection;
285 
286  /* Decide which TOC entries will be dumped/restored, and mark them */
287  curSection = SECTION_PRE_DATA;
 /* AH->toc is the list header of a circular list, hence the stop test */
288  for (te = AH->toc->next; te != AH->toc; te = te->next)
289  {
290  /*
291  * When writing an archive, we also take this opportunity to check
292  * that we have generated the entries in a sane order that respects
293  * the section divisions. When reading, don't complain, since buggy
294  * old versions of pg_dump might generate out-of-order archives.
295  */
296  if (AH->mode != archModeRead)
297  {
298  switch (te->section)
299  {
300  case SECTION_NONE:
301  /* ok to be anywhere */
302  break;
303  case SECTION_PRE_DATA:
304  if (curSection != SECTION_PRE_DATA)
305  pg_log_warning("archive items not in correct section order");
306  break;
307  case SECTION_DATA:
308  if (curSection == SECTION_POST_DATA)
309  pg_log_warning("archive items not in correct section order");
310  break;
311  case SECTION_POST_DATA:
312  /* ok no matter which section we were in */
313  break;
314  default:
315  pg_fatal("unexpected section code %d",
316  (int) te->section);
317  break;
318  }
319  }
320 
 /* SECTION_NONE entries inherit the section of their predecessor */
321  if (te->section != SECTION_NONE)
322  curSection = te->section;
323 
324  te->reqs = _tocEntryRequired(te, curSection, AH);
325  }
326 
327  /* Enforce strict names checking */
328  if (ropt->strict_names)
329  StrictNamesCheck(ropt);
330 }
331 
332 /* Public */
/*
 * Top-level driver for restoring an archive.  In the order visible here:
 *
 *  1. Decide whether this is a parallel restore (more than one worker and a
 *     direct database connection) and reject archives/formats that cannot
 *     support it.
 *  2. Verify that needed data entries can be read, connect to the database
 *     if requested, and detect an implied data-only restore (no wanted
 *     schema entries).
 *  3. Redirect output, print the banner and version lines, and open a
 *     transaction for --single-transaction.
 *  4. With dropSchema set, walk the TOC backwards and emit the DROP
 *     commands, rewriting them to include IF EXISTS when requested.
 *  5. Restore the entries: via the parallel-restore routines, or serially
 *     in up to three passes (main, ACL, post-ACL).
 *  6. Commit any persistent transaction, print the trailer and put back the
 *     saved output.
 *
 * NOTE(review): the function-name line and a number of statements are
 * missing from this listing (see the gaps in the embedded line numbers);
 * this description covers only what is visible.
 */
333 void
335 {
336  ArchiveHandle *AH = (ArchiveHandle *) AHX;
337  RestoreOptions *ropt = AH->public.ropt;
338  bool parallel_mode;
339  TocEntry *te;
340  CompressFileHandle *sav;
341 
343 
344  /*
345  * If we're going to do parallel restore, there are some restrictions.
346  */
347  parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
348  if (parallel_mode)
349  {
350  /* We haven't got round to making this work for all archive formats */
351  if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
352  pg_fatal("parallel restore is not supported with this archive file format");
353 
354  /* Doesn't work if the archive represents dependencies as OIDs */
355  if (AH->version < K_VERS_1_8)
356  pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
357 
358  /*
359  * It's also not gonna work if we can't reopen the input file, so
360  * let's try that immediately.
361  */
362  AH->ReopenPtr(AH);
363  }
364 
365  /*
366  * Make sure we won't need (de)compression we haven't got
367  */
368  if (AH->PrintTocDataPtr != NULL)
369  {
370  for (te = AH->toc->next; te != AH->toc; te = te->next)
371  {
372  if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
373  {
 /*
  * NOTE(review): the declaration/initialization of errmsg is missing
  * from this listing.  Only the first wanted data entry is examined:
  * the loop either dies or breaks out right here.
  */
375 
376  if (errmsg)
377  pg_fatal("cannot restore from compressed archive (%s)",
378  errmsg);
379  else
380  break;
381  }
382  }
383  }
384 
385  /*
386  * Prepare index arrays, so we can assume we have them throughout restore.
387  * It's possible we already did this, though.
388  */
389  if (AH->tocsByDumpId == NULL)
 /* NOTE(review): the statement guarded by this test is missing here */
391 
392  /*
393  * If we're using a DB connection, then connect it.
394  */
395  if (ropt->useDB)
396  {
397  pg_log_info("connecting to database for restore");
398  if (AH->version < K_VERS_1_3)
399  pg_fatal("direct database connections are not supported in pre-1.3 archives");
400 
401  /*
402  * We don't want to guess at whether the dump will successfully
403  * restore; allow the attempt regardless of the version of the restore
404  * target.
405  */
406  AHX->minRemoteVersion = 0;
407  AHX->maxRemoteVersion = 9999999;
408 
409  ConnectDatabase(AHX, &ropt->cparams, false);
410 
411  /*
412  * If we're talking to the DB directly, don't send comments since they
413  * obscure SQL when displaying errors
414  */
415  AH->noTocComments = 1;
416  }
417 
418  /*
419  * Work out if we have an implied data-only restore. This can happen if
420  * the dump was data only or if the user has used a toc list to exclude
421  * all of the schema data. All we do is look for schema entries - if none
422  * are found then we set the dataOnly flag.
423  *
424  * We could scan for wanted TABLE entries, but that is not the same as
425  * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
426  */
427  if (!ropt->dataOnly)
428  {
429  int impliedDataOnly = 1;
430 
431  for (te = AH->toc->next; te != AH->toc; te = te->next)
432  {
433  if ((te->reqs & REQ_SCHEMA) != 0)
434  { /* It's schema, and it's wanted */
435  impliedDataOnly = 0;
436  break;
437  }
438  }
439  if (impliedDataOnly)
440  {
 /* note: this writes into the caller-supplied RestoreOptions */
441  ropt->dataOnly = impliedDataOnly;
442  pg_log_info("implied data-only restore");
443  }
444  }
445 
446  /*
447  * Setup the output file if necessary.
448  */
449  sav = SaveOutput(AH);
 /* NOTE(review): a line is missing here; SetOutput may be conditional */
451  SetOutput(AH, ropt->filename, ropt->compression_spec);
452 
453  ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
454 
455  if (AH->archiveRemoteVersion)
456  ahprintf(AH, "-- Dumped from database version %s\n",
458  if (AH->archiveDumpVersion)
459  ahprintf(AH, "-- Dumped by pg_dump version %s\n",
460  AH->archiveDumpVersion);
461 
462  ahprintf(AH, "\n");
463 
464  if (AH->public.verbose)
465  dumpTimestamp(AH, "Started on", AH->createDate);
466 
 /* with a live connection issue the command; otherwise write it as SQL */
467  if (ropt->single_txn)
468  {
469  if (AH->connection)
470  StartTransaction(AHX);
471  else
472  ahprintf(AH, "BEGIN;\n\n");
473  }
474 
475  /*
476  * Establish important parameter values right away.
477  */
479 
480  AH->stage = STAGE_PROCESSING;
481 
482  /*
483  * Drop the items at the start, in reverse order
484  */
485  if (ropt->dropSchema)
486  {
487  for (te = AH->toc->prev; te != AH->toc; te = te->prev)
488  {
489  AH->currentTE = te;
490 
491  /*
492  * In createDB mode, issue a DROP *only* for the database as a
493  * whole. Issuing drops against anything else would be wrong,
494  * because at this point we're connected to the wrong database.
495  * (The DATABASE PROPERTIES entry, if any, should be treated like
496  * the DATABASE entry.)
497  */
498  if (ropt->createDB)
499  {
500  if (strcmp(te->desc, "DATABASE") != 0 &&
501  strcmp(te->desc, "DATABASE PROPERTIES") != 0)
502  continue;
503  }
504 
505  /* Otherwise, drop anything that's selected and has a dropStmt */
506  if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
507  {
508  bool not_allowed_in_txn = false;
509 
510  pg_log_info("dropping %s %s", te->desc, te->tag);
511 
512  /*
513  * In --transaction-size mode, we have to temporarily exit our
514  * transaction block to drop objects that can't be dropped
515  * within a transaction.
516  */
517  if (ropt->txn_size > 0)
518  {
519  if (strcmp(te->desc, "DATABASE") == 0 ||
520  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
521  {
522  not_allowed_in_txn = true;
523  if (AH->connection)
524  CommitTransaction(AHX);
525  else
526  ahprintf(AH, "COMMIT;\n");
527  }
528  }
529 
530  /* Select owner and schema as necessary */
531  _becomeOwner(AH, te);
532  _selectOutputSchema(AH, te->namespace);
533 
534  /*
535  * Now emit the DROP command, if the object has one. Note we
536  * don't necessarily emit it verbatim; at this point we add an
537  * appropriate IF EXISTS clause, if the user requested it.
538  */
539  if (strcmp(te->desc, "BLOB METADATA") == 0)
540  {
541  /* We must generate the per-blob commands */
542  if (ropt->if_exists)
543  IssueCommandPerBlob(AH, te,
544  "SELECT pg_catalog.lo_unlink(oid) "
545  "FROM pg_catalog.pg_largeobject_metadata "
546  "WHERE oid = '", "'");
547  else
548  IssueCommandPerBlob(AH, te,
549  "SELECT pg_catalog.lo_unlink('",
550  "')");
551  }
552  else if (*te->dropStmt != '\0')
553  {
554  if (!ropt->if_exists ||
555  strncmp(te->dropStmt, "--", 2) == 0)
556  {
557  /*
558  * Without --if-exists, or if it's just a comment (as
559  * happens for the public schema), print the dropStmt
560  * as-is.
561  */
562  ahprintf(AH, "%s", te->dropStmt);
563  }
564  else
565  {
566  /*
567  * Inject an appropriate spelling of "if exists". For
568  * old-style large objects, we have a routine that
569  * knows how to do it, without depending on
570  * te->dropStmt; use that. For other objects we need
571  * to parse the command.
572  */
573  if (strcmp(te->desc, "BLOB") == 0)
574  {
575  DropLOIfExists(AH, te->catalogId.oid);
576  }
577  else
578  {
 /*
  * dropStmt is a private, writable copy that may be advanced
  * and cut in place below; dropStmtOrig keeps the start of
  * the allocation so it can be freed afterwards.
  */
579  char *dropStmt = pg_strdup(te->dropStmt);
580  char *dropStmtOrig = dropStmt;
581  PQExpBuffer ftStmt = createPQExpBuffer();
582 
583  /*
584  * Need to inject IF EXISTS clause after ALTER
585  * TABLE part in ALTER TABLE .. DROP statement
586  */
587  if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
588  {
589  appendPQExpBufferStr(ftStmt,
590  "ALTER TABLE IF EXISTS");
591  dropStmt = dropStmt + 11;
592  }
593 
594  /*
595  * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
596  * not support the IF EXISTS clause, and therefore
597  * we simply emit the original command for DEFAULT
598  * objects (modulo the adjustment made above).
599  *
600  * Likewise, don't mess with DATABASE PROPERTIES.
601  *
602  * If we used CREATE OR REPLACE VIEW as a means of
603  * quasi-dropping an ON SELECT rule, that should
604  * be emitted unchanged as well.
605  *
606  * For other object types, we need to extract the
607  * first part of the DROP which includes the
608  * object type. Most of the time this matches
609  * te->desc, so search for that; however for the
610  * different kinds of CONSTRAINTs, we know to
611  * search for hardcoded "DROP CONSTRAINT" instead.
612  */
613  if (strcmp(te->desc, "DEFAULT") == 0 ||
614  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
615  strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
616  appendPQExpBufferStr(ftStmt, dropStmt);
617  else
618  {
619  char buffer[40];
620  char *mark;
621 
622  if (strcmp(te->desc, "CONSTRAINT") == 0 ||
623  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
624  strcmp(te->desc, "FK CONSTRAINT") == 0)
625  strcpy(buffer, "DROP CONSTRAINT");
626  else
 /* snprintf bounds the copy: an overlong desc is truncated */
627  snprintf(buffer, sizeof(buffer), "DROP %s",
628  te->desc);
629 
630  mark = strstr(dropStmt, buffer);
631 
632  if (mark)
633  {
 /*
  * Terminate the copy at the match so that dropStmt is the
  * text before "DROP <type>"; the remainder is re-appended
  * after the injected IF EXISTS.
  */
634  *mark = '\0';
635  appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
636  dropStmt, buffer,
637  mark + strlen(buffer));
638  }
639  else
640  {
641  /* complain and emit unmodified command */
642  pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
643  dropStmtOrig);
644  appendPQExpBufferStr(ftStmt, dropStmt);
645  }
646  }
647 
648  ahprintf(AH, "%s", ftStmt->data);
649 
650  destroyPQExpBuffer(ftStmt);
651  pg_free(dropStmtOrig);
652  }
653  }
654  }
655 
656  /*
657  * In --transaction-size mode, re-establish the transaction
658  * block if needed; otherwise, commit after every N drops.
659  */
660  if (ropt->txn_size > 0)
661  {
662  if (not_allowed_in_txn)
663  {
664  if (AH->connection)
665  StartTransaction(AHX);
666  else
667  ahprintf(AH, "BEGIN;\n");
668  AH->txnCount = 0;
669  }
670  else if (++AH->txnCount >= ropt->txn_size)
671  {
672  if (AH->connection)
673  {
674  CommitTransaction(AHX);
675  StartTransaction(AHX);
676  }
677  else
678  ahprintf(AH, "COMMIT;\nBEGIN;\n");
679  AH->txnCount = 0;
680  }
681  }
682  }
683  }
684 
685  /*
686  * _selectOutputSchema may have set currSchema to reflect the effect
687  * of a "SET search_path" command it emitted. However, by now we may
688  * have dropped that schema; or it might not have existed in the first
689  * place. In either case the effective value of search_path will not
690  * be what we think. Forcibly reset currSchema so that we will
691  * re-establish the search_path setting when needed (after creating
692  * the schema).
693  *
694  * If we treated users as pg_dump'able objects then we'd need to reset
695  * currUser here too.
696  */
697  free(AH->currSchema);
698  AH->currSchema = NULL;
699  }
700 
701  if (parallel_mode)
702  {
703  /*
704  * In parallel mode, turn control over to the parallel-restore logic.
705  */
706  ParallelState *pstate;
707  TocEntry pending_list;
708 
709  /* The archive format module may need some setup for this */
710  if (AH->PrepParallelRestorePtr)
711  AH->PrepParallelRestorePtr(AH);
712 
713  pending_list_header_init(&pending_list);
714 
715  /* This runs PRE_DATA items and then disconnects from the database */
716  restore_toc_entries_prefork(AH, &pending_list);
717  Assert(AH->connection == NULL);
718 
719  /* ParallelBackupStart() will actually fork the processes */
720  pstate = ParallelBackupStart(AH);
721  restore_toc_entries_parallel(AH, pstate, &pending_list);
722  ParallelBackupEnd(AH, pstate);
723 
724  /* reconnect the leader and see if we missed something */
725  restore_toc_entries_postfork(AH, &pending_list);
726  Assert(AH->connection != NULL);
727  }
728  else
729  {
730  /*
731  * In serial mode, process everything in three phases: normal items,
732  * then ACLs, then post-ACL items. We might be able to skip one or
733  * both extra phases in some cases, eg data-only restores.
734  */
735  bool haveACL = false;
736  bool havePostACL = false;
737 
 /* first pass: restore main-pass items, just note that others exist */
738  for (te = AH->toc->next; te != AH->toc; te = te->next)
739  {
740  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
741  continue; /* ignore if not to be dumped at all */
742 
743  switch (_tocEntryRestorePass(te))
744  {
745  case RESTORE_PASS_MAIN:
746  (void) restore_toc_entry(AH, te, false);
747  break;
748  case RESTORE_PASS_ACL:
749  haveACL = true;
750  break;
752  havePostACL = true;
753  break;
754  }
755  }
756 
757  if (haveACL)
758  {
759  for (te = AH->toc->next; te != AH->toc; te = te->next)
760  {
761  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
763  (void) restore_toc_entry(AH, te, false);
764  }
765  }
766 
767  if (havePostACL)
768  {
769  for (te = AH->toc->next; te != AH->toc; te = te->next)
770  {
771  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
773  (void) restore_toc_entry(AH, te, false);
774  }
775  }
776  }
777 
778  /*
779  * Close out any persistent transaction we may have. While these two
780  * cases are started in different places, we can end both cases here.
781  */
782  if (ropt->single_txn || ropt->txn_size > 0)
783  {
784  if (AH->connection)
785  CommitTransaction(AHX);
786  else
787  ahprintf(AH, "COMMIT;\n\n");
788  }
789 
790  if (AH->public.verbose)
791  dumpTimestamp(AH, "Completed on", time(NULL));
792 
793  ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
794 
795  /*
796  * Clean up & we're done.
797  */
798  AH->stage = STAGE_FINALIZING;
799 
801  RestoreOutput(AH, sav);
802 
803  if (ropt->useDB)
 /* NOTE(review): the statement guarded by this test is missing here */
805 }
806 
807 /*
808  * Restore a single TOC item. Used in both parallel and non-parallel restore;
809  * is_parallel is true if we are in a worker child process.
810  *
811  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
812  * the parallel parent has to make the corresponding status update.
 *
 * More precisely, the result starts out as WORKER_OK.  If it is still
 * WORKER_OK at the end and AH->public.n_errors is positive,
 * WORKER_IGNORED_ERRORS is returned instead.
 *
 * The entry's schema part (REQ_SCHEMA) is emitted first, then its data part
 * (REQ_DATA).  In --transaction-size mode each processed entry counts as one
 * action towards the commit threshold.
 *
 * NOTE(review): several statements of this function are missing from this
 * listing (see the gaps in the embedded line numbers); the description
 * covers only what is visible.
813  */
814 static int
815 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
816 {
817  RestoreOptions *ropt = AH->public.ropt;
818  int status = WORKER_OK;
819  int reqs;
820  bool defnDumped;
821 
822  AH->currentTE = te;
823 
824  /* Dump any relevant dump warnings to stderr */
825  if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
826  {
827  if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
828  pg_log_warning("warning from original dump file: %s", te->defn);
829  else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
830  pg_log_warning("warning from original dump file: %s", te->copyStmt);
831  }
832 
833  /* Work out what, if anything, we want from this entry */
834  reqs = te->reqs;
835 
 /* records whether the schema branch below already printed the entry */
836  defnDumped = false;
837 
838  /*
839  * If it has a schema component that we want, then process that
840  */
841  if ((reqs & REQ_SCHEMA) != 0)
842  {
843  bool object_is_db = false;
844 
845  /*
846  * In --transaction-size mode, must exit our transaction block to
847  * create a database or set its properties.
848  */
849  if (strcmp(te->desc, "DATABASE") == 0 ||
850  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
851  {
852  object_is_db = true;
853  if (ropt->txn_size > 0)
854  {
855  if (AH->connection)
 /* NOTE(review): the statement for the connected case is missing here */
857  else
858  ahprintf(AH, "COMMIT;\n\n");
859  }
860  }
861 
862  /* Show namespace in log message if available */
863  if (te->namespace)
864  pg_log_info("creating %s \"%s.%s\"",
865  te->desc, te->namespace, te->tag);
866  else
867  pg_log_info("creating %s \"%s\"",
868  te->desc, te->tag);
869 
870  _printTocEntry(AH, te, false);
871  defnDumped = true;
872 
873  if (strcmp(te->desc, "TABLE") == 0)
874  {
 /* lastErrorTE pointing at this entry is what signals the failure */
875  if (AH->lastErrorTE == te)
876  {
877  /*
878  * We failed to create the table. If
879  * --no-data-for-failed-tables was given, mark the
880  * corresponding TABLE DATA to be ignored.
881  *
882  * In the parallel case this must be done in the parent, so we
883  * just set the return value.
884  */
885  if (ropt->noDataForFailedTables)
886  {
887  if (is_parallel)
888  status = WORKER_INHIBIT_DATA;
889  else
 /* NOTE(review): the non-parallel statement is missing here */
891  }
892  }
893  else
894  {
895  /*
896  * We created the table successfully. Mark the corresponding
897  * TABLE DATA for possible truncation.
898  *
899  * In the parallel case this must be done in the parent, so we
900  * just set the return value.
901  */
902  if (is_parallel)
903  status = WORKER_CREATE_DONE;
904  else
905  mark_create_done(AH, te);
906  }
907  }
908 
909  /*
910  * If we created a DB, connect to it. Also, if we changed DB
911  * properties, reconnect to ensure that relevant GUC settings are
912  * applied to our session. (That also restarts the transaction block
913  * in --transaction-size mode.)
914  */
915  if (object_is_db)
916  {
917  pg_log_info("connecting to new database \"%s\"", te->tag);
918  _reconnectToDB(AH, te->tag);
919  }
920  }
921 
922  /*
923  * If it has a data component that we want, then process that
924  */
925  if ((reqs & REQ_DATA) != 0)
926  {
927  /*
928  * hadDumper will be set if there is genuine data component for this
929  * node. Otherwise, we need to check the defn field for statements
930  * that need to be executed in data-only restores.
931  */
932  if (te->hadDumper)
933  {
934  /*
935  * If we can output the data, then restore it.
936  */
937  if (AH->PrintTocDataPtr != NULL)
938  {
939  _printTocEntry(AH, te, true);
940 
941  if (strcmp(te->desc, "BLOBS") == 0 ||
942  strcmp(te->desc, "BLOB COMMENTS") == 0)
943  {
944  pg_log_info("processing %s", te->desc);
945 
946  _selectOutputSchema(AH, "pg_catalog");
947 
948  /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
949  if (strcmp(te->desc, "BLOB COMMENTS") == 0)
 /* NOTE(review): the statement guarded by this test is missing here */
951 
952  AH->PrintTocDataPtr(AH, te);
953 
955  }
956  else
957  {
958  bool use_truncate;
959 
961 
962  /* Select owner and schema as necessary */
963  _becomeOwner(AH, te);
964  _selectOutputSchema(AH, te->namespace);
965 
966  pg_log_info("processing data for table \"%s.%s\"",
967  te->namespace, te->tag);
968 
969  /*
970  * In parallel restore, if we created the table earlier in
971  * this run (so that we know it is empty) and we are not
972  * restoring a load-via-partition-root data item then we
973  * wrap the COPY in a transaction and precede it with a
974  * TRUNCATE. If wal_level is set to minimal this prevents
975  * WAL-logging the COPY. This obtains a speedup similar
976  * to that from using single_txn mode in non-parallel
977  * restores.
978  *
979  * We mustn't do this for load-via-partition-root cases
980  * because some data might get moved across partition
981  * boundaries, risking deadlock and/or loss of previously
982  * loaded data. (We assume that all partitions of a
983  * partitioned table will be treated the same way.)
984  */
985  use_truncate = is_parallel && te->created &&
 /* NOTE(review): the last operand of this expression is missing here */
987 
988  if (use_truncate)
989  {
990  /*
991  * Parallel restore is always talking directly to a
992  * server, so no need to see if we should issue BEGIN.
993  */
994  StartTransaction(&AH->public);
995 
996  /*
997  * Issue TRUNCATE with ONLY so that child tables are
998  * not wiped.
999  */
1000  ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1001  fmtQualifiedId(te->namespace, te->tag));
1002  }
1003 
1004  /*
1005  * If we have a copy statement, use it.
1006  */
1007  if (te->copyStmt && strlen(te->copyStmt) > 0)
1008  {
1009  ahprintf(AH, "%s", te->copyStmt);
1011  }
1012  else
 /* NOTE(review): the else-branch statement is missing here */
1014 
1015  AH->PrintTocDataPtr(AH, te);
1016 
1017  /*
1018  * Terminate COPY if needed.
1019  */
1020  if (AH->outputKind == OUTPUT_COPYDATA &&
1021  RestoringToDB(AH))
1022  EndDBCopyMode(&AH->public, te->tag);
 /* unconditional: later output is treated as SQL commands again */
1023  AH->outputKind = OUTPUT_SQLCMDS;
1024 
1025  /* close out the transaction started above */
1026  if (use_truncate)
1027  CommitTransaction(&AH->public);
1028 
1030  }
1031  }
1032  }
1033  else if (!defnDumped)
1034  {
1035  /* If we haven't already dumped the defn part, do so now */
1036  pg_log_info("executing %s %s", te->desc, te->tag);
1037  _printTocEntry(AH, te, false);
1038  }
1039  }
1040 
1041  /*
1042  * If we emitted anything for this TOC entry, that counts as one action
1043  * against the transaction-size limit. Commit if it's time to.
1044  */
1045  if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
1046  {
1047  if (++AH->txnCount >= ropt->txn_size)
1048  {
1049  if (AH->connection)
1050  {
1051  CommitTransaction(&AH->public);
1052  StartTransaction(&AH->public);
1053  }
1054  else
1055  ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1056  AH->txnCount = 0;
1057  }
1058  }
1059 
 /* a status that needs parent-side action takes precedence over this */
1060  if (AH->public.n_errors > 0 && status == WORKER_OK)
1061  status = WORKER_IGNORED_ERRORS;
1062 
1063  return status;
1064 }
1065 
1066 /*
1067  * Allocate a new RestoreOptions block.
 * This is mainly so we can initialize it, but also for future expansion.
 *
 * The fields assigned explicitly are format (archUnknown),
 * cparams.promptPassword (TRI_DEFAULT), dumpSections (DUMP_UNSECTIONED) and
 * compression_spec (algorithm PG_COMPRESSION_NONE, level 0).  The inline
 * comment below implies that all other fields default to zero.
 *
 * Returns the pointer held in opts.
 *
 * NOTE(review): the return-type/function-name lines and the statements that
 * declare and allocate opts are missing from this listing.
1069  */
1072 {
1074 
1076 
1077  /* set any fields that shouldn't default to zeroes */
1078  opts->format = archUnknown;
1079  opts->cparams.promptPassword = TRI_DEFAULT;
1080  opts->dumpSections = DUMP_UNSECTIONED;
1081  opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1082  opts->compression_spec.level = 0;
1083 
1084  return opts;
1085 }
1086 
/*
 * Emit "ALTER TABLE ... DISABLE TRIGGER ALL" for the table named by te
 * (te->namespace / te->tag), after switching to the user given by
 * ropt->superuser.  Does nothing unless both ropt->dataOnly and
 * ropt->disable_triggers are set.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
 */
1087 static void
1089 {
1090  RestoreOptions *ropt = AH->public.ropt;
1091 
1092  /* This hack is only needed in a data-only restore */
1093  if (!ropt->dataOnly || !ropt->disable_triggers)
1094  return;
1095 
1096  pg_log_info("disabling triggers for %s", te->tag);
1097 
1098  /*
1099  * Become superuser if possible, since they are the only ones who can
1100  * disable constraint triggers. If -S was not given, assume the initial
1101  * user identity is a superuser. (XXX would it be better to become the
1102  * table owner?)
1103  */
1104  _becomeUser(AH, ropt->superuser);
1105 
1106  /*
1107  * Disable them.
1108  */
1109  ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1110  fmtQualifiedId(te->namespace, te->tag));
1111 }
1112 
/*
 * Emit "ALTER TABLE ... ENABLE TRIGGER ALL" for the table named by te
 * (te->namespace / te->tag), after switching to the user given by
 * ropt->superuser.  Does nothing unless both ropt->dataOnly and
 * ropt->disable_triggers are set, i.e. under the same condition that guards
 * the matching DISABLE command elsewhere in this file.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing.
 */
1113 static void
1115 {
1116  RestoreOptions *ropt = AH->public.ropt;
1117 
1118  /* This hack is only needed in a data-only restore */
1119  if (!ropt->dataOnly || !ropt->disable_triggers)
1120  return;
1121 
1122  pg_log_info("enabling triggers for %s", te->tag);
1123 
1124  /*
1125  * Become superuser if possible, since they are the only ones who can
1126  * disable constraint triggers. If -S was not given, assume the initial
1127  * user identity is a superuser. (XXX would it be better to become the
1128  * table owner?)
 *
 * NOTE(review): the wording above speaks of disabling, but the command
 * issued in this function is ENABLE TRIGGER ALL.
1129  */
1130  _becomeUser(AH, ropt->superuser);
1131 
1132  /*
1133  * Enable them.
1134  */
1135  ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1136  fmtQualifiedId(te->namespace, te->tag));
1137 }
1138 
1139 /*
1140  * Detect whether a TABLE DATA TOC item is performing "load via partition
1141  * root", that is the target table is an ancestor partition rather than the
1142  * table the TOC item is nominally for.
1143  *
1144  * In newer archive files this can be detected by checking for a special
1145  * comment placed in te->defn. In older files we have to fall back to seeing
1146  * if the COPY statement targets the named table or some other one. This
1147  * will not work for data dumped as INSERT commands, so we could give a false
1148  * negative in that case; fortunately, that's a rarely-used option.
1149  */
1150 static bool
1152 {
1153  if (te->defn &&
1154  strncmp(te->defn, "-- load via partition root ", 27) == 0)
1155  return true;
1156  if (te->copyStmt && *te->copyStmt)
1157  {
1158  PQExpBuffer copyStmt = createPQExpBuffer();
1159  bool result;
1160 
1161  /*
1162  * Build the initial part of the COPY as it would appear if the
1163  * nominal target table is the actual target. If we see anything
1164  * else, it must be a load-via-partition-root case.
1165  */
1166  appendPQExpBuffer(copyStmt, "COPY %s ",
1167  fmtQualifiedId(te->namespace, te->tag));
1168  result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1169  destroyPQExpBuffer(copyStmt);
1170  return result;
1171  }
1172  /* Assume it's not load-via-partition-root */
1173  return false;
1174 }
1175 
1176 /*
1177  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1178  */
1179 
1180 /* Public */
1181 void
1182 WriteData(Archive *AHX, const void *data, size_t dLen)
1183 {
1184  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1185 
1186  if (!AH->currToc)
1187  pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1188 
1189  AH->WriteDataPtr(AH, data, dLen);
1190 }
1191 
1192 /*
1193  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1194  * repository for all metadata. But the name has stuck.
1195  *
1196  * The new entry is added to the Archive's TOC list. Most callers can ignore
1197  * the result value because nothing else need be done, but a few want to
1198  * manipulate the TOC entry further.
1199  */
1200 
1201 /* Public */
1202 TocEntry *
1203 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1204  ArchiveOpts *opts)
1205 {
1206  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207  TocEntry *newToc;
1208 
1209  newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1210 
1211  AH->tocCount++;
1212  if (dumpId > AH->maxDumpId)
1213  AH->maxDumpId = dumpId;
1214 
1215  newToc->prev = AH->toc->prev;
1216  newToc->next = AH->toc;
1217  AH->toc->prev->next = newToc;
1218  AH->toc->prev = newToc;
1219 
1220  newToc->catalogId = catalogId;
1221  newToc->dumpId = dumpId;
1222  newToc->section = opts->section;
1223 
1224  newToc->tag = pg_strdup(opts->tag);
1225  newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1226  newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1227  newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1228  newToc->relkind = opts->relkind;
1229  newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1230  newToc->desc = pg_strdup(opts->description);
1231  newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1232  newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1233  newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1234 
1235  if (opts->nDeps > 0)
1236  {
1237  newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1238  memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1239  newToc->nDeps = opts->nDeps;
1240  }
1241  else
1242  {
1243  newToc->dependencies = NULL;
1244  newToc->nDeps = 0;
1245  }
1246 
1247  newToc->dataDumper = opts->dumpFn;
1248  newToc->dataDumperArg = opts->dumpArg;
1249  newToc->hadDumper = opts->dumpFn ? true : false;
1250 
1251  newToc->formatData = NULL;
1252  newToc->dataLength = 0;
1253 
1254  if (AH->ArchiveEntryPtr != NULL)
1255  AH->ArchiveEntryPtr(AH, newToc);
1256 
1257  return newToc;
1258 }
1259 
1260 /* Public */
1261 void
1263 {
1264  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1265  RestoreOptions *ropt = AH->public.ropt;
1266  TocEntry *te;
1267  pg_compress_specification out_compression_spec = {0};
1268  teSection curSection;
1269  CompressFileHandle *sav;
1270  const char *fmtName;
1271  char stamp_str[64];
1272 
1273  /* TOC is always uncompressed */
1274  out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1275 
1276  sav = SaveOutput(AH);
1277  if (ropt->filename)
1278  SetOutput(AH, ropt->filename, out_compression_spec);
1279 
1280  if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1281  localtime(&AH->createDate)) == 0)
1282  strcpy(stamp_str, "[unknown]");
1283 
1284  ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1285  ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1286  sanitize_line(AH->archdbname, false),
1287  AH->tocCount,
1289 
1290  switch (AH->format)
1291  {
1292  case archCustom:
1293  fmtName = "CUSTOM";
1294  break;
1295  case archDirectory:
1296  fmtName = "DIRECTORY";
1297  break;
1298  case archTar:
1299  fmtName = "TAR";
1300  break;
1301  default:
1302  fmtName = "UNKNOWN";
1303  }
1304 
1305  ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1307  ahprintf(AH, "; Format: %s\n", fmtName);
1308  ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1309  ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1310  if (AH->archiveRemoteVersion)
1311  ahprintf(AH, "; Dumped from database version: %s\n",
1312  AH->archiveRemoteVersion);
1313  if (AH->archiveDumpVersion)
1314  ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1315  AH->archiveDumpVersion);
1316 
1317  ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1318 
1319  curSection = SECTION_PRE_DATA;
1320  for (te = AH->toc->next; te != AH->toc; te = te->next)
1321  {
1322  /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1323  if (te->section != SECTION_NONE)
1324  curSection = te->section;
1325  te->reqs = _tocEntryRequired(te, curSection, AH);
1326  /* Now, should we print it? */
1327  if (ropt->verbose ||
1328  (te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
1329  {
1330  char *sanitized_name;
1331  char *sanitized_schema;
1332  char *sanitized_owner;
1333 
1334  /*
1335  */
1336  sanitized_name = sanitize_line(te->tag, false);
1337  sanitized_schema = sanitize_line(te->namespace, true);
1338  sanitized_owner = sanitize_line(te->owner, false);
1339 
1340  ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1341  te->catalogId.tableoid, te->catalogId.oid,
1342  te->desc, sanitized_schema, sanitized_name,
1343  sanitized_owner);
1344 
1345  free(sanitized_name);
1346  free(sanitized_schema);
1347  free(sanitized_owner);
1348  }
1349  if (ropt->verbose && te->nDeps > 0)
1350  {
1351  int i;
1352 
1353  ahprintf(AH, ";\tdepends on:");
1354  for (i = 0; i < te->nDeps; i++)
1355  ahprintf(AH, " %d", te->dependencies[i]);
1356  ahprintf(AH, "\n");
1357  }
1358  }
1359 
1360  /* Enforce strict names checking */
1361  if (ropt->strict_names)
1362  StrictNamesCheck(ropt);
1363 
1364  if (ropt->filename)
1365  RestoreOutput(AH, sav);
1366 }
1367 
1368 /***********
1369  * Large Object Archival
1370  ***********/
1371 
1372 /* Called by a dumper to signal start of a LO */
1373 int
1374 StartLO(Archive *AHX, Oid oid)
1375 {
1376  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1377 
1378  if (!AH->StartLOPtr)
1379  pg_fatal("large-object output not supported in chosen format");
1380 
1381  AH->StartLOPtr(AH, AH->currToc, oid);
1382 
1383  return 1;
1384 }
1385 
1386 /* Called by a dumper to signal end of a LO */
1387 int
1388 EndLO(Archive *AHX, Oid oid)
1389 {
1390  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1391 
1392  if (AH->EndLOPtr)
1393  AH->EndLOPtr(AH, AH->currToc, oid);
1394 
1395  return 1;
1396 }
1397 
1398 /**********
1399  * Large Object Restoration
1400  **********/
1401 
1402 /*
1403  * Called by a format handler before a group of LOs is restored
1404  */
1405 void
1407 {
1408  RestoreOptions *ropt = AH->public.ropt;
1409 
1410  /*
1411  * LOs must be restored within a transaction block, since we need the LO
1412  * handle to stay open while we write it. Establish a transaction unless
1413  * there's one being used globally.
1414  */
1415  if (!(ropt->single_txn || ropt->txn_size > 0))
1416  {
1417  if (AH->connection)
1418  StartTransaction(&AH->public);
1419  else
1420  ahprintf(AH, "BEGIN;\n\n");
1421  }
1422 
1423  AH->loCount = 0;
1424 }
1425 
1426 /*
1427  * Called by a format handler after a group of LOs is restored
1428  */
1429 void
1431 {
1432  RestoreOptions *ropt = AH->public.ropt;
1433 
1434  if (!(ropt->single_txn || ropt->txn_size > 0))
1435  {
1436  if (AH->connection)
1437  CommitTransaction(&AH->public);
1438  else
1439  ahprintf(AH, "COMMIT;\n\n");
1440  }
1441 
1442  pg_log_info(ngettext("restored %d large object",
1443  "restored %d large objects",
1444  AH->loCount),
1445  AH->loCount);
1446 }
1447 
1448 
1449 /*
1450  * Called by a format handler to initiate restoration of a LO
1451  */
1452 void
1453 StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1454 {
1455  bool old_lo_style = (AH->version < K_VERS_1_12);
1456  Oid loOid;
1457 
1458  AH->loCount++;
1459 
1460  /* Initialize the LO Buffer */
1461  if (AH->lo_buf == NULL)
1462  {
1463  /* First time through (in this process) so allocate the buffer */
1464  AH->lo_buf_size = LOBBUFSIZE;
1465  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
1466  }
1467  AH->lo_buf_used = 0;
1468 
1469  pg_log_info("restoring large object with OID %u", oid);
1470 
1471  /* With an old archive we must do drop and create logic here */
1472  if (old_lo_style && drop)
1473  DropLOIfExists(AH, oid);
1474 
1475  if (AH->connection)
1476  {
1477  if (old_lo_style)
1478  {
1479  loOid = lo_create(AH->connection, oid);
1480  if (loOid == 0 || loOid != oid)
1481  pg_fatal("could not create large object %u: %s",
1482  oid, PQerrorMessage(AH->connection));
1483  }
1484  AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1485  if (AH->loFd == -1)
1486  pg_fatal("could not open large object %u: %s",
1487  oid, PQerrorMessage(AH->connection));
1488  }
1489  else
1490  {
1491  if (old_lo_style)
1492  ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1493  oid, INV_WRITE);
1494  else
1495  ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1496  oid, INV_WRITE);
1497  }
1498 
1499  AH->writingLO = true;
1500 }
1501 
1502 void
1504 {
1505  if (AH->lo_buf_used > 0)
1506  {
1507  /* Write remaining bytes from the LO buffer */
1508  dump_lo_buf(AH);
1509  }
1510 
1511  AH->writingLO = false;
1512 
1513  if (AH->connection)
1514  {
1515  lo_close(AH->connection, AH->loFd);
1516  AH->loFd = -1;
1517  }
1518  else
1519  {
1520  ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1521  }
1522 }
1523 
1524 /***********
1525  * Sorting and Reordering
1526  ***********/
1527 
1528 void
1530 {
1531  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1532  RestoreOptions *ropt = AH->public.ropt;
1533  FILE *fh;
1534  StringInfoData linebuf;
1535 
1536  /* Allocate space for the 'wanted' array, and init it */
1537  ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1538 
1539  /* Setup the file */
1540  fh = fopen(ropt->tocFile, PG_BINARY_R);
1541  if (!fh)
1542  pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1543 
1544  initStringInfo(&linebuf);
1545 
1546  while (pg_get_line_buf(fh, &linebuf))
1547  {
1548  char *cmnt;
1549  char *endptr;
1550  DumpId id;
1551  TocEntry *te;
1552 
1553  /* Truncate line at comment, if any */
1554  cmnt = strchr(linebuf.data, ';');
1555  if (cmnt != NULL)
1556  {
1557  cmnt[0] = '\0';
1558  linebuf.len = cmnt - linebuf.data;
1559  }
1560 
1561  /* Ignore if all blank */
1562  if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1563  continue;
1564 
1565  /* Get an ID, check it's valid and not already seen */
1566  id = strtol(linebuf.data, &endptr, 10);
1567  if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1568  ropt->idWanted[id - 1])
1569  {
1570  pg_log_warning("line ignored: %s", linebuf.data);
1571  continue;
1572  }
1573 
1574  /* Find TOC entry */
1575  te = getTocEntryByDumpId(AH, id);
1576  if (!te)
1577  pg_fatal("could not find entry for ID %d",
1578  id);
1579 
1580  /* Mark it wanted */
1581  ropt->idWanted[id - 1] = true;
1582 
1583  /*
1584  * Move each item to the end of the list as it is selected, so that
1585  * they are placed in the desired order. Any unwanted items will end
1586  * up at the front of the list, which may seem unintuitive but it's
1587  * what we need. In an ordinary serial restore that makes no
1588  * difference, but in a parallel restore we need to mark unrestored
1589  * items' dependencies as satisfied before we start examining
1590  * restorable items. Otherwise they could have surprising
1591  * side-effects on the order in which restorable items actually get
1592  * restored.
1593  */
1594  _moveBefore(AH->toc, te);
1595  }
1596 
1597  pg_free(linebuf.data);
1598 
1599  if (fclose(fh) != 0)
1600  pg_fatal("could not close TOC file: %m");
1601 }
1602 
1603 /**********************
1604  * Convenience functions that look like standard IO functions
1605  * for writing data when in dump mode.
1606  **********************/
1607 
1608 /* Public */
1609 void
1610 archputs(const char *s, Archive *AH)
1611 {
1612  WriteData(AH, s, strlen(s));
1613 }
1614 
1615 /* Public */
1616 int
1617 archprintf(Archive *AH, const char *fmt,...)
1618 {
1619  int save_errno = errno;
1620  char *p;
1621  size_t len = 128; /* initial assumption about buffer size */
1622  size_t cnt;
1623 
1624  for (;;)
1625  {
1626  va_list args;
1627 
1628  /* Allocate work buffer. */
1629  p = (char *) pg_malloc(len);
1630 
1631  /* Try to format the data. */
1632  errno = save_errno;
1633  va_start(args, fmt);
1634  cnt = pvsnprintf(p, len, fmt, args);
1635  va_end(args);
1636 
1637  if (cnt < len)
1638  break; /* success */
1639 
1640  /* Release buffer and loop around to try again with larger len. */
1641  free(p);
1642  len = cnt;
1643  }
1644 
1645  WriteData(AH, p, cnt);
1646  free(p);
1647  return (int) cnt;
1648 }
1649 
1650 
1651 /*******************************
1652  * Stuff below here should be 'private' to the archiver routines
1653  *******************************/
1654 
1655 static void
1657  const pg_compress_specification compression_spec)
1658 {
1659  CompressFileHandle *CFH;
1660  const char *mode;
1661  int fn = -1;
1662 
1663  if (filename)
1664  {
1665  if (strcmp(filename, "-") == 0)
1666  fn = fileno(stdout);
1667  }
1668  else if (AH->FH)
1669  fn = fileno(AH->FH);
1670  else if (AH->fSpec)
1671  {
1672  filename = AH->fSpec;
1673  }
1674  else
1675  fn = fileno(stdout);
1676 
1677  if (AH->mode == archModeAppend)
1678  mode = PG_BINARY_A;
1679  else
1680  mode = PG_BINARY_W;
1681 
1682  CFH = InitCompressFileHandle(compression_spec);
1683 
1684  if (!CFH->open_func(filename, fn, mode, CFH))
1685  {
1686  if (filename)
1687  pg_fatal("could not open output file \"%s\": %m", filename);
1688  else
1689  pg_fatal("could not open output file: %m");
1690  }
1691 
1692  AH->OF = CFH;
1693 }
1694 
1695 static CompressFileHandle *
1697 {
1698  return (CompressFileHandle *) AH->OF;
1699 }
1700 
1701 static void
1703 {
1704  errno = 0;
1705  if (!EndCompressFileHandle(AH->OF))
1706  pg_fatal("could not close output file: %m");
1707 
1708  AH->OF = savedOutput;
1709 }
1710 
1711 
1712 
1713 /*
1714  * Print formatted text to the output file (usually stdout).
1715  */
1716 int
1717 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1718 {
1719  int save_errno = errno;
1720  char *p;
1721  size_t len = 128; /* initial assumption about buffer size */
1722  size_t cnt;
1723 
1724  for (;;)
1725  {
1726  va_list args;
1727 
1728  /* Allocate work buffer. */
1729  p = (char *) pg_malloc(len);
1730 
1731  /* Try to format the data. */
1732  errno = save_errno;
1733  va_start(args, fmt);
1734  cnt = pvsnprintf(p, len, fmt, args);
1735  va_end(args);
1736 
1737  if (cnt < len)
1738  break; /* success */
1739 
1740  /* Release buffer and loop around to try again with larger len. */
1741  free(p);
1742  len = cnt;
1743  }
1744 
1745  ahwrite(p, 1, cnt, AH);
1746  free(p);
1747  return (int) cnt;
1748 }
1749 
1750 /*
1751  * Single place for logic which says 'We are restoring to a direct DB connection'.
1752  */
1753 static int
1755 {
1756  RestoreOptions *ropt = AH->public.ropt;
1757 
1758  return (ropt && ropt->useDB && AH->connection);
1759 }
1760 
1761 /*
1762  * Dump the current contents of the LO data buffer while writing a LO
1763  */
1764 static void
1766 {
1767  if (AH->connection)
1768  {
1769  int res;
1770 
1771  res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1772  pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1773  "wrote %zu bytes of large object data (result = %d)",
1774  AH->lo_buf_used),
1775  AH->lo_buf_used, res);
1776  /* We assume there are no short writes, only errors */
1777  if (res != AH->lo_buf_used)
1778  warn_or_exit_horribly(AH, "could not write to large object: %s",
1779  PQerrorMessage(AH->connection));
1780  }
1781  else
1782  {
1784 
1786  (const unsigned char *) AH->lo_buf,
1787  AH->lo_buf_used,
1788  AH);
1789 
1790  /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1791  AH->writingLO = false;
1792  ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1793  AH->writingLO = true;
1794 
1796  }
1797  AH->lo_buf_used = 0;
1798 }
1799 
1800 
1801 /*
1802  * Write buffer to the output file (usually stdout). This is used for
1803  * outputting 'restore' scripts etc. It is even possible for an archive
1804  * format to create a custom output routine to 'fake' a restore if it
1805  * wants to generate a script (see TAR output).
1806  */
1807 void
1808 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1809 {
1810  int bytes_written = 0;
1811 
1812  if (AH->writingLO)
1813  {
1814  size_t remaining = size * nmemb;
1815 
1816  while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1817  {
1818  size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1819 
1820  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1821  ptr = (const void *) ((const char *) ptr + avail);
1822  remaining -= avail;
1823  AH->lo_buf_used += avail;
1824  dump_lo_buf(AH);
1825  }
1826 
1827  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1828  AH->lo_buf_used += remaining;
1829 
1830  bytes_written = size * nmemb;
1831  }
1832  else if (AH->CustomOutPtr)
1833  bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1834 
1835  /*
1836  * If we're doing a restore, and it's direct to DB, and we're connected
1837  * then send it to the DB.
1838  */
1839  else if (RestoringToDB(AH))
1840  bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1841  else
1842  {
1843  CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1844 
1845  if (CFH->write_func(ptr, size * nmemb, CFH))
1846  bytes_written = size * nmemb;
1847  }
1848 
1849  if (bytes_written != size * nmemb)
1851 }
1852 
1853 /* on some error, we may decide to go on... */
1854 void
1856 {
1857  va_list ap;
1858 
1859  switch (AH->stage)
1860  {
1861 
1862  case STAGE_NONE:
1863  /* Do nothing special */
1864  break;
1865 
1866  case STAGE_INITIALIZING:
1867  if (AH->stage != AH->lastErrorStage)
1868  pg_log_info("while INITIALIZING:");
1869  break;
1870 
1871  case STAGE_PROCESSING:
1872  if (AH->stage != AH->lastErrorStage)
1873  pg_log_info("while PROCESSING TOC:");
1874  break;
1875 
1876  case STAGE_FINALIZING:
1877  if (AH->stage != AH->lastErrorStage)
1878  pg_log_info("while FINALIZING:");
1879  break;
1880  }
1881  if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1882  {
1883  pg_log_info("from TOC entry %d; %u %u %s %s %s",
1884  AH->currentTE->dumpId,
1886  AH->currentTE->catalogId.oid,
1887  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1888  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1889  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1890  }
1891  AH->lastErrorStage = AH->stage;
1892  AH->lastErrorTE = AH->currentTE;
1893 
1894  va_start(ap, fmt);
1896  va_end(ap);
1897 
1898  if (AH->public.exit_on_error)
1899  exit_nicely(1);
1900  else
1901  AH->public.n_errors++;
1902 }
1903 
1904 #ifdef NOT_USED
1905 
1906 static void
1907 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1908 {
1909  /* Unlink te from list */
1910  te->prev->next = te->next;
1911  te->next->prev = te->prev;
1912 
1913  /* and insert it after "pos" */
1914  te->prev = pos;
1915  te->next = pos->next;
1916  pos->next->prev = te;
1917  pos->next = te;
1918 }
1919 #endif
1920 
1921 static void
1923 {
1924  /* Unlink te from list */
1925  te->prev->next = te->next;
1926  te->next->prev = te->prev;
1927 
1928  /* and insert it before "pos" */
1929  te->prev = pos->prev;
1930  te->next = pos;
1931  pos->prev->next = te;
1932  pos->prev = te;
1933 }
1934 
1935 /*
1936  * Build index arrays for the TOC list
1937  *
1938  * This should be invoked only after we have created or read in all the TOC
1939  * items.
1940  *
1941  * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1942  * array entries run only up to maxDumpId. We might see dependency dump IDs
1943  * beyond that (if the dump was partial); so always check the array bound
1944  * before trying to touch an array entry.
1945  */
1946 static void
1948 {
1949  DumpId maxDumpId = AH->maxDumpId;
1950  TocEntry *te;
1951 
1952  AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1953  AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1954 
1955  for (te = AH->toc->next; te != AH->toc; te = te->next)
1956  {
1957  /* this check is purely paranoia, maxDumpId should be correct */
1958  if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1959  pg_fatal("bad dumpId");
1960 
1961  /* tocsByDumpId indexes all TOCs by their dump ID */
1962  AH->tocsByDumpId[te->dumpId] = te;
1963 
1964  /*
1965  * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1966  * TOC entry that has a DATA item. We compute this by reversing the
1967  * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1968  * just one dependency and it is the TABLE item.
1969  */
1970  if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1971  {
1972  DumpId tableId = te->dependencies[0];
1973 
1974  /*
1975  * The TABLE item might not have been in the archive, if this was
1976  * a data-only dump; but its dump ID should be less than its data
1977  * item's dump ID, so there should be a place for it in the array.
1978  */
1979  if (tableId <= 0 || tableId > maxDumpId)
1980  pg_fatal("bad table dumpId for TABLE DATA item");
1981 
1982  AH->tableDataId[tableId] = te->dumpId;
1983  }
1984  }
1985 }
1986 
1987 TocEntry *
1989 {
1990  /* build index arrays if we didn't already */
1991  if (AH->tocsByDumpId == NULL)
1992  buildTocEntryArrays(AH);
1993 
1994  if (id > 0 && id <= AH->maxDumpId)
1995  return AH->tocsByDumpId[id];
1996 
1997  return NULL;
1998 }
1999 
2000 int
2002 {
2003  TocEntry *te = getTocEntryByDumpId(AH, id);
2004 
2005  if (!te)
2006  return 0;
2007 
2008  return te->reqs;
2009 }
2010 
2011 size_t
2012 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2013 {
2014  int off;
2015 
2016  /* Save the flag */
2017  AH->WriteBytePtr(AH, wasSet);
2018 
2019  /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2020  for (off = 0; off < sizeof(pgoff_t); off++)
2021  {
2022  AH->WriteBytePtr(AH, o & 0xFF);
2023  o >>= 8;
2024  }
2025  return sizeof(pgoff_t) + 1;
2026 }
2027 
2028 int
2030 {
2031  int i;
2032  int off;
2033  int offsetFlg;
2034 
2035  /* Initialize to zero */
2036  *o = 0;
2037 
2038  /* Check for old version */
2039  if (AH->version < K_VERS_1_7)
2040  {
2041  /* Prior versions wrote offsets using WriteInt */
2042  i = ReadInt(AH);
2043  /* -1 means not set */
2044  if (i < 0)
2045  return K_OFFSET_POS_NOT_SET;
2046  else if (i == 0)
2047  return K_OFFSET_NO_DATA;
2048 
2049  /* Cast to pgoff_t because it was written as an int. */
2050  *o = (pgoff_t) i;
2051  return K_OFFSET_POS_SET;
2052  }
2053 
2054  /*
2055  * Read the flag indicating the state of the data pointer. Check if valid
2056  * and die if not.
2057  *
2058  * This used to be handled by a negative or zero pointer, now we use an
2059  * extra byte specifically for the state.
2060  */
2061  offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2062 
2063  switch (offsetFlg)
2064  {
2065  case K_OFFSET_POS_NOT_SET:
2066  case K_OFFSET_NO_DATA:
2067  case K_OFFSET_POS_SET:
2068 
2069  break;
2070 
2071  default:
2072  pg_fatal("unexpected data offset flag %d", offsetFlg);
2073  }
2074 
2075  /*
2076  * Read the bytes
2077  */
2078  for (off = 0; off < AH->offSize; off++)
2079  {
2080  if (off < sizeof(pgoff_t))
2081  *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2082  else
2083  {
2084  if (AH->ReadBytePtr(AH) != 0)
2085  pg_fatal("file offset in dump file is too large");
2086  }
2087  }
2088 
2089  return offsetFlg;
2090 }
2091 
2092 size_t
2094 {
2095  int b;
2096 
2097  /*
2098  * This is a bit yucky, but I don't want to make the binary format very
2099  * dependent on representation, and not knowing much about it, I write out
2100  * a sign byte. If you change this, don't forget to change the file
2101  * version #, and modify ReadInt to read the new format AS WELL AS the old
2102  * formats.
2103  */
2104 
2105  /* SIGN byte */
2106  if (i < 0)
2107  {
2108  AH->WriteBytePtr(AH, 1);
2109  i = -i;
2110  }
2111  else
2112  AH->WriteBytePtr(AH, 0);
2113 
2114  for (b = 0; b < AH->intSize; b++)
2115  {
2116  AH->WriteBytePtr(AH, i & 0xFF);
2117  i >>= 8;
2118  }
2119 
2120  return AH->intSize + 1;
2121 }
2122 
2123 int
2125 {
2126  int res = 0;
2127  int bv,
2128  b;
2129  int sign = 0; /* Default positive */
2130  int bitShift = 0;
2131 
2132  if (AH->version > K_VERS_1_0)
2133  /* Read a sign byte */
2134  sign = AH->ReadBytePtr(AH);
2135 
2136  for (b = 0; b < AH->intSize; b++)
2137  {
2138  bv = AH->ReadBytePtr(AH) & 0xFF;
2139  if (bv != 0)
2140  res = res + (bv << bitShift);
2141  bitShift += 8;
2142  }
2143 
2144  if (sign)
2145  res = -res;
2146 
2147  return res;
2148 }
2149 
2150 size_t
2151 WriteStr(ArchiveHandle *AH, const char *c)
2152 {
2153  size_t res;
2154 
2155  if (c)
2156  {
2157  int len = strlen(c);
2158 
2159  res = WriteInt(AH, len);
2160  AH->WriteBufPtr(AH, c, len);
2161  res += len;
2162  }
2163  else
2164  res = WriteInt(AH, -1);
2165 
2166  return res;
2167 }
2168 
2169 char *
2171 {
2172  char *buf;
2173  int l;
2174 
2175  l = ReadInt(AH);
2176  if (l < 0)
2177  buf = NULL;
2178  else
2179  {
2180  buf = (char *) pg_malloc(l + 1);
2181  AH->ReadBufPtr(AH, (void *) buf, l);
2182 
2183  buf[l] = '\0';
2184  }
2185 
2186  return buf;
2187 }
2188 
2189 static bool
2190 _fileExistsInDirectory(const char *dir, const char *filename)
2191 {
2192  struct stat st;
2193  char buf[MAXPGPATH];
2194 
2195  if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2196  pg_fatal("directory name too long: \"%s\"", dir);
2197 
2198  return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2199 }
2200 
2201 static int
2203 {
2204  FILE *fh;
2205  char sig[6]; /* More than enough */
2206  size_t cnt;
2207  int wantClose = 0;
2208 
2209  pg_log_debug("attempting to ascertain archive format");
2210 
2211  free(AH->lookahead);
2212 
2213  AH->readHeader = 0;
2214  AH->lookaheadSize = 512;
2215  AH->lookahead = pg_malloc0(512);
2216  AH->lookaheadLen = 0;
2217  AH->lookaheadPos = 0;
2218 
2219  if (AH->fSpec)
2220  {
2221  struct stat st;
2222 
2223  wantClose = 1;
2224 
2225  /*
2226  * Check if the specified archive is a directory. If so, check if
2227  * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2228  */
2229  if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2230  {
2231  AH->format = archDirectory;
2232  if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2233  return AH->format;
2234 #ifdef HAVE_LIBZ
2235  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2236  return AH->format;
2237 #endif
2238 #ifdef USE_LZ4
2239  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2240  return AH->format;
2241 #endif
2242 #ifdef USE_ZSTD
2243  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2244  return AH->format;
2245 #endif
2246  pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2247  AH->fSpec);
2248  fh = NULL; /* keep compiler quiet */
2249  }
2250  else
2251  {
2252  fh = fopen(AH->fSpec, PG_BINARY_R);
2253  if (!fh)
2254  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2255  }
2256  }
2257  else
2258  {
2259  fh = stdin;
2260  if (!fh)
2261  pg_fatal("could not open input file: %m");
2262  }
2263 
2264  if ((cnt = fread(sig, 1, 5, fh)) != 5)
2265  {
2266  if (ferror(fh))
2267  pg_fatal("could not read input file: %m");
2268  else
2269  pg_fatal("input file is too short (read %lu, expected 5)",
2270  (unsigned long) cnt);
2271  }
2272 
2273  /* Save it, just in case we need it later */
2274  memcpy(&AH->lookahead[0], sig, 5);
2275  AH->lookaheadLen = 5;
2276 
2277  if (strncmp(sig, "PGDMP", 5) == 0)
2278  {
2279  /* It's custom format, stop here */
2280  AH->format = archCustom;
2281  AH->readHeader = 1;
2282  }
2283  else
2284  {
2285  /*
2286  * *Maybe* we have a tar archive format file or a text dump ... So,
2287  * read first 512 byte header...
2288  */
2289  cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2290  /* read failure is checked below */
2291  AH->lookaheadLen += cnt;
2292 
2293  if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2294  (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2295  strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2296  {
2297  /*
2298  * looks like it's probably a text format dump. so suggest they
2299  * try psql
2300  */
2301  pg_fatal("input file appears to be a text format dump. Please use psql.");
2302  }
2303 
2304  if (AH->lookaheadLen != 512)
2305  {
2306  if (feof(fh))
2307  pg_fatal("input file does not appear to be a valid archive (too short?)");
2308  else
2309  READ_ERROR_EXIT(fh);
2310  }
2311 
2312  if (!isValidTarHeader(AH->lookahead))
2313  pg_fatal("input file does not appear to be a valid archive");
2314 
2315  AH->format = archTar;
2316  }
2317 
2318  /* Close the file if we opened it */
2319  if (wantClose)
2320  {
2321  if (fclose(fh) != 0)
2322  pg_fatal("could not close input file: %m");
2323  /* Forget lookahead, since we'll re-read header after re-opening */
2324  AH->readHeader = 0;
2325  AH->lookaheadLen = 0;
2326  }
2327 
2328  return AH->format;
2329 }
2330 
2331 
2332 /*
2333  * Allocate an archive handle
2334  */
2335 static ArchiveHandle *
2336 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2337  const pg_compress_specification compression_spec,
2338  bool dosync, ArchiveMode mode,
2340 {
2341  ArchiveHandle *AH;
2342  CompressFileHandle *CFH;
2343  pg_compress_specification out_compress_spec = {0};
2344 
2345  pg_log_debug("allocating AH for %s, format %d",
2346  FileSpec ? FileSpec : "(stdio)", fmt);
2347 
2348  AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2349 
2350  AH->version = K_VERS_SELF;
2351 
2352  /* initialize for backwards compatible string processing */
2353  AH->public.encoding = 0; /* PG_SQL_ASCII */
2354  AH->public.std_strings = false;
2355 
2356  /* sql error handling */
2357  AH->public.exit_on_error = true;
2358  AH->public.n_errors = 0;
2359 
2360  AH->archiveDumpVersion = PG_VERSION;
2361 
2362  AH->createDate = time(NULL);
2363 
2364  AH->intSize = sizeof(int);
2365  AH->offSize = sizeof(pgoff_t);
2366  if (FileSpec)
2367  {
2368  AH->fSpec = pg_strdup(FileSpec);
2369 
2370  /*
2371  * Not used; maybe later....
2372  *
2373  * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2374  * i--) if (AH->workDir[i-1] == '/')
2375  */
2376  }
2377  else
2378  AH->fSpec = NULL;
2379 
2380  AH->currUser = NULL; /* unknown */
2381  AH->currSchema = NULL; /* ditto */
2382  AH->currTablespace = NULL; /* ditto */
2383  AH->currTableAm = NULL; /* ditto */
2384 
2385  AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2386 
2387  AH->toc->next = AH->toc;
2388  AH->toc->prev = AH->toc;
2389 
2390  AH->mode = mode;
2391  AH->compression_spec = compression_spec;
2392  AH->dosync = dosync;
2393  AH->sync_method = sync_method;
2394 
2395  memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2396 
2397  /* Open stdout with no compression for AH output handle */
2398  out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2399  CFH = InitCompressFileHandle(out_compress_spec);
2400  if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2401  pg_fatal("could not open stdout for appending: %m");
2402  AH->OF = CFH;
2403 
2404  /*
2405  * On Windows, we need to use binary mode to read/write non-text files,
2406  * which include all archive formats as well as compressed plain text.
2407  * Force stdin/stdout into binary mode if that is what we are using.
2408  */
2409 #ifdef WIN32
2410  if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2411  (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2412  {
2413  if (mode == archModeWrite)
2414  _setmode(fileno(stdout), O_BINARY);
2415  else
2416  _setmode(fileno(stdin), O_BINARY);
2417  }
2418 #endif
2419 
2420  AH->SetupWorkerPtr = setupWorkerPtr;
2421 
2422  if (fmt == archUnknown)
2423  AH->format = _discoverArchiveFormat(AH);
2424  else
2425  AH->format = fmt;
2426 
2427  switch (AH->format)
2428  {
2429  case archCustom:
2431  break;
2432 
2433  case archNull:
2434  InitArchiveFmt_Null(AH);
2435  break;
2436 
2437  case archDirectory:
2439  break;
2440 
2441  case archTar:
2442  InitArchiveFmt_Tar(AH);
2443  break;
2444 
2445  default:
2446  pg_fatal("unrecognized file format \"%d\"", fmt);
2447  }
2448 
2449  return AH;
2450 }
2451 
2452 /*
2453  * Write out all data (tables & LOs)
2454  */
2455 void
2457 {
2458  TocEntry *te;
2459 
2460  if (pstate && pstate->numWorkers > 1)
2461  {
2462  /*
2463  * In parallel mode, this code runs in the leader process. We
2464  * construct an array of candidate TEs, then sort it into decreasing
2465  * size order, then dispatch each TE to a data-transfer worker. By
2466  * dumping larger tables first, we avoid getting into a situation
2467  * where we're down to one job and it's big, losing parallelism.
2468  */
2469  TocEntry **tes;
2470  int ntes;
2471 
2472  tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2473  ntes = 0;
2474  for (te = AH->toc->next; te != AH->toc; te = te->next)
2475  {
2476  /* Consider only TEs with dataDumper functions ... */
2477  if (!te->dataDumper)
2478  continue;
2479  /* ... and ignore ones not enabled for dump */
2480  if ((te->reqs & REQ_DATA) == 0)
2481  continue;
2482 
2483  tes[ntes++] = te;
2484  }
2485 
2486  if (ntes > 1)
2487  qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2488 
2489  for (int i = 0; i < ntes; i++)
2490  DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2491  mark_dump_job_done, NULL);
2492 
2493  pg_free(tes);
2494 
2495  /* Now wait for workers to finish. */
2496  WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2497  }
2498  else
2499  {
2500  /* Non-parallel mode: just dump all candidate TEs sequentially. */
2501  for (te = AH->toc->next; te != AH->toc; te = te->next)
2502  {
2503  /* Must have same filter conditions as above */
2504  if (!te->dataDumper)
2505  continue;
2506  if ((te->reqs & REQ_DATA) == 0)
2507  continue;
2508 
2510  }
2511  }
2512 }
2513 
2514 
2515 /*
2516  * Callback function that's invoked in the leader process after a step has
2517  * been parallel dumped.
2518  *
2519  * We don't need to do anything except check for worker failure.
2520  */
2521 static void
2523  TocEntry *te,
2524  int status,
2525  void *callback_data)
2526 {
2527  pg_log_info("finished item %d %s %s",
2528  te->dumpId, te->desc, te->tag);
2529 
2530  if (status != 0)
2531  pg_fatal("worker process failed: exit code %d",
2532  status);
2533 }
2534 
2535 
2536 void
2538 {
2539  StartDataPtrType startPtr;
2540  EndDataPtrType endPtr;
2541 
2542  AH->currToc = te;
2543 
2544  if (strcmp(te->desc, "BLOBS") == 0)
2545  {
2546  startPtr = AH->StartLOsPtr;
2547  endPtr = AH->EndLOsPtr;
2548  }
2549  else
2550  {
2551  startPtr = AH->StartDataPtr;
2552  endPtr = AH->EndDataPtr;
2553  }
2554 
2555  if (startPtr != NULL)
2556  (*startPtr) (AH, te);
2557 
2558  /*
2559  * The user-provided DataDumper routine needs to call AH->WriteData
2560  */
2561  te->dataDumper((Archive *) AH, te->dataDumperArg);
2562 
2563  if (endPtr != NULL)
2564  (*endPtr) (AH, te);
2565 
2566  AH->currToc = NULL;
2567 }
2568 
2569 void
2571 {
2572  TocEntry *te;
2573  char workbuf[32];
2574  int tocCount;
2575  int i;
2576 
2577  /* count entries that will actually be dumped */
2578  tocCount = 0;
2579  for (te = AH->toc->next; te != AH->toc; te = te->next)
2580  {
2581  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2582  tocCount++;
2583  }
2584 
2585  /* printf("%d TOC Entries to save\n", tocCount); */
2586 
2587  WriteInt(AH, tocCount);
2588 
2589  for (te = AH->toc->next; te != AH->toc; te = te->next)
2590  {
2591  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2592  continue;
2593 
2594  WriteInt(AH, te->dumpId);
2595  WriteInt(AH, te->dataDumper ? 1 : 0);
2596 
2597  /* OID is recorded as a string for historical reasons */
2598  sprintf(workbuf, "%u", te->catalogId.tableoid);
2599  WriteStr(AH, workbuf);
2600  sprintf(workbuf, "%u", te->catalogId.oid);
2601  WriteStr(AH, workbuf);
2602 
2603  WriteStr(AH, te->tag);
2604  WriteStr(AH, te->desc);
2605  WriteInt(AH, te->section);
2606  WriteStr(AH, te->defn);
2607  WriteStr(AH, te->dropStmt);
2608  WriteStr(AH, te->copyStmt);
2609  WriteStr(AH, te->namespace);
2610  WriteStr(AH, te->tablespace);
2611  WriteStr(AH, te->tableam);
2612  WriteInt(AH, te->relkind);
2613  WriteStr(AH, te->owner);
2614  WriteStr(AH, "false");
2615 
2616  /* Dump list of dependencies */
2617  for (i = 0; i < te->nDeps; i++)
2618  {
2619  sprintf(workbuf, "%d", te->dependencies[i]);
2620  WriteStr(AH, workbuf);
2621  }
2622  WriteStr(AH, NULL); /* Terminate List */
2623 
2624  if (AH->WriteExtraTocPtr)
2625  AH->WriteExtraTocPtr(AH, te);
2626  }
2627 }
2628 
2629 void
2631 {
2632  int i;
2633  char *tmp;
2634  DumpId *deps;
2635  int depIdx;
2636  int depSize;
2637  TocEntry *te;
2638  bool is_supported;
2639 
2640  AH->tocCount = ReadInt(AH);
2641  AH->maxDumpId = 0;
2642 
2643  for (i = 0; i < AH->tocCount; i++)
2644  {
2645  te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2646  te->dumpId = ReadInt(AH);
2647 
2648  if (te->dumpId > AH->maxDumpId)
2649  AH->maxDumpId = te->dumpId;
2650 
2651  /* Sanity check */
2652  if (te->dumpId <= 0)
2653  pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2654  te->dumpId);
2655 
2656  te->hadDumper = ReadInt(AH);
2657 
2658  if (AH->version >= K_VERS_1_8)
2659  {
2660  tmp = ReadStr(AH);
2661  sscanf(tmp, "%u", &te->catalogId.tableoid);
2662  free(tmp);
2663  }
2664  else
2666  tmp = ReadStr(AH);
2667  sscanf(tmp, "%u", &te->catalogId.oid);
2668  free(tmp);
2669 
2670  te->tag = ReadStr(AH);
2671  te->desc = ReadStr(AH);
2672 
2673  if (AH->version >= K_VERS_1_11)
2674  {
2675  te->section = ReadInt(AH);
2676  }
2677  else
2678  {
2679  /*
2680  * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2681  * the entries into sections. This list need not cover entry
2682  * types added later than 8.4.
2683  */
2684  if (strcmp(te->desc, "COMMENT") == 0 ||
2685  strcmp(te->desc, "ACL") == 0 ||
2686  strcmp(te->desc, "ACL LANGUAGE") == 0)
2687  te->section = SECTION_NONE;
2688  else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2689  strcmp(te->desc, "BLOBS") == 0 ||
2690  strcmp(te->desc, "BLOB COMMENTS") == 0)
2691  te->section = SECTION_DATA;
2692  else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2693  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2694  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2695  strcmp(te->desc, "INDEX") == 0 ||
2696  strcmp(te->desc, "RULE") == 0 ||
2697  strcmp(te->desc, "TRIGGER") == 0)
2698  te->section = SECTION_POST_DATA;
2699  else
2700  te->section = SECTION_PRE_DATA;
2701  }
2702 
2703  te->defn = ReadStr(AH);
2704  te->dropStmt = ReadStr(AH);
2705 
2706  if (AH->version >= K_VERS_1_3)
2707  te->copyStmt = ReadStr(AH);
2708 
2709  if (AH->version >= K_VERS_1_6)
2710  te->namespace = ReadStr(AH);
2711 
2712  if (AH->version >= K_VERS_1_10)
2713  te->tablespace = ReadStr(AH);
2714 
2715  if (AH->version >= K_VERS_1_14)
2716  te->tableam = ReadStr(AH);
2717 
2718  if (AH->version >= K_VERS_1_16)
2719  te->relkind = ReadInt(AH);
2720 
2721  te->owner = ReadStr(AH);
2722  is_supported = true;
2723  if (AH->version < K_VERS_1_9)
2724  is_supported = false;
2725  else
2726  {
2727  tmp = ReadStr(AH);
2728 
2729  if (strcmp(tmp, "true") == 0)
2730  is_supported = false;
2731 
2732  free(tmp);
2733  }
2734 
2735  if (!is_supported)
2736  pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2737 
2738  /* Read TOC entry dependencies */
2739  if (AH->version >= K_VERS_1_5)
2740  {
2741  depSize = 100;
2742  deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2743  depIdx = 0;
2744  for (;;)
2745  {
2746  tmp = ReadStr(AH);
2747  if (!tmp)
2748  break; /* end of list */
2749  if (depIdx >= depSize)
2750  {
2751  depSize *= 2;
2752  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2753  }
2754  sscanf(tmp, "%d", &deps[depIdx]);
2755  free(tmp);
2756  depIdx++;
2757  }
2758 
2759  if (depIdx > 0) /* We have a non-null entry */
2760  {
2761  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2762  te->dependencies = deps;
2763  te->nDeps = depIdx;
2764  }
2765  else
2766  {
2767  free(deps);
2768  te->dependencies = NULL;
2769  te->nDeps = 0;
2770  }
2771  }
2772  else
2773  {
2774  te->dependencies = NULL;
2775  te->nDeps = 0;
2776  }
2777  te->dataLength = 0;
2778 
2779  if (AH->ReadExtraTocPtr)
2780  AH->ReadExtraTocPtr(AH, te);
2781 
2782  pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2783  i, te->dumpId, te->desc, te->tag);
2784 
2785  /* link completed entry into TOC circular list */
2786  te->prev = AH->toc->prev;
2787  AH->toc->prev->next = te;
2788  AH->toc->prev = te;
2789  te->next = AH->toc;
2790 
2791  /* special processing immediately upon read for some items */
2792  if (strcmp(te->desc, "ENCODING") == 0)
2793  processEncodingEntry(AH, te);
2794  else if (strcmp(te->desc, "STDSTRINGS") == 0)
2795  processStdStringsEntry(AH, te);
2796  else if (strcmp(te->desc, "SEARCHPATH") == 0)
2797  processSearchPathEntry(AH, te);
2798  }
2799 }
2800 
2801 static void
2803 {
2804  /* te->defn should have the form SET client_encoding = 'foo'; */
2805  char *defn = pg_strdup(te->defn);
2806  char *ptr1;
2807  char *ptr2 = NULL;
2808  int encoding;
2809 
2810  ptr1 = strchr(defn, '\'');
2811  if (ptr1)
2812  ptr2 = strchr(++ptr1, '\'');
2813  if (ptr2)
2814  {
2815  *ptr2 = '\0';
2816  encoding = pg_char_to_encoding(ptr1);
2817  if (encoding < 0)
2818  pg_fatal("unrecognized encoding \"%s\"",
2819  ptr1);
2820  AH->public.encoding = encoding;
2821  }
2822  else
2823  pg_fatal("invalid ENCODING item: %s",
2824  te->defn);
2825 
2826  free(defn);
2827 }
2828 
2829 static void
2831 {
2832  /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2833  char *ptr1;
2834 
2835  ptr1 = strchr(te->defn, '\'');
2836  if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2837  AH->public.std_strings = true;
2838  else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2839  AH->public.std_strings = false;
2840  else
2841  pg_fatal("invalid STDSTRINGS item: %s",
2842  te->defn);
2843 }
2844 
2845 static void
2847 {
2848  /*
2849  * te->defn should contain a command to set search_path. We just copy it
2850  * verbatim for use later.
2851  */
2852  AH->public.searchpath = pg_strdup(te->defn);
2853 }
2854 
2855 static void
2857 {
2858  const char *missing_name;
2859 
2860  Assert(ropt->strict_names);
2861 
2862  if (ropt->schemaNames.head != NULL)
2863  {
2864  missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2865  if (missing_name != NULL)
2866  pg_fatal("schema \"%s\" not found", missing_name);
2867  }
2868 
2869  if (ropt->tableNames.head != NULL)
2870  {
2871  missing_name = simple_string_list_not_touched(&ropt->tableNames);
2872  if (missing_name != NULL)
2873  pg_fatal("table \"%s\" not found", missing_name);
2874  }
2875 
2876  if (ropt->indexNames.head != NULL)
2877  {
2878  missing_name = simple_string_list_not_touched(&ropt->indexNames);
2879  if (missing_name != NULL)
2880  pg_fatal("index \"%s\" not found", missing_name);
2881  }
2882 
2883  if (ropt->functionNames.head != NULL)
2884  {
2885  missing_name = simple_string_list_not_touched(&ropt->functionNames);
2886  if (missing_name != NULL)
2887  pg_fatal("function \"%s\" not found", missing_name);
2888  }
2889 
2890  if (ropt->triggerNames.head != NULL)
2891  {
2892  missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2893  if (missing_name != NULL)
2894  pg_fatal("trigger \"%s\" not found", missing_name);
2895  }
2896 }
2897 
2898 /*
2899  * Determine whether we want to restore this TOC entry.
2900  *
2901  * Returns 0 if entry should be skipped, or some combination of the
2902  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2903  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2904  */
2905 static int
2907 {
2908  int res = REQ_SCHEMA | REQ_DATA;
2909  RestoreOptions *ropt = AH->public.ropt;
2910 
2911  /* These items are treated specially */
2912  if (strcmp(te->desc, "ENCODING") == 0 ||
2913  strcmp(te->desc, "STDSTRINGS") == 0 ||
2914  strcmp(te->desc, "SEARCHPATH") == 0)
2915  return REQ_SPECIAL;
2916 
2917  /*
2918  * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2919  * restored in createDB mode, and not restored otherwise, independently of
2920  * all else.
2921  */
2922  if (strcmp(te->desc, "DATABASE") == 0 ||
2923  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2924  {
2925  if (ropt->createDB)
2926  return REQ_SCHEMA;
2927  else
2928  return 0;
2929  }
2930 
2931  /*
2932  * Process exclusions that affect certain classes of TOC entries.
2933  */
2934 
2935  /* If it's an ACL, maybe ignore it */
2936  if (ropt->aclsSkip && _tocEntryIsACL(te))
2937  return 0;
2938 
2939  /* If it's a comment, maybe ignore it */
2940  if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2941  return 0;
2942 
2943  /*
2944  * If it's a publication or a table part of a publication, maybe ignore
2945  * it.
2946  */
2947  if (ropt->no_publications &&
2948  (strcmp(te->desc, "PUBLICATION") == 0 ||
2949  strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2950  strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2951  return 0;
2952 
2953  /* If it's a security label, maybe ignore it */
2954  if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2955  return 0;
2956 
2957  /* If it's a subscription, maybe ignore it */
2958  if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2959  return 0;
2960 
2961  /* Ignore it if section is not to be dumped/restored */
2962  switch (curSection)
2963  {
2964  case SECTION_PRE_DATA:
2965  if (!(ropt->dumpSections & DUMP_PRE_DATA))
2966  return 0;
2967  break;
2968  case SECTION_DATA:
2969  if (!(ropt->dumpSections & DUMP_DATA))
2970  return 0;
2971  break;
2972  case SECTION_POST_DATA:
2973  if (!(ropt->dumpSections & DUMP_POST_DATA))
2974  return 0;
2975  break;
2976  default:
2977  /* shouldn't get here, really, but ignore it */
2978  return 0;
2979  }
2980 
2981  /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2982  if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2983  return 0;
2984 
2985  /*
2986  * Check options for selective dump/restore.
2987  */
2988  if (strcmp(te->desc, "ACL") == 0 ||
2989  strcmp(te->desc, "COMMENT") == 0 ||
2990  strcmp(te->desc, "SECURITY LABEL") == 0)
2991  {
2992  /* Database properties react to createDB, not selectivity options. */
2993  if (strncmp(te->tag, "DATABASE ", 9) == 0)
2994  {
2995  if (!ropt->createDB)
2996  return 0;
2997  }
2998  else if (ropt->schemaNames.head != NULL ||
2999  ropt->schemaExcludeNames.head != NULL ||
3000  ropt->selTypes)
3001  {
3002  /*
3003  * In a selective dump/restore, we want to restore these dependent
3004  * TOC entry types only if their parent object is being restored.
3005  * Without selectivity options, we let through everything in the
3006  * archive. Note there may be such entries with no parent, eg
3007  * non-default ACLs for built-in objects. Also, we make
3008  * per-column ACLs additionally depend on the table's ACL if any
3009  * to ensure correct restore order, so those dependencies should
3010  * be ignored in this check.
3011  *
3012  * This code depends on the parent having been marked already,
3013  * which should be the case; if it isn't, perhaps due to
3014  * SortTocFromFile rearrangement, skipping the dependent entry
3015  * seems prudent anyway.
3016  *
3017  * Ideally we'd handle, eg, table CHECK constraints this way too.
3018  * But it's hard to tell which of their dependencies is the one to
3019  * consult.
3020  */
3021  bool dumpthis = false;
3022 
3023  for (int i = 0; i < te->nDeps; i++)
3024  {
3025  TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3026 
3027  if (!pte)
3028  continue; /* probably shouldn't happen */
3029  if (strcmp(pte->desc, "ACL") == 0)
3030  continue; /* ignore dependency on another ACL */
3031  if (pte->reqs == 0)
3032  continue; /* this object isn't marked, so ignore it */
3033  /* Found a parent to be dumped, so we want to dump this too */
3034  dumpthis = true;
3035  break;
3036  }
3037  if (!dumpthis)
3038  return 0;
3039  }
3040  }
3041  else
3042  {
3043  /* Apply selective-restore rules for standalone TOC entries. */
3044  if (ropt->schemaNames.head != NULL)
3045  {
3046  /* If no namespace is specified, it means all. */
3047  if (!te->namespace)
3048  return 0;
3049  if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3050  return 0;
3051  }
3052 
3053  if (ropt->schemaExcludeNames.head != NULL &&
3054  te->namespace &&
3055  simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3056  return 0;
3057 
3058  if (ropt->selTypes)
3059  {
3060  if (strcmp(te->desc, "TABLE") == 0 ||
3061  strcmp(te->desc, "TABLE DATA") == 0 ||
3062  strcmp(te->desc, "VIEW") == 0 ||
3063  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3064  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3065  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3066  strcmp(te->desc, "SEQUENCE") == 0 ||
3067  strcmp(te->desc, "SEQUENCE SET") == 0)
3068  {
3069  if (!ropt->selTable)
3070  return 0;
3071  if (ropt->tableNames.head != NULL &&
3073  return 0;
3074  }
3075  else if (strcmp(te->desc, "INDEX") == 0)
3076  {
3077  if (!ropt->selIndex)
3078  return 0;
3079  if (ropt->indexNames.head != NULL &&
3081  return 0;
3082  }
3083  else if (strcmp(te->desc, "FUNCTION") == 0 ||
3084  strcmp(te->desc, "AGGREGATE") == 0 ||
3085  strcmp(te->desc, "PROCEDURE") == 0)
3086  {
3087  if (!ropt->selFunction)
3088  return 0;
3089  if (ropt->functionNames.head != NULL &&
3091  return 0;
3092  }
3093  else if (strcmp(te->desc, "TRIGGER") == 0)
3094  {
3095  if (!ropt->selTrigger)
3096  return 0;
3097  if (ropt->triggerNames.head != NULL &&
3099  return 0;
3100  }
3101  else
3102  return 0;
3103  }
3104  }
3105 
3106  /*
3107  * Determine whether the TOC entry contains schema and/or data components,
3108  * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3109  * it's both schema and data. Otherwise it's probably schema-only, but
3110  * there are exceptions.
3111  */
3112  if (!te->hadDumper)
3113  {
3114  /*
3115  * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3116  * is considered a data entry. We don't need to check for BLOBS or
3117  * old-style BLOB COMMENTS entries, because they will have hadDumper =
3118  * true ... but we do need to check new-style BLOB ACLs, comments,
3119  * etc.
3120  */
3121  if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3122  strcmp(te->desc, "BLOB") == 0 ||
3123  strcmp(te->desc, "BLOB METADATA") == 0 ||
3124  (strcmp(te->desc, "ACL") == 0 &&
3125  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3126  (strcmp(te->desc, "COMMENT") == 0 &&
3127  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3128  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3129  strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3130  res = res & REQ_DATA;
3131  else
3132  res = res & ~REQ_DATA;
3133  }
3134 
3135  /*
3136  * If there's no definition command, there's no schema component. Treat
3137  * "load via partition root" comments as not schema.
3138  */
3139  if (!te->defn || !te->defn[0] ||
3140  strncmp(te->defn, "-- load via partition root ", 27) == 0)
3141  res = res & ~REQ_SCHEMA;
3142 
3143  /*
3144  * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3145  * always ignore it.
3146  */
3147  if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3148  return 0;
3149 
3150  /* Mask it if we only want schema */
3151  if (ropt->schemaOnly)
3152  {
3153  /*
3154  * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3155  *
3156  * In binary-upgrade mode, even with schemaOnly set, we do not mask
3157  * out large objects. (Only large object definitions, comments and
3158  * other metadata should be generated in binary-upgrade mode, not the
3159  * actual data, but that need not concern us here.)
3160  */
3161  if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3162  !(ropt->binary_upgrade &&
3163  (strcmp(te->desc, "BLOB") == 0 ||
3164  strcmp(te->desc, "BLOB METADATA") == 0 ||
3165  (strcmp(te->desc, "ACL") == 0 &&
3166  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3167  (strcmp(te->desc, "COMMENT") == 0 &&
3168  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3169  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3170  strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3171  res = res & REQ_SCHEMA;
3172  }
3173 
3174  /* Mask it if we only want data */
3175  if (ropt->dataOnly)
3176  res = res & REQ_DATA;
3177 
3178  return res;
3179 }
3180 
3181 /*
3182  * Identify which pass we should restore this TOC entry in.
3183  *
3184  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3185  */
3186 static RestorePass
3188 {
3189  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3190  if (strcmp(te->desc, "ACL") == 0 ||
3191  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3192  strcmp(te->desc, "DEFAULT ACL") == 0)
3193  return RESTORE_PASS_ACL;
3194  if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3195  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3196  return RESTORE_PASS_POST_ACL;
3197 
3198  /*
3199  * Comments need to be emitted in the same pass as their parent objects.
3200  * ACLs haven't got comments, and neither do matview data objects, but
3201  * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3202  * we'd need yet another weird special case.)
3203  */
3204  if (strcmp(te->desc, "COMMENT") == 0 &&
3205  strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3206  return RESTORE_PASS_POST_ACL;
3207 
3208  /* All else can be handled in the main pass. */
3209  return RESTORE_PASS_MAIN;
3210 }
3211 
3212 /*
3213  * Identify TOC entries that are ACLs.
3214  *
3215  * Note: it seems worth duplicating some code here to avoid a hard-wired
3216  * assumption that these are exactly the same entries that we restore during
3217  * the RESTORE_PASS_ACL phase.
3218  */
3219 static bool
3221 {
3222  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3223  if (strcmp(te->desc, "ACL") == 0 ||
3224  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3225  strcmp(te->desc, "DEFAULT ACL") == 0)
3226  return true;
3227  return false;
3228 }
3229 
3230 /*
3231  * Issue SET commands for parameters that we want to have set the same way
3232  * at all times during execution of a restore script.
3233  */
3234 static void
3236 {
3237  RestoreOptions *ropt = AH->public.ropt;
3238 
3239  /*
3240  * Disable timeouts to allow for slow commands, idle parallel workers, etc
3241  */
3242  ahprintf(AH, "SET statement_timeout = 0;\n");
3243  ahprintf(AH, "SET lock_timeout = 0;\n");
3244  ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3245  ahprintf(AH, "SET transaction_timeout = 0;\n");
3246 
3247  /* Select the correct character set encoding */
3248  ahprintf(AH, "SET client_encoding = '%s';\n",
3250 
3251  /* Select the correct string literal syntax */
3252  ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3253  AH->public.std_strings ? "on" : "off");
3254 
3255  /* Select the role to be used during restore */
3256  if (ropt && ropt->use_role)
3257  ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3258 
3259  /* Select the dump-time search_path */
3260  if (AH->public.searchpath)
3261  ahprintf(AH, "%s", AH->public.searchpath);
3262 
3263  /* Make sure function checking is disabled */
3264  ahprintf(AH, "SET check_function_bodies = false;\n");
3265 
3266  /* Ensure that all valid XML data will be accepted */
3267  ahprintf(AH, "SET xmloption = content;\n");
3268 
3269  /* Avoid annoying notices etc */
3270  ahprintf(AH, "SET client_min_messages = warning;\n");
3271  if (!AH->public.std_strings)
3272  ahprintf(AH, "SET escape_string_warning = off;\n");
3273 
3274  /* Adjust row-security state */
3275  if (ropt && ropt->enable_row_security)
3276  ahprintf(AH, "SET row_security = on;\n");
3277  else
3278  ahprintf(AH, "SET row_security = off;\n");
3279 
3280  /*
3281  * In --transaction-size mode, we should always be in a transaction when
3282  * we begin to restore objects.
3283  */
3284  if (ropt && ropt->txn_size > 0)
3285  {
3286  if (AH->connection)
3287  StartTransaction(&AH->public);
3288  else
3289  ahprintf(AH, "\nBEGIN;\n");
3290  AH->txnCount = 0;
3291  }
3292 
3293  ahprintf(AH, "\n");
3294 }
3295 
3296 /*
3297  * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3298  * for updating state if appropriate. If user is NULL or an empty string,
3299  * the specification DEFAULT will be used.
3300  */
3301 static void
3303 {
3305 
3306  appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3307 
3308  /*
3309  * SQL requires a string literal here. Might as well be correct.
3310  */
3311  if (user && *user)
3312  appendStringLiteralAHX(cmd, user, AH);
3313  else
3314  appendPQExpBufferStr(cmd, "DEFAULT");
3315  appendPQExpBufferChar(cmd, ';');
3316 
3317  if (RestoringToDB(AH))
3318  {
3319  PGresult *res;
3320 
3321  res = PQexec(AH->connection, cmd->data);
3322 
3323  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3324  /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3325  pg_fatal("could not set session user to \"%s\": %s",
3327 
3328  PQclear(res);
3329  }
3330  else
3331  ahprintf(AH, "%s\n\n", cmd->data);
3332 
3333  destroyPQExpBuffer(cmd);
3334 }
3335 
3336 
3337 /*
3338  * Issue the commands to connect to the specified database.
3339  *
3340  * If we're currently restoring right into a database, this will
3341  * actually establish a connection. Otherwise it puts a \connect into
3342  * the script output.
3343  */
3344 static void
3346 {
3347  if (RestoringToDB(AH))
3349  else
3350  {
3351  PQExpBufferData connectbuf;
3352 
3353  initPQExpBuffer(&connectbuf);
3354  appendPsqlMetaConnect(&connectbuf, dbname);
3355  ahprintf(AH, "%s\n", connectbuf.data);
3356  termPQExpBuffer(&connectbuf);
3357  }
3358 
3359  /*
3360  * NOTE: currUser keeps track of what the imaginary session user in our
3361  * script is. It's now effectively reset to the original userID.
3362  */
3363  free(AH->currUser);
3364  AH->currUser = NULL;
3365 
3366  /* don't assume we still know the output schema, tablespace, etc either */
3367  free(AH->currSchema);
3368  AH->currSchema = NULL;
3369 
3370  free(AH->currTableAm);
3371  AH->currTableAm = NULL;
3372 
3373  free(AH->currTablespace);
3374  AH->currTablespace = NULL;
3375 
3376  /* re-establish fixed state */
3378 }
3379 
3380 /*
3381  * Become the specified user, and update state to avoid redundant commands
3382  *
3383  * NULL or empty argument is taken to mean restoring the session default
3384  */
3385 static void
3386 _becomeUser(ArchiveHandle *AH, const char *user)
3387 {
3388  if (!user)
3389  user = ""; /* avoid null pointers */
3390 
3391  if (AH->currUser && strcmp(AH->currUser, user) == 0)
3392  return; /* no need to do anything */
3393 
3394  _doSetSessionAuth(AH, user);
3395 
3396  /*
3397  * NOTE: currUser keeps track of what the imaginary session user in our
3398  * script is
3399  */
3400  free(AH->currUser);
3401  AH->currUser = pg_strdup(user);
3402 }
3403 
3404 /*
3405  * Become the owner of the given TOC entry object. If
3406  * changes in ownership are not allowed, this doesn't do anything.
3407  */
3408 static void
3410 {
3411  RestoreOptions *ropt = AH->public.ropt;
3412 
3413  if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3414  return;
3415 
3416  _becomeUser(AH, te->owner);
3417 }
3418 
3419 
3420 /*
3421  * Issue the commands to select the specified schema as the current schema
3422  * in the target database.
3423  */
3424 static void
3425 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3426 {
3427  PQExpBuffer qry;
3428 
3429  /*
3430  * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3431  * that search_path rather than switching to entry-specific paths.
3432  * Otherwise, it's an old archive that will not restore correctly unless
3433  * we set the search_path as it's expecting.
3434  */
3435  if (AH->public.searchpath)
3436  return;
3437 
3438  if (!schemaName || *schemaName == '\0' ||
3439  (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3440  return; /* no need to do anything */
3441 
3442  qry = createPQExpBuffer();
3443 
3444  appendPQExpBuffer(qry, "SET search_path = %s",
3445  fmtId(schemaName));
3446  if (strcmp(schemaName, "pg_catalog") != 0)
3447  appendPQExpBufferStr(qry, ", pg_catalog");
3448 
3449  if (RestoringToDB(AH))
3450  {
3451  PGresult *res;
3452 
3453  res = PQexec(AH->connection, qry->data);
3454 
3455  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3457  "could not set \"search_path\" to \"%s\": %s",
3458  schemaName, PQerrorMessage(AH->connection));
3459 
3460  PQclear(res);
3461  }
3462  else
3463  ahprintf(AH, "%s;\n\n", qry->data);
3464 
3465  free(AH->currSchema);
3466  AH->currSchema = pg_strdup(schemaName);
3467 
3468  destroyPQExpBuffer(qry);
3469 }
3470 
3471 /*
3472  * Issue the commands to select the specified tablespace as the current one
3473  * in the target database.
3474  */
3475 static void
3477 {
3478  RestoreOptions *ropt = AH->public.ropt;
3479  PQExpBuffer qry;
3480  const char *want,
3481  *have;
3482 
3483  /* do nothing in --no-tablespaces mode */
3484  if (ropt->noTablespace)
3485  return;
3486 
3487  have = AH->currTablespace;
3488  want = tablespace;
3489 
3490  /* no need to do anything for non-tablespace object */
3491  if (!want)
3492  return;
3493 
3494  if (have && strcmp(want, have) == 0)
3495  return; /* no need to do anything */
3496 
3497  qry = createPQExpBuffer();
3498 
3499  if (strcmp(want, "") == 0)
3500  {
3501  /* We want the tablespace to be the database's default */
3502  appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3503  }
3504  else
3505  {
3506  /* We want an explicit tablespace */
3507  appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3508  }
3509 
3510  if (RestoringToDB(AH))
3511  {
3512  PGresult *res;
3513 
3514  res = PQexec(AH->connection, qry->data);
3515 
3516  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3518  "could not set \"default_tablespace\" to %s: %s",
3519  fmtId(want), PQerrorMessage(AH->connection));
3520 
3521  PQclear(res);
3522  }
3523  else
3524  ahprintf(AH, "%s;\n\n", qry->data);
3525 
3526  free(AH->currTablespace);
3527  AH->currTablespace = pg_strdup(want);
3528 
3529  destroyPQExpBuffer(qry);
3530 }
3531 
3532 /*
3533  * Set the proper default_table_access_method value for the table.
3534  */
3535 static void
3536 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3537 {
3538  RestoreOptions *ropt = AH->public.ropt;
3539  PQExpBuffer cmd;
3540  const char *want,
3541  *have;
3542 
3543  /* do nothing in --no-table-access-method mode */
3544  if (ropt->noTableAm)
3545  return;
3546 
3547  have = AH->currTableAm;
3548  want = tableam;
3549 
3550  if (!want)
3551  return;
3552 
3553  if (have && strcmp(want, have) == 0)
3554  return;
3555 
3556  cmd = createPQExpBuffer();
3557  appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3558 
3559  if (RestoringToDB(AH))
3560  {
3561  PGresult *res;
3562 
3563  res = PQexec(AH->connection, cmd->data);
3564 
3565  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3567  "could not set \"default_table_access_method\": %s",
3568  PQerrorMessage(AH->connection));
3569 
3570  PQclear(res);
3571  }
3572  else
3573  ahprintf(AH, "%s\n\n", cmd->data);
3574 
3575  destroyPQExpBuffer(cmd);
3576 
3577  free(AH->currTableAm);
3578  AH->currTableAm = pg_strdup(want);
3579 }
3580 
3581 /*
3582  * Set the proper default table access method for a table without storage.
3583  * Currently, this is required only for partitioned tables with a table AM.
3584  */
3585 static void
3587 {
3588  RestoreOptions *ropt = AH->public.ropt;
3589  const char *tableam = te->tableam;
3590  PQExpBuffer cmd;
3591 
3592  /* do nothing in --no-table-access-method mode */
3593  if (ropt->noTableAm)
3594  return;
3595 
3596  if (!tableam)
3597  return;
3598 
3599  Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3600 
3601  cmd = createPQExpBuffer();
3602 
3603  appendPQExpBufferStr(cmd, "ALTER TABLE ");
3604  appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3605  appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3606  fmtId(tableam));
3607 
3608  if (RestoringToDB(AH))
3609  {
3610  PGresult *res;
3611 
3612  res = PQexec(AH->connection, cmd->data);
3613 
3614  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3616  "could not alter table access method: %s",
3617  PQerrorMessage(AH->connection));
3618  PQclear(res);
3619  }
3620  else
3621  ahprintf(AH, "%s\n\n", cmd->data);
3622 
3623  destroyPQExpBuffer(cmd);
3624 }
3625 
3626 /*
3627  * Extract an object description for a TOC entry, and append it to buf.
3628  *
3629  * This is used for ALTER ... OWNER TO.
3630  *
3631  * If the object type has no owner, do nothing.
3632  */
3633 static void
3635 {
3636  const char *type = te->desc;
3637 
3638  /* objects that don't require special decoration */
3639  if (strcmp(type, "COLLATION") == 0 ||
3640  strcmp(type, "CONVERSION") == 0 ||
3641  strcmp(type, "DOMAIN") == 0 ||
3642  strcmp(type, "FOREIGN TABLE") == 0 ||
3643  strcmp(type, "MATERIALIZED VIEW") == 0 ||
3644  strcmp(type, "SEQUENCE") == 0 ||
3645  strcmp(type, "STATISTICS") == 0 ||
3646  strcmp(type, "TABLE") == 0 ||
3647  strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3648  strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3649  strcmp(type, "TYPE") == 0 ||
3650  strcmp(type, "VIEW") == 0 ||
3651  /* non-schema-specified objects */
3652  strcmp(type, "DATABASE") == 0 ||
3653  strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3654  strcmp(type, "SCHEMA") == 0 ||
3655  strcmp(type, "EVENT TRIGGER") == 0 ||
3656  strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3657  strcmp(type, "SERVER") == 0 ||
3658  strcmp(type, "PUBLICATION") == 0 ||
3659  strcmp(type, "SUBSCRIPTION") == 0)
3660  {
3661  appendPQExpBuffer(buf, "%s ", type);
3662  if (te->namespace && *te->namespace)
3663  appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3665  }
3666  /* LOs just have a name, but it's numeric so must not use fmtId */
3667  else if (strcmp(type, "BLOB") == 0)
3668  {
3669  appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3670  }
3671 
3672  /*
3673  * These object types require additional decoration. Fortunately, the
3674  * information needed is exactly what's in the DROP command.
3675  */
3676  else if (strcmp(type, "AGGREGATE") == 0 ||
3677  strcmp(type, "FUNCTION") == 0 ||
3678  strcmp(type, "OPERATOR") == 0 ||
3679  strcmp(type, "OPERATOR CLASS") == 0 ||
3680  strcmp(type, "OPERATOR FAMILY") == 0 ||
3681  strcmp(type, "PROCEDURE") == 0)
3682  {
3683  /* Chop "DROP " off the front and make a modifiable copy */
3684  char *first = pg_strdup(te->dropStmt + 5);
3685  char *last;
3686 
3687  /* point to last character in string */
3688  last = first + strlen(first) - 1;
3689 
3690  /* Strip off any ';' or '\n' at the end */
3691  while (last >= first && (*last == '\n' || *last == ';'))
3692  last--;
3693  *(last + 1) = '\0';
3694 
3695  appendPQExpBufferStr(buf, first);
3696 
3697  free(first);
3698  return;
3699  }
3700  /* these object types don't have separate owners */
3701  else if (strcmp(type, "CAST") == 0 ||
3702  strcmp(type, "CHECK CONSTRAINT") == 0 ||
3703  strcmp(type, "CONSTRAINT") == 0 ||
3704  strcmp(type, "DATABASE PROPERTIES") == 0 ||
3705  strcmp(type, "DEFAULT") == 0 ||
3706  strcmp(type, "FK CONSTRAINT") == 0 ||
3707  strcmp(type, "INDEX") == 0 ||
3708  strcmp(type, "RULE") == 0 ||
3709  strcmp(type, "TRIGGER") == 0 ||
3710  strcmp(type, "ROW SECURITY") == 0 ||
3711  strcmp(type, "POLICY") == 0 ||
3712  strcmp(type, "USER MAPPING") == 0)
3713  {
3714  /* do nothing */
3715  }
3716  else
3717  pg_fatal("don't know how to set owner for object type \"%s\"", type);
3718 }
3719 
3720 /*
3721  * Emit the SQL commands to create the object represented by a TOC entry
3722  *
3723  * This now also includes issuing an ALTER OWNER command to restore the
3724  * object's ownership, if wanted. But note that the object's permissions
3725  * will remain at default, until the matching ACL TOC entry is restored.
3726  */
3727 static void
3728 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3729 {
3730  RestoreOptions *ropt = AH->public.ropt;
3731 
3732  /*
3733  * Select owner, schema, tablespace and default AM as necessary. The
3734  * default access method for partitioned tables is handled after
3735  * generating the object definition, as it requires an ALTER command
3736  * rather than SET.
3737  */
3738  _becomeOwner(AH, te);
3739  _selectOutputSchema(AH, te->namespace);
3740  _selectTablespace(AH, te->tablespace);
3741  if (te->relkind != RELKIND_PARTITIONED_TABLE)
3743 
3744  /* Emit header comment for item */
3745  if (!AH->noTocComments)
3746  {
3747  const char *pfx;
3748  char *sanitized_name;
3749  char *sanitized_schema;
3750  char *sanitized_owner;
3751 
3752  if (isData)
3753  pfx = "Data for ";
3754  else
3755  pfx = "";
3756 
3757  ahprintf(AH, "--\n");
3758  if (AH->public.verbose)
3759  {
3760  ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3761  te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3762  if (te->nDeps > 0)
3763  {
3764  int i;
3765 
3766  ahprintf(AH, "-- Dependencies:");
3767  for (i = 0; i < te->nDeps; i++)
3768  ahprintf(AH, " %d", te->dependencies[i]);
3769  ahprintf(AH, "\n");
3770  }
3771  }
3772 
3773  sanitized_name = sanitize_line(te->tag, false);
3774  sanitized_schema = sanitize_line(te->namespace, true);
3775  sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3776 
3777  ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3778  pfx, sanitized_name, te->desc, sanitized_schema,
3779  sanitized_owner);
3780 
3781  free(sanitized_name);
3782  free(sanitized_schema);
3783  free(sanitized_owner);
3784 
3785  if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3786  {
3787  char *sanitized_tablespace;
3788 
3789  sanitized_tablespace = sanitize_line(te->tablespace, false);
3790  ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3791  free(sanitized_tablespace);
3792  }
3793  ahprintf(AH, "\n");
3794 
3795  if (AH->PrintExtraTocPtr != NULL)
3796  AH->PrintExtraTocPtr(AH, te);
3797  ahprintf(AH, "--\n\n");
3798  }
3799 
3800  /*
3801  * Actually print the definition. Normally we can just print the defn
3802  * string if any, but we have three special cases:
3803  *
3804  * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3805  * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3806  * "public" that is a comment. We have to do this when --no-owner mode is
3807  * selected. This is ugly, but I see no other good way ...
3808  *
3809  * 2. BLOB METADATA entries need special processing since their defn
3810  * strings are just lists of OIDs, not complete SQL commands.
3811  *
3812  * 3. ACL LARGE OBJECTS entries need special processing because they
3813  * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3814  * apply to each large object listed in the associated BLOB METADATA.
3815  */
3816  if (ropt->noOwner &&
3817  strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3818  {
3819  ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3820  }
3821  else if (strcmp(te->desc, "BLOB METADATA") == 0)
3822  {
3823  IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3824  }
3825  else if (strcmp(te->desc, "ACL") == 0 &&
3826  strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3827  {
3828  IssueACLPerBlob(AH, te);
3829  }
3830  else if (te->defn && strlen(te->defn) > 0)
3831  {
3832  ahprintf(AH, "%s\n\n", te->defn);
3833 
3834  /*
3835  * If the defn string contains multiple SQL commands, txn_size mode
3836  * should count it as N actions not one. But rather than build a full
3837  * SQL parser, approximate this by counting semicolons. One case
3838  * where that tends to be badly fooled is function definitions, so
3839  * ignore them. (restore_toc_entry will count one action anyway.)
3840  */
3841  if (ropt->txn_size > 0 &&
3842  strcmp(te->desc, "FUNCTION") != 0 &&
3843  strcmp(te->desc, "PROCEDURE") != 0)
3844  {
3845  const char *p = te->defn;
3846  int nsemis = 0;
3847 
3848  while ((p = strchr(p, ';')) != NULL)
3849  {
3850  nsemis++;
3851  p++;
3852  }
3853  if (nsemis > 1)
3854  AH->txnCount += nsemis - 1;
3855  }
3856  }
3857 
3858  /*
3859  * If we aren't using SET SESSION AUTH to determine ownership, we must
3860  * instead issue an ALTER OWNER command. Schema "public" is special; when
3861  * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3862  * when using SET SESSION for all other objects. We assume that anything
3863  * without a DROP command is not a separately ownable object.
3864  */
3865  if (!ropt->noOwner &&
3866  (!ropt->use_setsessauth ||
3867  (strcmp(te->desc, "SCHEMA") == 0 &&
3868  strncmp(te->defn, "--", 2) == 0)) &&
3869  te->owner && strlen(te->owner) > 0 &&
3870  te->dropStmt && strlen(te->dropStmt) > 0)
3871  {
3872  if (strcmp(te->desc, "BLOB METADATA") == 0)
3873  {
3874  /* BLOB METADATA needs special code to handle multiple LOs */
3875  char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
3876 
3877  IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
3878  pg_free(cmdEnd);
3879  }
3880  else
3881  {
3882  /* For all other cases, we can use _getObjectDescription */
3883  PQExpBufferData temp;
3884 
3885  initPQExpBuffer(&temp);
3886  _getObjectDescription(&temp, te);
3887 
3888  /*
3889  * If _getObjectDescription() didn't fill the buffer, then there
3890  * is no owner.
3891  */
3892  if (temp.data[0])
3893  ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
3894  temp.data, fmtId(te->owner));
3895  termPQExpBuffer(&temp);
3896  }
3897  }
3898 
3899  /*
3900  * Select a partitioned table's default AM, once the table definition has
3901  * been generated.
3902  */
3903  if (te->relkind == RELKIND_PARTITIONED_TABLE)
3905 
3906  /*
3907  * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3908  * commands, so we can no longer assume we know the current auth setting.
3909  */
3910  if (_tocEntryIsACL(te))
3911  {
3912  free(AH->currUser);
3913  AH->currUser = NULL;
3914  }
3915 }
3916 
/*
 * Sanitize a string to be included in an SQL comment or TOC listing, by
 * replacing any newlines with spaces.  This ensures each logical output line
 * is in fact one physical output line, to prevent corruption of the dump
 * (which could, in the worst case, present an SQL injection vulnerability
 * if someone were to incautiously load a dump containing objects with
 * maliciously crafted names).
 *
 * The result is a freshly malloc'd string.  If the input string is NULL,
 * return a malloc'ed empty string, unless want_hyphen, in which case return a
 * malloc'ed hyphen.
 *
 * Note that we currently don't bother to quote names, meaning that the name
 * fields aren't automatically parseable.  "pg_restore -L" doesn't care because
 * it only examines the dumpId field, but someday we might want to try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *result;
	char	   *s;

	if (!str)
		return pg_strdup(want_hyphen ? "-" : "");

	result = pg_strdup(str);

	/* Both line-feed and carriage-return are replaced, in place */
	for (s = result; *s != '\0'; s++)
	{
		if (*s == '\n' || *s == '\r')
			*s = ' ';
	}

	return result;
}
3952 
3953 /*
3954  * Write the file header for a custom-format archive
3955  */
3956 void
3958 {
3959  struct tm crtm;
3960 
3961  AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3962  AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3963  AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3964  AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3965  AH->WriteBytePtr(AH, AH->intSize);
3966  AH->WriteBytePtr(AH, AH->offSize);
3967  AH->WriteBytePtr(AH, AH->format);
3969  crtm = *localtime(&AH->createDate);
3970  WriteInt(AH, crtm.tm_sec);
3971  WriteInt(AH, crtm.tm_min);
3972  WriteInt(AH, crtm.tm_hour);
3973  WriteInt(AH, crtm.tm_mday);
3974  WriteInt(AH, crtm.tm_mon);
3975  WriteInt(AH, crtm.tm_year);
3976  WriteInt(AH, crtm.tm_isdst);
3977  WriteStr(AH, PQdb(AH->connection));
3978  WriteStr(AH, AH->public.remoteVersionStr);
3979  WriteStr(AH, PG_VERSION);
3980 }
3981 
3982 void
3984 {
3985  char *errmsg;
3986  char vmaj,
3987  vmin,
3988  vrev;
3989  int fmt;
3990 
3991  /*
3992  * If we haven't already read the header, do so.
3993  *
3994  * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3995  * way to unify the cases?
3996  */
3997  if (!AH->readHeader)
3998  {
3999  char tmpMag[7];
4000 
4001  AH->ReadBufPtr(AH, tmpMag, 5);
4002 
4003  if (strncmp(tmpMag, "PGDMP", 5) != 0)
4004  pg_fatal("did not find magic string in file header");
4005  }
4006 
4007  vmaj = AH->ReadBytePtr(AH);
4008  vmin = AH->ReadBytePtr(AH);
4009 
4010  if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4011  vrev = AH->ReadBytePtr(AH);
4012  else
4013  vrev = 0;
4014 
4015  AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4016 
4017  if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4018  pg_fatal("unsupported version (%d.%d) in file header",
4019  vmaj, vmin);
4020 
4021  AH->intSize = AH->ReadBytePtr(AH);
4022  if (AH->intSize > 32)
4023  pg_fatal("sanity check on integer size (%lu) failed",
4024  (unsigned long) AH->intSize);
4025 
4026  if (AH->intSize > sizeof(int))
4027  pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4028 
4029  if (AH->version >= K_VERS_1_7)
4030  AH->offSize = AH->ReadBytePtr(AH);
4031  else
4032  AH->offSize = AH->intSize;
4033 
4034  fmt = AH->ReadBytePtr(AH);
4035 
4036  if (AH->format != fmt)
4037  pg_fatal("expected format (%d) differs from format found in file (%d)",
4038  AH->format, fmt);
4039 
4040  if (AH->version >= K_VERS_1_15)
4041  AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4042  else if (AH->version >= K_VERS_1_2)
4043  {
4044  /* Guess the compression method based on the level */
4045  if (AH->version < K_VERS_1_4)
4046  AH->compression_spec.level = AH->ReadBytePtr(AH);
4047  else
4048  AH->compression_spec.level = ReadInt(AH);
4049 
4050  if (AH->compression_spec.level != 0)
4052  }
4053  else
4055 
4057  if (errmsg)
4058  {
4059  pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4060  errmsg);
4061  pg_free(errmsg);
4062  }
4063 
4064  if (AH->version >= K_VERS_1_4)
4065  {
4066  struct tm crtm;
4067 
4068  crtm.tm_sec = ReadInt(AH);
4069  crtm.tm_min = ReadInt(AH);
4070  crtm.tm_hour = ReadInt(AH);
4071  crtm.tm_mday = ReadInt(AH);
4072  crtm.tm_mon = ReadInt(AH);
4073  crtm.tm_year = ReadInt(AH);
4074  crtm.tm_isdst = ReadInt(AH);
4075 
4076  /*
4077  * Newer versions of glibc have mktime() report failure if tm_isdst is
4078  * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4079  * TZ=UTC. This is problematic when restoring an archive under a
4080  * different timezone setting. If we get a failure, try again with
4081  * tm_isdst set to -1 ("don't know").
4082  *
4083  * XXX with or without this hack, we reconstruct createDate
4084  * incorrectly when the prevailing timezone is different from
4085  * pg_dump's. Next time we bump the archive version, we should flush
4086  * this representation and store a plain seconds-since-the-Epoch
4087  * timestamp instead.
4088  */
4089  AH->createDate = mktime(&crtm);
4090  if (AH->createDate == (time_t) -1)
4091  {
4092  crtm.tm_isdst = -1;
4093  AH->createDate = mktime(&crtm);
4094  if (AH->createDate == (time_t) -1)
4095  pg_log_warning("invalid creation date in header");
4096  }
4097  }
4098 
4099  if (AH->version >= K_VERS_1_4)
4100  {
4101  AH->archdbname = ReadStr(AH);
4102  }
4103 
4104  if (AH->version >= K_VERS_1_10)
4105  {
4106  AH->archiveRemoteVersion = ReadStr(AH);
4107  AH->archiveDumpVersion = ReadStr(AH);
4108  }
4109 }
4110 
4111 
4112 /*
4113  * checkSeek
4114  * check to see if ftell/fseek can be performed.
4115  */
4116 bool
4117 checkSeek(FILE *fp)
4118 {
4119  pgoff_t tpos;
4120 
4121  /* Check that ftello works on this file */
4122  tpos = ftello(fp);
4123  if (tpos < 0)
4124  return false;
4125 
4126  /*
4127  * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4128  * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4129  * successful no-op even on files that are otherwise unseekable.
4130  */
4131  if (fseeko(fp, tpos, SEEK_SET) != 0)
4132  return false;
4133 
4134  return true;
4135 }
4136 
4137 
4138 /*
4139  * dumpTimestamp
4140  */
4141 static void
4142 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4143 {
4144  char buf[64];
4145 
4146  if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4147  ahprintf(AH, "-- %s %s\n\n", msg, buf);
4148 }
4149 
4150 /*
4151  * Main engine for parallel restore.
4152  *
4153  * Parallel restore is done in three phases. In this first phase,
4154  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4155  * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4156  * PRE_DATA items other than ACLs.) Entries we can't process now are
4157  * added to the pending_list for later phases to deal with.
4158  */
4159 static void
4161 {
4162  bool skipped_some;
4163  TocEntry *next_work_item;
4164 
4165  pg_log_debug("entering restore_toc_entries_prefork");
4166 
4167  /* Adjust dependency information */
4168  fix_dependencies(AH);
4169 
4170  /*
4171  * Do all the early stuff in a single connection in the parent. There's no
4172  * great point in running it in parallel, in fact it will actually run
4173  * faster in a single connection because we avoid all the connection and
4174  * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4175  * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4176  * not risk trying to process them out-of-order.
4177  *
4178  * Stuff that we can't do immediately gets added to the pending_list.
4179  * Note: we don't yet filter out entries that aren't going to be restored.
4180  * They might participate in dependency chains connecting entries that
4181  * should be restored, so we treat them as live until we actually process
4182  * them.
4183  *
4184  * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4185  * before DATA items, and all DATA items before POST_DATA items. That is
4186  * not certain to be true in older archives, though, and in any case use
4187  * of a list file would destroy that ordering (cf. SortTocFromFile). So
4188  * this loop cannot assume that it holds.
4189  */
4191  skipped_some = false;
4192  for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4193  {
4194  bool do_now = true;
4195 
4196  if (next_work_item->section != SECTION_PRE_DATA)
4197  {
4198  /* DATA and POST_DATA items are just ignored for now */
4199  if (next_work_item->section == SECTION_DATA ||
4200  next_work_item->section == SECTION_POST_DATA)
4201  {
4202  do_now = false;
4203  skipped_some = true;
4204  }
4205  else
4206  {
4207  /*
4208  * SECTION_NONE items, such as comments, can be processed now
4209  * if we are still in the PRE_DATA part of the archive. Once
4210  * we've skipped any items, we have to consider whether the
4211  * comment's dependencies are satisfied, so skip it for now.
4212  */
4213  if (skipped_some)
4214  do_now = false;
4215  }
4216  }
4217 
4218  /*
4219  * Also skip items that need to be forced into later passes. We need
4220  * not set skipped_some in this case, since by assumption no main-pass
4221  * items could depend on these.
4222  */
4223  if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4224  do_now = false;
4225 
4226  if (do_now)
4227  {
4228  /* OK, restore the item and update its dependencies */
4229  pg_log_info("processing item %d %s %s",
4230  next_work_item->dumpId,
4231  next_work_item->desc, next_work_item->tag);
4232 
4233  (void) restore_toc_entry(AH, next_work_item, false);
4234 
4235  /* Reduce dependencies, but don't move anything to ready_heap */
4236  reduce_dependencies(AH, next_work_item, NULL);
4237  }
4238  else
4239  {
4240  /* Nope, so add it to pending_list */
4241  pending_list_append(pending_list, next_work_item);
4242  }
4243  }
4244 
4245  /*
4246  * In --transaction-size mode, we must commit the open transaction before
4247  * dropping the database connection. This also ensures that child workers
4248  * can see the objects we've created so far.
4249  */
4250  if (AH->public.ropt->txn_size > 0)
4251  CommitTransaction(&AH->public);
4252 
4253  /*
4254  * Now close parent connection in prep for parallel steps. We do this
4255  * mainly to ensure that we don't exceed the specified number of parallel
4256  * connections.
4257  */
4258  DisconnectDatabase(&AH->public);
4259 
4260  /* blow away any transient state from the old connection */
4261  free(AH->currUser);
4262  AH->currUser = NULL;
4263  free(AH->currSchema);
4264  AH->currSchema = NULL;
4265  free(AH->currTablespace);
4266  AH->currTablespace = NULL;
4267  free(AH->currTableAm);
4268  AH->currTableAm = NULL;
4269 }
4270 
4271 /*
4272  * Main engine for parallel restore.
4273  *
4274  * Parallel restore is done in three phases. In this second phase,
4275  * we process entries by dispatching them to parallel worker children
4276  * (processes on Unix, threads on Windows), each of which connects
4277  * separately to the database. Inter-entry dependencies are respected,
4278  * and so is the RestorePass multi-pass structure. When we can no longer
4279  * make any entries ready to process, we exit. Normally, there will be
4280  * nothing left to do; but if there is, the third phase will mop up.
4281  */
4282 static void
4284  TocEntry *pending_list)
4285 {
4286  binaryheap *ready_heap;
4287  TocEntry *next_work_item;
4288 
4289  pg_log_debug("entering restore_toc_entries_parallel");
4290 
4291  /* Set up ready_heap with enough room for all known TocEntrys */
4292  ready_heap = binaryheap_allocate(AH->tocCount,
4294  NULL);
4295 
4296  /*
4297  * The pending_list contains all items that we need to restore. Move all
4298  * items that are available to process immediately into the ready_heap.
4299  * After this setup, the pending list is everything that needs to be done
4300  * but is blocked by one or more dependencies, while the ready heap
4301  * contains items that have no remaining dependencies and are OK to
4302  * process in the current restore pass.
4303  */
4305  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4306 
4307  /*
4308  * main parent loop
4309  *
4310  * Keep going until there is no worker still running AND there is no work
4311  * left to be done. Note invariant: at top of loop, there should always
4312  * be at least one worker available to dispatch a job to.
4313  */
4314  pg_log_info("entering main parallel loop");
4315 
4316  for (;;)
4317  {
4318  /* Look for an item ready to be dispatched to a worker */
4319  next_work_item = pop_next_work_item(ready_heap, pstate);
4320  if (next_work_item != NULL)
4321  {
4322  /* If not to be restored, don't waste time launching a worker */
4323  if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4324  {
4325  pg_log_info("skipping item %d %s %s",
4326  next_work_item->dumpId,
4327  next_work_item->desc, next_work_item->tag);
4328  /* Update its dependencies as though we'd completed it */
4329  reduce_dependencies(AH, next_work_item, ready_heap);
4330  /* Loop around to see if anything else can be dispatched */
4331  continue;
4332  }
4333 
4334  pg_log_info("launching item %d %s %s",
4335  next_work_item->dumpId,
4336  next_work_item->desc, next_work_item->tag);
4337 
4338  /* Dispatch to some worker */
4339  DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4340  mark_restore_job_done, ready_heap);
4341  }
4342  else if (IsEveryWorkerIdle(pstate))
4343  {
4344  /*
4345  * Nothing is ready and no worker is running, so we're done with
4346  * the current pass or maybe with the whole process.
4347  */
4348  if (AH->restorePass == RESTORE_PASS_LAST)
4349  break; /* No more parallel processing is possible */
4350 
4351  /* Advance to next restore pass */
4352  AH->restorePass++;
4353  /* That probably allows some stuff to be made ready */
4354  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4355  /* Loop around to see if anything's now ready */
4356  continue;
4357  }
4358  else
4359  {
4360  /*
4361  * We have nothing ready, but at least one child is working, so
4362  * wait for some subjob to finish.
4363  */
4364  }
4365 
4366  /*
4367  * Before dispatching another job, check to see if anything has
4368  * finished. We should check every time through the loop so as to
4369  * reduce dependencies as soon as possible. If we were unable to
4370  * dispatch any job this time through, wait until some worker finishes
4371  * (and, hopefully, unblocks some pending item). If we did dispatch
4372  * something, continue as soon as there's at least one idle worker.
4373  * Note that in either case, there's guaranteed to be at least one
4374  * idle worker when we return to the top of the loop. This ensures we
4375  * won't block inside DispatchJobForTocEntry, which would be
4376  * undesirable: we'd rather postpone dispatching until we see what's
4377  * been unblocked by finished jobs.
4378  */
4379  WaitForWorkers(AH, pstate,
4380  next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4381  }
4382 
4383  /* There should now be nothing in ready_heap. */
4384  Assert(binaryheap_empty(ready_heap));
4385 
4386  binaryheap_free(ready_heap);
4387 
4388  pg_log_info("finished main parallel loop");
4389 }
4390 
4391 /*
4392  * Main engine for parallel restore.
4393  *
4394  * Parallel restore is done in three phases. In this third phase,
4395  * we mop up any remaining TOC entries by processing them serially.
4396  * This phase normally should have nothing to do, but if we've somehow
4397  * gotten stuck due to circular dependencies or some such, this provides
4398  * at least some chance of completing the restore successfully.
4399  */
4400 static void
4402 {
4403  RestoreOptions *ropt = AH->public.ropt;
4404  TocEntry *te;
4405 
4406  pg_log_debug("entering restore_toc_entries_postfork");
4407 
4408  /*
4409  * Now reconnect the single parent connection.
4410  */
4411  ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4412 
4413  /* re-establish fixed state */
4415 
4416  /*
4417  * Make sure there is no work left due to, say, circular dependencies, or
4418  * some other pathological condition. If so, do it in the single parent
4419  * connection. We don't sweat about RestorePass ordering; it's likely we
4420  * already violated that.
4421  */
4422  for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4423  {
4424  pg_log_info("processing missed item %d %s %s",
4425  te->dumpId, te->desc, te->tag);
4426  (void) restore_toc_entry(AH, te, false);
4427  }
4428 }
4429 
4430 /*
4431  * Check if te1 has an exclusive lock requirement for an item that te2 also
4432  * requires, whether or not te2's requirement is for an exclusive lock.
4433  */
4434 static bool
4436 {
4437  int j,
4438  k;
4439 
4440  for (j = 0; j < te1->nLockDeps; j++)
4441  {
4442  for (k = 0; k < te2->nDeps; k++)
4443  {
4444  if (te1->lockDeps[j] == te2->dependencies[k])
4445  return true;
4446  }
4447  }
4448  return false;
4449 }
4450 
4451 
4452 /*
4453  * Initialize the header of the pending-items list.
4454  *
4455  * This is a circular list with a dummy TocEntry as header, just like the
4456  * main TOC list; but we use separate list links so that an entry can be in
4457  * the main TOC list as well as in the pending list.
4458  */
4459 static void
4461 {
4462  l->pending_prev = l->pending_next = l;
4463 }
4464 
4465 /* Append te to the end of the pending-list headed by l */
4466 static void
4468 {
4469  te->pending_prev = l->pending_prev;
4470  l->pending_prev->pending_next = te;
4471  l->pending_prev = te;
4472  te->pending_next = l;
4473 }
4474 
4475 /* Remove te from the pending-list */
4476 static void
4478 {
4481  te->pending_prev = NULL;
4482  te->pending_next = NULL;
4483 }
4484 
4485 
4486 /* qsort comparator for sorting TocEntries by dataLength */
4487 static int
4488 TocEntrySizeCompareQsort(const void *p1, const void *p2)
4489 {
4490  const TocEntry *te1 = *(const TocEntry *const *) p1;
4491  const TocEntry *te2 = *(const TocEntry *const *) p2;
4492 
4493  /* Sort by decreasing dataLength */
4494  if (te1->dataLength > te2->dataLength)
4495  return -1;
4496  if (te1->dataLength < te2->dataLength)
4497  return 1;
4498 
4499  /* For equal dataLengths, sort by dumpId, just to be stable */
4500  if (te1->dumpId < te2->dumpId)
4501  return -1;
4502  if (te1->dumpId > te2->dumpId)
4503  return 1;
4504 
4505  return 0;
4506 }
4507 
/*
 * binaryheap comparator for sorting TocEntries by dataLength
 *
 * p1 and p2 are the TocEntry pointers themselves; their addresses are passed
 * on because the qsort comparator expects pointers to the elements.  arg is
 * unused.
 */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/* return opposite of qsort comparator for max-heap */
	return -TocEntrySizeCompareQsort(&p1, &p2);
}
4515 
4516 
4517 /*
4518  * Move all immediately-ready items from pending_list to ready_heap.
4519  *
4520  * Items are considered ready if they have no remaining dependencies and
4521  * they belong in the current restore pass. (See also reduce_dependencies,
4522  * which applies the same logic one-at-a-time.)
4523  */
4524 static void
4526  binaryheap *ready_heap,
4527  RestorePass pass)
4528 {
4529  TocEntry *te;
4530  TocEntry *next_te;
4531 
4532  for (te = pending_list->pending_next; te != pending_list; te = next_te)
4533  {
4534  /* must save list link before possibly removing te from list */
4535  next_te = te->pending_next;
4536 
4537  if (te->depCount == 0 &&
4538  _tocEntryRestorePass(te) == pass)
4539  {
4540  /* Remove it from pending_list ... */
4541  pending_list_remove(te);
4542  /* ... and add to ready_heap */
4543  binaryheap_add(ready_heap, te);
4544  }
4545  }
4546 }
4547 
4548 /*
4549  * Find the next work item (if any) that is capable of being run now,
4550  * and remove it from the ready_heap.
4551  *
4552  * Returns the item, or NULL if nothing is runnable.
4553  *
4554  * To qualify, the item must have no remaining dependencies
4555  * and no requirements for locks that are incompatible with
4556  * items currently running. Items in the ready_heap are known to have
4557  * no remaining dependencies, but we have to check for lock conflicts.
4558  */
4559 static TocEntry *
4561  ParallelState *pstate)
4562 {
4563  /*
4564  * Search the ready_heap until we find a suitable item. Note that we do a
4565  * sequential scan through the heap nodes, so even though we will first
4566  * try to choose the highest-priority item, we might end up picking
4567  * something with a much lower priority. However, we expect that we will
4568  * typically be able to pick one of the first few items, which should
4569  * usually have a relatively high priority.
4570  */
4571  for (int i = 0; i < binaryheap_size(ready_heap); i++)
4572  {
4573  TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4574  bool conflicts = false;
4575 
4576  /*
4577  * Check to see if the item would need exclusive lock on something
4578  * that a currently running item also needs lock on, or vice versa. If
4579  * so, we don't want to schedule them together.
4580  */
4581  for (int k = 0; k < pstate->numWorkers; k++)
4582  {
4583  TocEntry *running_te = pstate->te[k];
4584 
4585  if (running_te == NULL)
4586  continue;
4587  if (has_lock_conflicts(te, running_te) ||
4588  has_lock_conflicts(running_te, te))
4589  {
4590  conflicts = true;
4591  break;
4592  }
4593  }
4594 
4595  if (conflicts)
4596  continue;
4597 
4598  /* passed all tests, so this item can run */
4599  binaryheap_remove_node(ready_heap, i);
4600  return te;
4601  }
4602 
4603  pg_log_debug("no item ready");
4604  return NULL;
4605 }
4606 
4607 
4608 /*
4609  * Restore a single TOC item in parallel with others
4610  *
4611  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4612  * (everything else). A worker process executes several such work items during
4613  * a parallel backup or restore. Once we terminate here and report back that
4614  * our work is finished, the leader process will assign us a new work item.
4615  */
4616 int
4618 {
4619  int status;
4620 
4621  Assert(AH->connection != NULL);
4622 
4623  /* Count only errors associated with this TOC entry */
4624  AH->public.n_errors = 0;
4625 
4626  /* Restore the TOC item */
4627  status = restore_toc_entry(AH, te, true);
4628 
4629  return status;
4630 }
4631 
4632 
4633 /*
4634  * Callback function that's invoked in the leader process after a step has
4635  * been parallel restored.
4636  *
4637  * Update status and reduce the dependency count of any dependent items.
4638  */
4639 static void
4641  TocEntry *te,
4642  int status,
4643  void *callback_data)
4644 {
4645  binaryheap *ready_heap = (binaryheap *) callback_data;
4646 
4647  pg_log_info("finished item %d %s %s",
4648  te->dumpId, te->desc, te->tag);
4649 
4650  if (status == WORKER_CREATE_DONE)
4651  mark_create_done(AH, te);
4652  else if (status == WORKER_INHIBIT_DATA)
4653  {
4655  AH->public.n_errors++;
4656  }
4657  else if (status == WORKER_IGNORED_ERRORS)
4658  AH->public.n_errors++;
4659  else if (status != 0)
4660  pg_fatal("worker process failed: exit code %d",
4661  status);
4662 
4663  reduce_dependencies(AH, te, ready_heap);
4664 }
4665 
4666 
4667 /*
4668  * Process the dependency information into a form useful for parallel restore.
4669  *
4670  * This function takes care of fixing up some missing or badly designed
4671  * dependencies, and then prepares subsidiary data structures that will be
4672  * used in the main parallel-restore logic, including:
4673  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4674  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4675  * dependencies for each TOC entry.
4676  *
4677  * We also identify locking dependencies so that we can avoid trying to
4678  * schedule conflicting items at the same time.
4679  */
4680 static void
4682 {
4683  TocEntry *te;
4684  int i;
4685 
4686  /*
4687  * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4688  * items are marked as not being in any parallel-processing list.
4689  */
4690  for (te = AH->toc->next; te != AH->toc; te = te->next)
4691  {
4692  te->depCount = te->nDeps;
4693  te->revDeps = NULL;
4694  te->nRevDeps = 0;
4695  te->pending_prev = NULL;
4696  te->pending_next = NULL;
4697  }
4698 
4699  /*
4700  * POST_DATA items that are shown as depending on a table need to be
4701  * re-pointed to depend on that table's data, instead. This ensures they
4702  * won't get scheduled until the data has been loaded.
4703  */
4705 
4706  /*
4707  * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4708  * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4709  * one BLOB COMMENTS in such files.)
4710  */
4711  if (AH->version < K_VERS_1_11)
4712  {
4713  for (te = AH->toc->next; te != AH->toc; te = te->next)
4714  {
4715  if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4716  {
4717  TocEntry *te2;
4718 
4719  for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4720  {
4721  if (strcmp(te2->desc, "BLOBS") == 0)
4722  {
4723  te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4724  te->dependencies[0] = te2->dumpId;
4725  te->nDeps++;
4726  te->depCount++;
4727  break;
4728  }
4729  }
4730  break;
4731  }
4732  }
4733  }
4734 
4735  /*
4736  * At this point we start to build the revDeps reverse-dependency arrays,
4737  * so all changes of dependencies must be complete.
4738  */
4739 
4740  /*
4741  * Count the incoming dependencies for each item. Also, it is possible
4742  * that the dependencies list items that are not in the archive at all
4743  * (that should not happen in 9.2 and later, but is highly likely in older
4744  * archives). Subtract such items from the depCounts.
4745  */
4746  for (te = AH->toc->next; te != AH->toc; te = te->next)
4747  {
4748  for (i = 0; i < te->nDeps; i++)
4749  {
4750  DumpId depid = te->dependencies[i];
4751 
4752  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4753  AH->tocsByDumpId[depid]->nRevDeps++;
4754  else
4755  te->depCount--;
4756  }
4757  }
4758 
4759  /*
4760  * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4761  * it as a counter below.
4762  */
4763  for (te = AH->toc->next; te != AH->toc; te = te->next)
4764  {
4765  if (te->nRevDeps > 0)
4766  te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4767  te->nRevDeps = 0;
4768  }
4769 
4770  /*
4771  * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4772  * better agree with the loops above.
4773  */
4774  for (te = AH->toc->next; te != AH->toc; te = te->next)
4775  {
4776  for (i = 0; i < te->nDeps; i++)
4777  {
4778  DumpId depid = te->dependencies[i];
4779 
4780  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4781  {
4782  TocEntry *otherte = AH->tocsByDumpId[depid];
4783 
4784  otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4785  }
4786  }
4787  }
4788 
4789  /*
4790  * Lastly, work out the locking dependencies.
4791  */
4792  for (te = AH->toc->next; te != AH->toc; te = te->next)
4793  {
4794  te->lockDeps = NULL;
4795  te->nLockDeps = 0;
4797  }
4798 }
4799 
4800 /*
4801  * Change dependencies on table items to depend on table data items instead,
4802  * but only in POST_DATA items.
4803  *
4804  * Also, for any item having such dependency(s), set its dataLength to the
4805  * largest dataLength of the table data items it depends on. This ensures
4806  * that parallel restore will prioritize larger jobs (index builds, FK
4807  * constraint checks, etc) over smaller ones, avoiding situations where we
4808  * end a restore with only one active job working on a large table.
4809  */
4810 static void
4812 {
4813  TocEntry *te;
4814  int i;
4815  DumpId olddep;
4816 
4817  for (te = AH->toc->next; te != AH->toc; te = te->next)
4818  {
4819  if (te->section != SECTION_POST_DATA)
4820  continue;
4821  for (i = 0; i < te->nDeps; i++)
4822  {
4823  olddep = te->dependencies[i];
4824  if (olddep <= AH->maxDumpId &&
4825  AH->tableDataId[olddep] != 0)
4826  {
4827  DumpId tabledataid = AH->tableDataId[olddep];
4828  TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4829 
4830  te->dependencies[i] = tabledataid;
4831  te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4832  pg_log_debug("transferring dependency %d -> %d to %d",
4833  te->dumpId, olddep, tabledataid);
4834  }
4835  }
4836  }
4837 }
4838 
4839 /*
4840  * Identify which objects we'll need exclusive lock on in order to restore
4841  * the given TOC entry (*other* than the one identified by the TOC entry
4842  * itself). Record their dump IDs in the entry's lockDeps[] array.
4843  */
4844 static void
4846 {
4847  DumpId *lockids;
4848  int nlockids;
4849  int i;
4850 
4851  /*
4852  * We only care about this for POST_DATA items. PRE_DATA items are not
4853  * run in parallel, and DATA items are all independent by assumption.
4854  */
4855  if (te->section != SECTION_POST_DATA)
4856  return;
4857 
4858  /* Quick exit if no dependencies at all */
4859  if (te->nDeps == 0)
4860  return;
4861 
4862  /*
4863  * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4864  * and hence require exclusive lock. However, we know that CREATE INDEX
4865  * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4866  * category too ... but today is not that day.)
4867  */
4868  if (strcmp(te->desc, "INDEX") == 0)
4869  return;
4870 
4871  /*
4872  * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4873  * item listed among its dependencies. Originally all of these would have
4874  * been TABLE items, but repoint_table_dependencies would have repointed
4875  * them to the TABLE DATA items if those are present (which they might not
4876  * be, eg in a schema-only dump). Note that all of the entries we are
4877  * processing here are POST_DATA; otherwise there might be a significant
4878  * difference between a dependency on a table and a dependency on its
4879  * data, so that closer analysis would be needed here.
4880  */
4881  lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4882  nlockids = 0;
4883  for (i = 0; i < te->nDeps; i++)
4884  {
4885  DumpId depid = te->dependencies[i];
4886 
4887  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4888  ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4889  strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4890  lockids[nlockids++] = depid;
4891  }
4892 
4893  if (nlockids == 0)
4894  {
4895  free(lockids);
4896  return;
4897  }
4898 
4899  te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4900  te->nLockDeps = nlockids;
4901 }
4902 
4903 /*
4904  * Remove the specified TOC entry from the depCounts of items that depend on
4905  * it, thereby possibly making them ready-to-run. Any pending item that
4906  * becomes ready should be moved to the ready_heap, if that's provided.
4907  */
4908 static void
4910  binaryheap *ready_heap)
4911 {
4912  int i;
4913 
4914  pg_log_debug("reducing dependencies for %d", te->dumpId);
4915 
4916  for (i = 0; i < te->nRevDeps; i++)
4917  {
4918  TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4919 
4920  Assert(otherte->depCount > 0);
4921  otherte->depCount--;
4922 
4923  /*
4924  * It's ready if it has no remaining dependencies, and it belongs in
4925  * the current restore pass, and it is currently a member of the
4926  * pending list (that check is needed to prevent double restore in
4927  * some cases where a list-file forces out-of-order restoring).
4928  * However, if ready_heap == NULL then caller doesn't want any list
4929  * memberships changed.
4930  */
4931  if (otherte->depCount == 0 &&
4932  _tocEntryRestorePass(otherte) == AH->restorePass &&
4933  otherte->pending_prev != NULL &&
4934  ready_heap != NULL)
4935  {
4936  /* Remove it from pending list ... */
4937  pending_list_remove(otherte);
4938  /* ... and add to ready_heap */
4939  binaryheap_add(ready_heap, otherte);
4940  }
4941  }
4942 }
4943 
4944 /*
4945  * Set the created flag on the DATA member corresponding to the given
4946  * TABLE member
4947  */
4948 static void
4950 {
4951  if (AH->tableDataId[te->dumpId] != 0)
4952  {
4953  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4954 
4955  ted->created = true;
4956  }
4957 }
4958 
4959 /*
4960  * Mark the DATA member corresponding to the given TABLE member
4961  * as not wanted
4962  */
4963 static void
4965 {
4966  pg_log_info("table \"%s\" could not be created, will not restore its data",
4967  te->tag);
4968 
4969  if (AH->tableDataId[te->dumpId] != 0)
4970  {
4971  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4972 
4973  ted->reqs = 0;
4974  }
4975 }
4976 
4977 /*
4978  * Clone and de-clone routines used in parallel restoration.
4979  *
4980  * Enough of the structure is cloned to ensure that there is no
4981  * conflict between different threads each with their own clone.
4982  */
4983 ArchiveHandle *
4985 {
4986  ArchiveHandle *clone;
4987 
4988  /* Make a "flat" copy */
4989  clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4990  memcpy(clone, AH, sizeof(ArchiveHandle));
4991 
4992  /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
4993  clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
4994  memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
4995 
4996  /* Handle format-independent fields */
4997  memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4998 
4999  /* The clone will have its own connection, so disregard connection state */
5000  clone->connection = NULL;
5001  clone->connCancel = NULL;
5002  clone->currUser = NULL;
5003  clone->currSchema = NULL;
5004  clone->currTableAm = NULL;
5005  clone->currTablespace = NULL;
5006 
5007  /* savedPassword must be local in case we change it while connecting */
5008  if (clone->savedPassword)
5009  clone->savedPassword = pg_strdup(clone->savedPassword);
5010 
5011  /* clone has its own error count, too */
5012  clone->public.n_errors = 0;
5013 
5014  /* clones should not share lo_buf */
5015  clone->lo_buf = NULL;
5016 
5017  /*
5018  * Clone connections disregard --transaction-size; they must commit after
5019  * each command so that the results are immediately visible to other
5020  * workers.
5021  */
5022  clone->public.ropt->txn_size = 0;
5023 
5024  /*
5025  * Connect our new clone object to the database, using the same connection
5026  * parameters used for the original connection.
5027  */
5028  ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5029 
5030  /* re-establish fixed state */
5031  if (AH->mode == archModeRead)
5032  _doSetFixedOutputState(clone);
5033  /* in write case, setupDumpWorker will fix up connection state */
5034 
5035  /* Let the format-specific code have a chance too */
5036  clone->ClonePtr(clone);
5037 
5038  Assert(clone->connection != NULL);
5039  return clone;
5040 }
5041 
5042 /*
5043  * Release clone-local storage.
5044  *
5045  * Note: we assume any clone-local connection was already closed.
5046  */
5047 void
5049 {
5050  /* Should not have an open database connection */
5051  Assert(AH->connection == NULL);
5052 
5053  /* Clear format-specific state */
5054  AH->DeClonePtr(AH);
5055 
5056  /* Clear state allocated by CloneArchive */
5057  if (AH->sqlparse.curCmd)
5059 
5060  /* Clear any connection-local state */
5061  free(AH->currUser);
5062  free(AH->currSchema);
5063  free(AH->currTablespace);
5064  free(AH->currTableAm);
5065  free(AH->savedPassword);
5066 
5067  free(AH);
5068 }
int lo_write(int fd, const char *buf, int len)
Definition: be-fsstubs.c:182
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition: parallel.c:1059
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1451
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition: parallel.c:1205
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1268
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition: parallel.c:897
@ WFW_ALL_IDLE
Definition: parallel.h:35
@ WFW_GOT_STATUS
Definition: parallel.h:33
@ WFW_ONE_IDLE
Definition: parallel.h:34
void binaryheap_remove_node(binaryheap *heap, int n)
Definition: binaryheap.c:225
void binaryheap_add(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:154
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
#define binaryheap_size(h)
Definition: binaryheap.h:66
#define binaryheap_empty(h)
Definition: binaryheap.h:65
#define binaryheap_get_node(h, n)
Definition: binaryheap.h:67
#define PG_BINARY_R
Definition: c.h:1275
#define ngettext(s, p, n)
Definition: c.h:1181
#define PG_BINARY_A
Definition: c.h:1274
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY_W
Definition: c.h:1276
bool EndCompressFileHandle(CompressFileHandle *CFH)
Definition: compress_io.c:289
char * supports_compression(const pg_compress_specification compression_spec)
Definition: compress_io.c:88
CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compression_spec)
Definition: compress_io.c:195
const char * get_compress_algorithm_name(pg_compress_algorithm algorithm)
Definition: compression.c:69
@ PG_COMPRESSION_GZIP
Definition: compression.h:24
@ PG_COMPRESSION_NONE
Definition: compression.h:23
#define PGDUMP_STRFTIME_FMT
Definition: dumputils.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:1070
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7018
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7182
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int lo_close(PGconn *conn, int fd)
Definition: fe-lobj.c:96
int lo_open(PGconn *conn, Oid lobjId, int mode)
Definition: fe-lobj.c:57
Oid lo_create(PGconn *conn, Oid lobjId)
Definition: fe-lobj.c:474
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
DataDirSyncMethod
Definition: file_utils.h:28
@ DATA_DIR_SYNC_METHOD_FSYNC
Definition: file_utils.h:29
const char * str
#define free(a)
Definition: header.h:65
int remaining
Definition: informix.c:692
char sign
Definition: informix.c:693
static DataDirSyncMethod sync_method
Definition: initdb.c:170
int b
Definition: isn.c:70
return true
Definition: isn.c:126
int j
Definition: isn.c:74
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
#define INV_WRITE
Definition: libpq-fs.h:21
static void const char * fmt
va_end(args)
va_start(args, fmt)
static struct pg_tm tm
Definition: localtime.c:104
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition: logging.c:218
#define pg_log_info(...)
Definition: logging.h:124
@ PG_LOG_PRIMARY
Definition: logging.h:67
@ PG_LOG_ERROR
Definition: logging.h:43
#define pg_log_debug(...)
Definition: logging.h:133
static AmcheckOptions opts
Definition: pg_amcheck.c:111
void ConnectDatabase(Archive *AHX, const ConnParams *cparams, bool isReconnect)
Definition: pg_backup_db.c:110
@ SECTION_NONE
Definition: pg_backup.h:57
@ SECTION_POST_DATA
Definition: pg_backup.h:60
@ SECTION_PRE_DATA
Definition: pg_backup.h:58
@ SECTION_DATA
Definition: pg_backup.h:59
int DumpId
Definition: pg_backup.h:270
void(* SetupWorkerPtrType)(Archive *AH)
Definition: pg_backup.h:280
enum _archiveFormat ArchiveFormat
@ archModeWrite
Definition: pg_backup.h:51
@ archModeAppend
Definition: pg_backup.h:50
@ archModeRead
Definition: pg_backup.h:52
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:225
enum _teSection teSection
@ archUnknown
Definition: pg_backup.h:41
@ archTar
Definition: pg_backup.h:43
@ archCustom
Definition: pg_backup.h:42
@ archDirectory
Definition: pg_backup.h:45
@ archNull
Definition: pg_backup.h:44
static void fix_dependencies(ArchiveHandle *AH)
static void repoint_table_dependencies(ArchiveHandle *AH)
void DeCloneArchive(ArchiveHandle *AH)
static int _discoverArchiveFormat(ArchiveHandle *AH)
#define TEXT_DUMPALL_HEADER
int TocIDRequired(ArchiveHandle *AH, DumpId id)
bool checkSeek(FILE *fp)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
void warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te)
void WriteHead(ArchiveHandle *AH)
int EndLO(Archive *AHX, Oid oid)
static CompressFileHandle * SaveOutput(ArchiveHandle *AH)
static void _becomeUser(ArchiveHandle *AH, const char *user)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
static void pending_list_append(TocEntry *l, TocEntry *te)
size_t WriteInt(ArchiveHandle *AH, int i)
void ProcessArchiveRestoreOptions(Archive *AHX)
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
static void _moveBefore(TocEntry *pos, TocEntry *te)
static bool _tocEntryIsACL(TocEntry *te)
static void move_to_ready_heap(TocEntry *pending_list, binaryheap *ready_heap, RestorePass pass)
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
static void buildTocEntryArrays(ArchiveHandle *AH)
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
int archprintf(Archive *AH, const char *fmt,...)
static void StrictNamesCheck(RestoreOptions *ropt)
static void mark_restore_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
RestoreOptions * NewRestoreOptions(void)
static RestorePass _tocEntryRestorePass(TocEntry *te)
int StartLO(Archive *AHX, Oid oid)
static void setupRestoreWorker(Archive *AHX)
Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, DataDirSyncMethod sync_method)
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname)
static int TocEntrySizeCompareQsort(const void *p1, const void *p2)
void StartRestoreLOs(ArchiveHandle *AH)
void CloseArchive(Archive *AHX)
static void pending_list_header_init(TocEntry *l)
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list)
static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec)
static void mark_create_done(ArchiveHandle *AH, TocEntry *te)
#define TEXT_DUMP_HEADER
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
static bool is_load_via_partition_root(TocEntry *te)
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
void SortTocFromFile(Archive *AHX)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
char * ReadStr(ArchiveHandle *AH)
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
int ReadInt(ArchiveHandle *AH)
static void restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
static void _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, ArchiveOpts *opts)
static void _doSetFixedOutputState(ArchiveHandle *AH)
void PrintTOCSummary(Archive *AHX)
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
static int RestoringToDB(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
static void pending_list_remove(TocEntry *te)
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2)
DumpOptions * NewDumpOptions(void)
void ReadToc(ArchiveHandle *AH)
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, binaryheap *ready_heap)
static char * sanitize_line(const char *str, bool want_hyphen)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace)
void RestoreArchive(Archive *AHX)
void WriteToc(ArchiveHandle *AH)
void archputs(const char *s, Archive *AH)
static bool _fileExistsInDirectory(const char *dir, const char *filename)
static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
void InitDumpOptions(DumpOptions *opts)
static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
DumpOptions * dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
static void dump_lo_buf(ArchiveHandle *AH)
static TocEntry * pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate)
void WriteData(Archive *AHX, const void *data, size_t dLen)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
size_t WriteStr(ArchiveHandle *AH, const char *c)
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_15
void InitArchiveFmt_Null(ArchiveHandle *AH)
#define WORKER_CREATE_DONE
#define LOBBUFSIZE
void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
Definition: pg_backup_db.c:599
#define K_VERS_1_14
#define appendByteaLiteralAHX(buf, str, len, AH)