1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  * Private implementation of the archiver routines.
6  *
7  * See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  * Rights are granted to use this software in any way so long
11  * as this notice is not removed.
12  *
13  * The author is not responsible for loss or damages that may
14  * result from its use.
15  *
16  *
17  * IDENTIFICATION
18  * src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "common/string.h"
34 #include "compress_io.h"
35 #include "dumputils.h"
36 #include "fe_utils/string_utils.h"
37 #include "lib/stringinfo.h"
38 #include "libpq/libpq-fs.h"
39 #include "parallel.h"
40 #include "pg_backup_archiver.h"
41 #include "pg_backup_db.h"
42 #include "pg_backup_utils.h"
43 
44 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
45 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
46 
47 /*
48  * State for tracking TocEntrys that are ready to process during a parallel
49  * restore. (This used to be a list, and we still call it that, though now
50  * it's really an array so that we can apply qsort to it.)
51  *
52  * tes[] is sized large enough that we can't overrun it.
53  * The valid entries are indexed first_te .. last_te inclusive.
54  * We periodically sort the array to bring larger-by-dataLength entries to
55  * the front; "sorted" is true if the valid entries are known sorted.
56  */
57 typedef struct _parallelReadyList
58 {
59  TocEntry **tes; /* Ready-to-dump TocEntrys */
60  int first_te; /* index of first valid entry in tes[] */
61  int last_te; /* index of last valid entry in tes[] */
62  bool sorted; /* are valid entries currently sorted? */
 63 } ParallelReadyList;
 64 
65 
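To see the shape of this structure in isolation, here is a small standalone sketch (not PostgreSQL code; the demo_* names are invented for illustration) of an array-backed ready list whose valid window runs first..last inclusive and which is sorted with qsort() so the largest entry is popped first:

#include <stdio.h>
#include <stdlib.h>

typedef struct
{
	int		   *items;			/* sizes of ready items */
	int			first;			/* index of first valid entry */
	int			last;			/* index of last valid entry */
} demo_ready_list;

static int
demo_size_compare(const void *p1, const void *p2)
{
	int			a = *(const int *) p1;
	int			b = *(const int *) p2;

	/* sort descending, so the biggest item is popped first */
	return (a < b) - (a > b);
}

int
main(void)
{
	int			data[] = {40, 700, 5, 120};
	demo_ready_list list = {data, 0, 3};

	qsort(list.items + list.first, list.last - list.first + 1,
		  sizeof(int), demo_size_compare);

	/* pop the largest pending item, shrinking the window from the front */
	printf("next item: %d\n", list.items[list.first++]);	/* prints 700 */
	return 0;
}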
66 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
67  const pg_compress_specification compression_spec,
68  bool dosync, ArchiveMode mode,
69  SetupWorkerPtrType setupWorkerPtr);
70 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
71 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
72 static char *sanitize_line(const char *str, bool want_hyphen);
73 static void _doSetFixedOutputState(ArchiveHandle *AH);
74 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
75 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
76 static void _becomeUser(ArchiveHandle *AH, const char *user);
77 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
78 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
79 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
80 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
81 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
82 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
83 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
84 static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
 85 static RestorePass _tocEntryRestorePass(TocEntry *te);
86 static bool _tocEntryIsACL(TocEntry *te);
 87 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
 88 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
89 static bool is_load_via_partition_root(TocEntry *te);
90 static void buildTocEntryArrays(ArchiveHandle *AH);
91 static void _moveBefore(TocEntry *pos, TocEntry *te);
 92 static int _discoverArchiveFormat(ArchiveHandle *AH);
93 
94 static int RestoringToDB(ArchiveHandle *AH);
95 static void dump_lo_buf(ArchiveHandle *AH);
96 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
97 static void SetOutput(ArchiveHandle *AH, const char *filename,
98  const pg_compress_specification compression_spec);
 99 static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
100 static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
101 
102 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
 103 static void restore_toc_entries_prefork(ArchiveHandle *AH,
104  TocEntry *pending_list);
 105 static void restore_toc_entries_parallel(ArchiveHandle *AH,
106  ParallelState *pstate,
107  TocEntry *pending_list);
 108 static void restore_toc_entries_postfork(ArchiveHandle *AH,
109  TocEntry *pending_list);
110 static void pending_list_header_init(TocEntry *l);
111 static void pending_list_append(TocEntry *l, TocEntry *te);
112 static void pending_list_remove(TocEntry *te);
113 static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
114 static void ready_list_free(ParallelReadyList *ready_list);
115 static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
116 static void ready_list_remove(ParallelReadyList *ready_list, int i);
117 static void ready_list_sort(ParallelReadyList *ready_list);
118 static int TocEntrySizeCompare(const void *p1, const void *p2);
119 static void move_to_ready_list(TocEntry *pending_list,
120  ParallelReadyList *ready_list,
121  RestorePass pass);
122 static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
123  ParallelState *pstate);
124 static void mark_dump_job_done(ArchiveHandle *AH,
125  TocEntry *te,
126  int status,
127  void *callback_data);
128 static void mark_restore_job_done(ArchiveHandle *AH,
129  TocEntry *te,
130  int status,
131  void *callback_data);
132 static void fix_dependencies(ArchiveHandle *AH);
133 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
136 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
137  ParallelReadyList *ready_list);
138 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 139 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
140 
141 static void StrictNamesCheck(RestoreOptions *ropt);
142 
143 
144 /*
145  * Allocate a new DumpOptions block containing all default values.
146  */
 147 DumpOptions *
 148 NewDumpOptions(void)
 149 {
 150  DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
 151 
 152  InitDumpOptions(opts);
 153  return opts;
154 }
155 
156 /*
157  * Initialize a DumpOptions struct to all default values
158  */
159 void
 160 InitDumpOptions(DumpOptions *opts)
161 {
162  memset(opts, 0, sizeof(DumpOptions));
163  /* set any fields that shouldn't default to zeroes */
164  opts->include_everything = true;
165  opts->cparams.promptPassword = TRI_DEFAULT;
166  opts->dumpSections = DUMP_UNSECTIONED;
167 }
168 
169 /*
170  * Create a freshly allocated DumpOptions with options equivalent to those
171  * found in the given RestoreOptions.
172  */
173 DumpOptions *
 174 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
175 {
176  DumpOptions *dopt = NewDumpOptions();
177 
178  /* this is the inverse of what's at the end of pg_dump.c's main() */
179  dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
180  dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
181  dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
182  dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
184  dopt->outputClean = ropt->dropSchema;
185  dopt->dataOnly = ropt->dataOnly;
186  dopt->schemaOnly = ropt->schemaOnly;
187  dopt->if_exists = ropt->if_exists;
188  dopt->column_inserts = ropt->column_inserts;
189  dopt->dumpSections = ropt->dumpSections;
190  dopt->aclsSkip = ropt->aclsSkip;
191  dopt->outputSuperuser = ropt->superuser;
192  dopt->outputCreateDB = ropt->createDB;
193  dopt->outputNoOwner = ropt->noOwner;
194  dopt->outputNoTableAm = ropt->noTableAm;
195  dopt->outputNoTablespaces = ropt->noTablespace;
196  dopt->disable_triggers = ropt->disable_triggers;
197  dopt->use_setsessauth = ropt->use_setsessauth;
199  dopt->dump_inserts = ropt->dump_inserts;
200  dopt->no_comments = ropt->no_comments;
201  dopt->no_publications = ropt->no_publications;
203  dopt->no_subscriptions = ropt->no_subscriptions;
204  dopt->lockWaitTimeout = ropt->lockWaitTimeout;
207  dopt->sequence_data = ropt->sequence_data;
208 
209  return dopt;
210 }
211 
212 
213 /*
214  * Wrapper functions.
215  *
216  * The objective is to make writing new formats and dumpers as simple
217  * as possible, if necessary at the expense of extra function calls etc.
218  *
219  */
220 
221 /*
222  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
223  * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
224  * setup doesn't need to know anything much, so it's defined here.
225  */
226 static void
 227 setupRestoreWorker(Archive *AHX)
228 {
229  ArchiveHandle *AH = (ArchiveHandle *) AHX;
230 
231  AH->ReopenPtr(AH);
232 }
233 
234 
235 /* Create a new archive */
236 /* Public */
237 Archive *
238 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
239  const pg_compress_specification compression_spec,
240  bool dosync, ArchiveMode mode,
 241  SetupWorkerPtrType setupWorkerPtr)
242 
243 {
244  ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
 245  dosync, mode, setupWorkerPtr);
246 
247  return (Archive *) AH;
248 }
249 
250 /* Open an existing archive */
251 /* Public */
252 Archive *
253 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
254 {
255  ArchiveHandle *AH;
256  pg_compress_specification compression_spec = {0};
257 
258  compression_spec.algorithm = PG_COMPRESSION_NONE;
259  AH = _allocAH(FileSpec, fmt, compression_spec, true,
 260  archModeRead, setupRestoreWorker);
261 
262  return (Archive *) AH;
263 }
264 
265 /* Public */
266 void
 267 CloseArchive(Archive *AHX)
268 {
269  ArchiveHandle *AH = (ArchiveHandle *) AHX;
270 
271  AH->ClosePtr(AH);
272 
273  /* Close the output */
274  errno = 0;
275  if (!EndCompressFileHandle(AH->OF))
276  pg_fatal("could not close output file: %m");
277 }
278 
279 /* Public */
280 void
 281 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
282 {
283  /* Caller can omit dump options, in which case we synthesize them */
284  if (dopt == NULL && ropt != NULL)
285  dopt = dumpOptionsFromRestoreOptions(ropt);
286 
287  /* Save options for later access */
288  AH->dopt = dopt;
289  AH->ropt = ropt;
290 }
291 
292 /* Public */
293 void
 294 ProcessArchiveRestoreOptions(Archive *AHX)
295 {
296  ArchiveHandle *AH = (ArchiveHandle *) AHX;
297  RestoreOptions *ropt = AH->public.ropt;
298  TocEntry *te;
299  teSection curSection;
300 
301  /* Decide which TOC entries will be dumped/restored, and mark them */
302  curSection = SECTION_PRE_DATA;
303  for (te = AH->toc->next; te != AH->toc; te = te->next)
304  {
305  /*
306  * When writing an archive, we also take this opportunity to check
307  * that we have generated the entries in a sane order that respects
308  * the section divisions. When reading, don't complain, since buggy
309  * old versions of pg_dump might generate out-of-order archives.
310  */
311  if (AH->mode != archModeRead)
312  {
313  switch (te->section)
314  {
315  case SECTION_NONE:
316  /* ok to be anywhere */
317  break;
318  case SECTION_PRE_DATA:
319  if (curSection != SECTION_PRE_DATA)
320  pg_log_warning("archive items not in correct section order");
321  break;
322  case SECTION_DATA:
323  if (curSection == SECTION_POST_DATA)
324  pg_log_warning("archive items not in correct section order");
325  break;
326  case SECTION_POST_DATA:
327  /* ok no matter which section we were in */
328  break;
329  default:
330  pg_fatal("unexpected section code %d",
331  (int) te->section);
332  break;
333  }
334  }
335 
336  if (te->section != SECTION_NONE)
337  curSection = te->section;
338 
339  te->reqs = _tocEntryRequired(te, curSection, AH);
340  }
341 
342  /* Enforce strict names checking */
343  if (ropt->strict_names)
344  StrictNamesCheck(ropt);
345 }
346 
347 /* Public */
348 void
 349 RestoreArchive(Archive *AHX)
350 {
351  ArchiveHandle *AH = (ArchiveHandle *) AHX;
352  RestoreOptions *ropt = AH->public.ropt;
353  bool parallel_mode;
354  TocEntry *te;
355  CompressFileHandle *sav;
356 
 357  AH->stage = STAGE_INITIALIZING;
358 
359  /*
360  * If we're going to do parallel restore, there are some restrictions.
361  */
362  parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
363  if (parallel_mode)
364  {
365  /* We haven't got round to making this work for all archive formats */
366  if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
367  pg_fatal("parallel restore is not supported with this archive file format");
368 
369  /* Doesn't work if the archive represents dependencies as OIDs */
370  if (AH->version < K_VERS_1_8)
371  pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
372 
373  /*
374  * It's also not gonna work if we can't reopen the input file, so
375  * let's try that immediately.
376  */
377  AH->ReopenPtr(AH);
378  }
379 
380  /*
381  * Make sure we won't need (de)compression we haven't got
382  */
383  if (AH->PrintTocDataPtr != NULL)
384  {
385  for (te = AH->toc->next; te != AH->toc; te = te->next)
386  {
387  if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
388  {
 389  char *errmsg = supports_compression(AH->compression_spec);
390  if (errmsg)
391  pg_fatal("cannot restore from compressed archive (%s)",
392  errmsg);
393  else
394  break;
395  }
396  }
397  }
398 
399  /*
400  * Prepare index arrays, so we can assume we have them throughout restore.
401  * It's possible we already did this, though.
402  */
403  if (AH->tocsByDumpId == NULL)
 404  buildTocEntryArrays(AH);
405 
406  /*
407  * If we're using a DB connection, then connect it.
408  */
409  if (ropt->useDB)
410  {
411  pg_log_info("connecting to database for restore");
412  if (AH->version < K_VERS_1_3)
413  pg_fatal("direct database connections are not supported in pre-1.3 archives");
414 
415  /*
416  * We don't want to guess at whether the dump will successfully
417  * restore; allow the attempt regardless of the version of the restore
418  * target.
419  */
420  AHX->minRemoteVersion = 0;
421  AHX->maxRemoteVersion = 9999999;
422 
423  ConnectDatabase(AHX, &ropt->cparams, false);
424 
425  /*
426  * If we're talking to the DB directly, don't send comments since they
427  * obscure SQL when displaying errors
428  */
429  AH->noTocComments = 1;
430  }
431 
432  /*
433  * Work out if we have an implied data-only restore. This can happen if
434  * the dump was data only or if the user has used a toc list to exclude
435  * all of the schema data. All we do is look for schema entries - if none
436  * are found then we set the dataOnly flag.
437  *
438  * We could scan for wanted TABLE entries, but that is not the same as
439  * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
440  */
441  if (!ropt->dataOnly)
442  {
443  int impliedDataOnly = 1;
444 
445  for (te = AH->toc->next; te != AH->toc; te = te->next)
446  {
447  if ((te->reqs & REQ_SCHEMA) != 0)
448  { /* It's schema, and it's wanted */
449  impliedDataOnly = 0;
450  break;
451  }
452  }
453  if (impliedDataOnly)
454  {
455  ropt->dataOnly = impliedDataOnly;
456  pg_log_info("implied data-only restore");
457  }
458  }
459 
460  /*
461  * Setup the output file if necessary.
462  */
463  sav = SaveOutput(AH);
 464  if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
465  SetOutput(AH, ropt->filename, ropt->compression_spec);
466 
467  ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
468 
469  if (AH->archiveRemoteVersion)
470  ahprintf(AH, "-- Dumped from database version %s\n",
 471  AH->archiveRemoteVersion);
472  if (AH->archiveDumpVersion)
473  ahprintf(AH, "-- Dumped by pg_dump version %s\n",
474  AH->archiveDumpVersion);
475 
476  ahprintf(AH, "\n");
477 
478  if (AH->public.verbose)
479  dumpTimestamp(AH, "Started on", AH->createDate);
480 
481  if (ropt->single_txn)
482  {
483  if (AH->connection)
484  StartTransaction(AHX);
485  else
486  ahprintf(AH, "BEGIN;\n\n");
487  }
488 
489  /*
490  * Establish important parameter values right away.
491  */
 492  _doSetFixedOutputState(AH);
493 
494  AH->stage = STAGE_PROCESSING;
495 
496  /*
497  * Drop the items at the start, in reverse order
498  */
499  if (ropt->dropSchema)
500  {
501  for (te = AH->toc->prev; te != AH->toc; te = te->prev)
502  {
503  AH->currentTE = te;
504 
505  /*
506  * In createDB mode, issue a DROP *only* for the database as a
507  * whole. Issuing drops against anything else would be wrong,
508  * because at this point we're connected to the wrong database.
509  * (The DATABASE PROPERTIES entry, if any, should be treated like
510  * the DATABASE entry.)
511  */
512  if (ropt->createDB)
513  {
514  if (strcmp(te->desc, "DATABASE") != 0 &&
515  strcmp(te->desc, "DATABASE PROPERTIES") != 0)
516  continue;
517  }
518 
519  /* Otherwise, drop anything that's selected and has a dropStmt */
520  if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
521  {
522  pg_log_info("dropping %s %s", te->desc, te->tag);
523  /* Select owner and schema as necessary */
524  _becomeOwner(AH, te);
525  _selectOutputSchema(AH, te->namespace);
526 
527  /*
528  * Now emit the DROP command, if the object has one. Note we
529  * don't necessarily emit it verbatim; at this point we add an
530  * appropriate IF EXISTS clause, if the user requested it.
531  */
532  if (*te->dropStmt != '\0')
533  {
534  if (!ropt->if_exists ||
535  strncmp(te->dropStmt, "--", 2) == 0)
536  {
537  /*
538  * Without --if-exists, or if it's just a comment (as
539  * happens for the public schema), print the dropStmt
540  * as-is.
541  */
542  ahprintf(AH, "%s", te->dropStmt);
543  }
544  else
545  {
546  /*
547  * Inject an appropriate spelling of "if exists". For
548  * large objects, we have a separate routine that
549  * knows how to do it, without depending on
550  * te->dropStmt; use that. For other objects we need
551  * to parse the command.
552  */
553  if (strncmp(te->desc, "BLOB", 4) == 0)
554  {
555  DropLOIfExists(AH, te->catalogId.oid);
556  }
557  else
558  {
559  char *dropStmt = pg_strdup(te->dropStmt);
560  char *dropStmtOrig = dropStmt;
561  PQExpBuffer ftStmt = createPQExpBuffer();
562 
563  /*
564  * Need to inject IF EXISTS clause after ALTER
565  * TABLE part in ALTER TABLE .. DROP statement
566  */
567  if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
568  {
569  appendPQExpBufferStr(ftStmt,
570  "ALTER TABLE IF EXISTS");
571  dropStmt = dropStmt + 11;
572  }
573 
574  /*
575  * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
576  * not support the IF EXISTS clause, and therefore
577  * we simply emit the original command for DEFAULT
578  * objects (modulo the adjustment made above).
579  *
580  * Likewise, don't mess with DATABASE PROPERTIES.
581  *
582  * If we used CREATE OR REPLACE VIEW as a means of
583  * quasi-dropping an ON SELECT rule, that should
584  * be emitted unchanged as well.
585  *
586  * For other object types, we need to extract the
587  * first part of the DROP which includes the
588  * object type. Most of the time this matches
589  * te->desc, so search for that; however for the
590  * different kinds of CONSTRAINTs, we know to
591  * search for hardcoded "DROP CONSTRAINT" instead.
592  */
593  if (strcmp(te->desc, "DEFAULT") == 0 ||
594  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
595  strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
596  appendPQExpBufferStr(ftStmt, dropStmt);
597  else
598  {
599  char buffer[40];
600  char *mark;
601 
602  if (strcmp(te->desc, "CONSTRAINT") == 0 ||
603  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
604  strcmp(te->desc, "FK CONSTRAINT") == 0)
605  strcpy(buffer, "DROP CONSTRAINT");
606  else
607  snprintf(buffer, sizeof(buffer), "DROP %s",
608  te->desc);
609 
610  mark = strstr(dropStmt, buffer);
611 
612  if (mark)
613  {
614  *mark = '\0';
615  appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
616  dropStmt, buffer,
617  mark + strlen(buffer));
618  }
619  else
620  {
621  /* complain and emit unmodified command */
622  pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
623  dropStmtOrig);
624  appendPQExpBufferStr(ftStmt, dropStmt);
625  }
626  }
627 
628  ahprintf(AH, "%s", ftStmt->data);
629 
630  destroyPQExpBuffer(ftStmt);
631  pg_free(dropStmtOrig);
632  }
633  }
634  }
635  }
636  }
637 
638  /*
639  * _selectOutputSchema may have set currSchema to reflect the effect
640  * of a "SET search_path" command it emitted. However, by now we may
641  * have dropped that schema; or it might not have existed in the first
642  * place. In either case the effective value of search_path will not
643  * be what we think. Forcibly reset currSchema so that we will
644  * re-establish the search_path setting when needed (after creating
645  * the schema).
646  *
647  * If we treated users as pg_dump'able objects then we'd need to reset
648  * currUser here too.
649  */
650  free(AH->currSchema);
651  AH->currSchema = NULL;
652  }
653 
654  if (parallel_mode)
655  {
656  /*
657  * In parallel mode, turn control over to the parallel-restore logic.
658  */
659  ParallelState *pstate;
660  TocEntry pending_list;
661 
662  /* The archive format module may need some setup for this */
663  if (AH->PrepParallelRestorePtr)
664  AH->PrepParallelRestorePtr(AH);
665 
666  pending_list_header_init(&pending_list);
667 
668  /* This runs PRE_DATA items and then disconnects from the database */
669  restore_toc_entries_prefork(AH, &pending_list);
670  Assert(AH->connection == NULL);
671 
672  /* ParallelBackupStart() will actually fork the processes */
673  pstate = ParallelBackupStart(AH);
674  restore_toc_entries_parallel(AH, pstate, &pending_list);
675  ParallelBackupEnd(AH, pstate);
676 
677  /* reconnect the leader and see if we missed something */
678  restore_toc_entries_postfork(AH, &pending_list);
679  Assert(AH->connection != NULL);
680  }
681  else
682  {
683  /*
684  * In serial mode, process everything in three phases: normal items,
685  * then ACLs, then post-ACL items. We might be able to skip one or
686  * both extra phases in some cases, eg data-only restores.
687  */
688  bool haveACL = false;
689  bool havePostACL = false;
690 
691  for (te = AH->toc->next; te != AH->toc; te = te->next)
692  {
693  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
694  continue; /* ignore if not to be dumped at all */
695 
696  switch (_tocEntryRestorePass(te))
697  {
698  case RESTORE_PASS_MAIN:
699  (void) restore_toc_entry(AH, te, false);
700  break;
701  case RESTORE_PASS_ACL:
702  haveACL = true;
703  break;
704  case RESTORE_PASS_POST_ACL:
705  havePostACL = true;
706  break;
707  }
708  }
709 
710  if (haveACL)
711  {
712  for (te = AH->toc->next; te != AH->toc; te = te->next)
713  {
714  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
 715  _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
716  (void) restore_toc_entry(AH, te, false);
717  }
718  }
719 
720  if (havePostACL)
721  {
722  for (te = AH->toc->next; te != AH->toc; te = te->next)
723  {
724  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
725  _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
726  (void) restore_toc_entry(AH, te, false);
727  }
728  }
729  }
730 
731  if (ropt->single_txn)
732  {
733  if (AH->connection)
734  CommitTransaction(AHX);
735  else
736  ahprintf(AH, "COMMIT;\n\n");
737  }
738 
739  if (AH->public.verbose)
740  dumpTimestamp(AH, "Completed on", time(NULL));
741 
742  ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
743 
744  /*
745  * Clean up & we're done.
746  */
747  AH->stage = STAGE_FINALIZING;
748 
 749  if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
750  RestoreOutput(AH, sav);
751 
752  if (ropt->useDB)
 753  DisconnectDatabase(&AH->public);
754 }
755 
756 /*
757  * Restore a single TOC item. Used in both parallel and non-parallel restore;
758  * is_parallel is true if we are in a worker child process.
759  *
760  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
761  * the parallel parent has to make the corresponding status update.
762  */
763 static int
764 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
765 {
766  RestoreOptions *ropt = AH->public.ropt;
767  int status = WORKER_OK;
768  int reqs;
769  bool defnDumped;
770 
771  AH->currentTE = te;
772 
773  /* Dump any relevant dump warnings to stderr */
774  if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
775  {
776  if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
777  pg_log_warning("warning from original dump file: %s", te->defn);
778  else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
779  pg_log_warning("warning from original dump file: %s", te->copyStmt);
780  }
781 
782  /* Work out what, if anything, we want from this entry */
783  reqs = te->reqs;
784 
785  defnDumped = false;
786 
787  /*
788  * If it has a schema component that we want, then process that
789  */
790  if ((reqs & REQ_SCHEMA) != 0)
791  {
792  /* Show namespace in log message if available */
793  if (te->namespace)
794  pg_log_info("creating %s \"%s.%s\"",
795  te->desc, te->namespace, te->tag);
796  else
797  pg_log_info("creating %s \"%s\"",
798  te->desc, te->tag);
799 
800  _printTocEntry(AH, te, false);
801  defnDumped = true;
802 
803  if (strcmp(te->desc, "TABLE") == 0)
804  {
805  if (AH->lastErrorTE == te)
806  {
807  /*
808  * We failed to create the table. If
809  * --no-data-for-failed-tables was given, mark the
810  * corresponding TABLE DATA to be ignored.
811  *
812  * In the parallel case this must be done in the parent, so we
813  * just set the return value.
814  */
815  if (ropt->noDataForFailedTables)
816  {
817  if (is_parallel)
 818  status = WORKER_INHIBIT_DATA;
819  else
 820  inhibit_data_for_failed_table(AH, te);
821  }
822  }
823  else
824  {
825  /*
826  * We created the table successfully. Mark the corresponding
827  * TABLE DATA for possible truncation.
828  *
829  * In the parallel case this must be done in the parent, so we
830  * just set the return value.
831  */
832  if (is_parallel)
 833  status = WORKER_CREATE_DONE;
834  else
835  mark_create_done(AH, te);
836  }
837  }
838 
839  /*
840  * If we created a DB, connect to it. Also, if we changed DB
841  * properties, reconnect to ensure that relevant GUC settings are
842  * applied to our session.
843  */
844  if (strcmp(te->desc, "DATABASE") == 0 ||
845  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
846  {
847  pg_log_info("connecting to new database \"%s\"", te->tag);
848  _reconnectToDB(AH, te->tag);
849  }
850  }
851 
852  /*
853  * If it has a data component that we want, then process that
854  */
855  if ((reqs & REQ_DATA) != 0)
856  {
857  /*
858  * hadDumper will be set if there is genuine data component for this
859  * node. Otherwise, we need to check the defn field for statements
860  * that need to be executed in data-only restores.
861  */
862  if (te->hadDumper)
863  {
864  /*
865  * If we can output the data, then restore it.
866  */
867  if (AH->PrintTocDataPtr != NULL)
868  {
869  _printTocEntry(AH, te, true);
870 
871  if (strcmp(te->desc, "BLOBS") == 0 ||
872  strcmp(te->desc, "BLOB COMMENTS") == 0)
873  {
874  pg_log_info("processing %s", te->desc);
875 
876  _selectOutputSchema(AH, "pg_catalog");
877 
878  /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
879  if (strcmp(te->desc, "BLOB COMMENTS") == 0)
 880  AH->outputKind = OUTPUT_OTHERDATA;
881 
882  AH->PrintTocDataPtr(AH, te);
883 
885  }
886  else
887  {
888  bool use_truncate;
889 
891 
892  /* Select owner and schema as necessary */
893  _becomeOwner(AH, te);
894  _selectOutputSchema(AH, te->namespace);
895 
896  pg_log_info("processing data for table \"%s.%s\"",
897  te->namespace, te->tag);
898 
899  /*
900  * In parallel restore, if we created the table earlier in
901  * this run (so that we know it is empty) and we are not
902  * restoring a load-via-partition-root data item then we
903  * wrap the COPY in a transaction and precede it with a
904  * TRUNCATE. If wal_level is set to minimal this prevents
905  * WAL-logging the COPY. This obtains a speedup similar
906  * to that from using single_txn mode in non-parallel
907  * restores.
908  *
909  * We mustn't do this for load-via-partition-root cases
910  * because some data might get moved across partition
911  * boundaries, risking deadlock and/or loss of previously
912  * loaded data. (We assume that all partitions of a
913  * partitioned table will be treated the same way.)
914  */
915  use_truncate = is_parallel && te->created &&
 916  !is_load_via_partition_root(te);
917 
918  if (use_truncate)
919  {
920  /*
921  * Parallel restore is always talking directly to a
922  * server, so no need to see if we should issue BEGIN.
923  */
924  StartTransaction(&AH->public);
925 
926  /*
927  * Issue TRUNCATE with ONLY so that child tables are
928  * not wiped.
929  */
930  ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
931  fmtQualifiedId(te->namespace, te->tag));
932  }
933 
934  /*
935  * If we have a copy statement, use it.
936  */
937  if (te->copyStmt && strlen(te->copyStmt) > 0)
938  {
939  ahprintf(AH, "%s", te->copyStmt);
 940  AH->outputKind = OUTPUT_COPYDATA;
941  }
942  else
 943  AH->outputKind = OUTPUT_OTHERDATA;
944 
945  AH->PrintTocDataPtr(AH, te);
946 
947  /*
948  * Terminate COPY if needed.
949  */
950  if (AH->outputKind == OUTPUT_COPYDATA &&
951  RestoringToDB(AH))
952  EndDBCopyMode(&AH->public, te->tag);
 953  AH->outputKind = OUTPUT_SQLCMDS;
954 
955  /* close out the transaction started above */
956  if (use_truncate)
 957  CommitTransaction(&AH->public);
958 
 959  _enableTriggersIfNecessary(AH, te);
960  }
961  }
962  }
963  else if (!defnDumped)
964  {
965  /* If we haven't already dumped the defn part, do so now */
966  pg_log_info("executing %s %s", te->desc, te->tag);
967  _printTocEntry(AH, te, false);
968  }
969  }
970 
971  if (AH->public.n_errors > 0 && status == WORKER_OK)
 972  status = WORKER_IGNORED_ERRORS;
973 
974  return status;
975 }
976 
977 /*
978  * Allocate a new RestoreOptions block.
 979  * This is mainly so we can initialize it, but also for future expansion.
980  */
 981 RestoreOptions *
 982 NewRestoreOptions(void)
 983 {
 984  RestoreOptions *opts;
 985 
 986  opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
 987 
988  /* set any fields that shouldn't default to zeroes */
989  opts->format = archUnknown;
990  opts->cparams.promptPassword = TRI_DEFAULT;
991  opts->dumpSections = DUMP_UNSECTIONED;
992  opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
993  opts->compression_spec.level = 0;
994 
995  return opts;
996 }
997 
998 static void
 999 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1000 {
1001  RestoreOptions *ropt = AH->public.ropt;
1002 
1003  /* This hack is only needed in a data-only restore */
1004  if (!ropt->dataOnly || !ropt->disable_triggers)
1005  return;
1006 
1007  pg_log_info("disabling triggers for %s", te->tag);
1008 
1009  /*
1010  * Become superuser if possible, since they are the only ones who can
1011  * disable constraint triggers. If -S was not given, assume the initial
1012  * user identity is a superuser. (XXX would it be better to become the
1013  * table owner?)
1014  */
1015  _becomeUser(AH, ropt->superuser);
1016 
1017  /*
1018  * Disable them.
1019  */
1020  ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1021  fmtQualifiedId(te->namespace, te->tag));
1022 }
1023 
1024 static void
 1025 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1026 {
1027  RestoreOptions *ropt = AH->public.ropt;
1028 
1029  /* This hack is only needed in a data-only restore */
1030  if (!ropt->dataOnly || !ropt->disable_triggers)
1031  return;
1032 
1033  pg_log_info("enabling triggers for %s", te->tag);
1034 
1035  /*
1036  * Become superuser if possible, since they are the only ones who can
1037  * disable constraint triggers. If -S was not given, assume the initial
1038  * user identity is a superuser. (XXX would it be better to become the
1039  * table owner?)
1040  */
1041  _becomeUser(AH, ropt->superuser);
1042 
1043  /*
1044  * Enable them.
1045  */
1046  ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1047  fmtQualifiedId(te->namespace, te->tag));
1048 }
1049 
1050 /*
1051  * Detect whether a TABLE DATA TOC item is performing "load via partition
1052  * root", that is the target table is an ancestor partition rather than the
1053  * table the TOC item is nominally for.
1054  *
1055  * In newer archive files this can be detected by checking for a special
1056  * comment placed in te->defn. In older files we have to fall back to seeing
1057  * if the COPY statement targets the named table or some other one. This
1058  * will not work for data dumped as INSERT commands, so we could give a false
1059  * negative in that case; fortunately, that's a rarely-used option.
1060  */
1061 static bool
 1062 is_load_via_partition_root(TocEntry *te)
1063 {
1064  if (te->defn &&
1065  strncmp(te->defn, "-- load via partition root ", 27) == 0)
1066  return true;
1067  if (te->copyStmt && *te->copyStmt)
1068  {
1069  PQExpBuffer copyStmt = createPQExpBuffer();
1070  bool result;
1071 
1072  /*
1073  * Build the initial part of the COPY as it would appear if the
1074  * nominal target table is the actual target. If we see anything
1075  * else, it must be a load-via-partition-root case.
1076  */
1077  appendPQExpBuffer(copyStmt, "COPY %s ",
1078  fmtQualifiedId(te->namespace, te->tag));
1079  result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1080  destroyPQExpBuffer(copyStmt);
1081  return result;
1082  }
1083  /* Assume it's not load-via-partition-root */
1084  return false;
1085 }
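The prefix comparison above is easy to demonstrate on its own. A standalone sketch follows (illustrative only; identifier quoting via fmtQualifiedId() is deliberately omitted, and the demo_* names are invented):

#include <stdio.h>
#include <string.h>

/* Decide whether a COPY statement targets the expected table by comparing
 * its prefix against the one we would have generated ourselves; the same
 * idea is_load_via_partition_root() applies to te->copyStmt. */
static int
demo_copy_targets_other_table(const char *copyStmt,
							  const char *schema, const char *table)
{
	char		expected[256];
	int			len;

	len = snprintf(expected, sizeof(expected), "COPY %s.%s ", schema, table);
	return strncmp(copyStmt, expected, len) != 0;
}

int
main(void)
{
	printf("%d\n", demo_copy_targets_other_table(
			   "COPY public.parent (a, b) FROM stdin;\n", "public", "child1"));
	/* prints 1: the COPY targets an ancestor partition, not child1 */
	return 0;
}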
1086 
1087 /*
1088  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1089  */
1090 
1091 /* Public */
1092 void
1093 WriteData(Archive *AHX, const void *data, size_t dLen)
1094 {
1095  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1096 
1097  if (!AH->currToc)
1098  pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1099 
1100  AH->WriteDataPtr(AH, data, dLen);
1101 }
1102 
1103 /*
1104  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1105  * repository for all metadata. But the name has stuck.
1106  *
1107  * The new entry is added to the Archive's TOC list. Most callers can ignore
1108  * the result value because nothing else need be done, but a few want to
1109  * manipulate the TOC entry further.
1110  */
1111 
1112 /* Public */
1113 TocEntry *
1114 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1115  ArchiveOpts *opts)
1116 {
1117  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1118  TocEntry *newToc;
1119 
1120  newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1121 
1122  AH->tocCount++;
1123  if (dumpId > AH->maxDumpId)
1124  AH->maxDumpId = dumpId;
1125 
1126  newToc->prev = AH->toc->prev;
1127  newToc->next = AH->toc;
1128  AH->toc->prev->next = newToc;
1129  AH->toc->prev = newToc;
1130 
1131  newToc->catalogId = catalogId;
1132  newToc->dumpId = dumpId;
1133  newToc->section = opts->section;
1134 
1135  newToc->tag = pg_strdup(opts->tag);
1136  newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1137  newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1138  newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1139  newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1140  newToc->desc = pg_strdup(opts->description);
1141  newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1142  newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1143  newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1144 
1145  if (opts->nDeps > 0)
1146  {
1147  newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1148  memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1149  newToc->nDeps = opts->nDeps;
1150  }
1151  else
1152  {
1153  newToc->dependencies = NULL;
1154  newToc->nDeps = 0;
1155  }
1156 
1157  newToc->dataDumper = opts->dumpFn;
1158  newToc->dataDumperArg = opts->dumpArg;
1159  newToc->hadDumper = opts->dumpFn ? true : false;
1160 
1161  newToc->formatData = NULL;
1162  newToc->dataLength = 0;
1163 
1164  if (AH->ArchiveEntryPtr != NULL)
1165  AH->ArchiveEntryPtr(AH, newToc);
1166 
1167  return newToc;
1168 }
1169 
1170 /* Public */
1171 void
 1172 PrintTOCSummary(Archive *AHX)
1173 {
1174  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1175  RestoreOptions *ropt = AH->public.ropt;
1176  TocEntry *te;
1177  pg_compress_specification out_compression_spec = {0};
1178  teSection curSection;
1179  CompressFileHandle *sav;
1180  const char *fmtName;
1181  char stamp_str[64];
1182 
1183  /* TOC is always uncompressed */
1184  out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1185 
1186  sav = SaveOutput(AH);
1187  if (ropt->filename)
1188  SetOutput(AH, ropt->filename, out_compression_spec);
1189 
1190  if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1191  localtime(&AH->createDate)) == 0)
1192  strcpy(stamp_str, "[unknown]");
1193 
1194  ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1195  ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1196  sanitize_line(AH->archdbname, false),
1197  AH->tocCount,
1199 
1200  switch (AH->format)
1201  {
1202  case archCustom:
1203  fmtName = "CUSTOM";
1204  break;
1205  case archDirectory:
1206  fmtName = "DIRECTORY";
1207  break;
1208  case archTar:
1209  fmtName = "TAR";
1210  break;
1211  default:
1212  fmtName = "UNKNOWN";
1213  }
1214 
1215  ahprintf(AH, "; Dump Version: %d.%d-%d\n",
 1216  AH->vmaj, AH->vmin, AH->vrev);
1217  ahprintf(AH, "; Format: %s\n", fmtName);
1218  ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1219  ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1220  if (AH->archiveRemoteVersion)
1221  ahprintf(AH, "; Dumped from database version: %s\n",
1222  AH->archiveRemoteVersion);
1223  if (AH->archiveDumpVersion)
1224  ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1225  AH->archiveDumpVersion);
1226 
1227  ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1228 
1229  curSection = SECTION_PRE_DATA;
1230  for (te = AH->toc->next; te != AH->toc; te = te->next)
1231  {
1232  if (te->section != SECTION_NONE)
1233  curSection = te->section;
1234  if (ropt->verbose ||
1235  (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1236  {
1237  char *sanitized_name;
1238  char *sanitized_schema;
1239  char *sanitized_owner;
1240 
1241  /*
1242  */
1243  sanitized_name = sanitize_line(te->tag, false);
1244  sanitized_schema = sanitize_line(te->namespace, true);
1245  sanitized_owner = sanitize_line(te->owner, false);
1246 
1247  ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1248  te->catalogId.tableoid, te->catalogId.oid,
1249  te->desc, sanitized_schema, sanitized_name,
1250  sanitized_owner);
1251 
1252  free(sanitized_name);
1253  free(sanitized_schema);
1254  free(sanitized_owner);
1255  }
1256  if (ropt->verbose && te->nDeps > 0)
1257  {
1258  int i;
1259 
1260  ahprintf(AH, ";\tdepends on:");
1261  for (i = 0; i < te->nDeps; i++)
1262  ahprintf(AH, " %d", te->dependencies[i]);
1263  ahprintf(AH, "\n");
1264  }
1265  }
1266 
1267  /* Enforce strict names checking */
1268  if (ropt->strict_names)
1269  StrictNamesCheck(ropt);
1270 
1271  if (ropt->filename)
1272  RestoreOutput(AH, sav);
1273 }
1274 
1275 /***********
1276  * Large Object Archival
1277  ***********/
1278 
1279 /* Called by a dumper to signal start of a LO */
1280 int
1281 StartLO(Archive *AHX, Oid oid)
1282 {
1283  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1284 
1285  if (!AH->StartLOPtr)
1286  pg_fatal("large-object output not supported in chosen format");
1287 
1288  AH->StartLOPtr(AH, AH->currToc, oid);
1289 
1290  return 1;
1291 }
1292 
1293 /* Called by a dumper to signal end of a LO */
1294 int
1295 EndLO(Archive *AHX, Oid oid)
1296 {
1297  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1298 
1299  if (AH->EndLOPtr)
1300  AH->EndLOPtr(AH, AH->currToc, oid);
1301 
1302  return 1;
1303 }
1304 
1305 /**********
1306  * Large Object Restoration
1307  **********/
1308 
1309 /*
1310  * Called by a format handler before any LOs are restored
1311  */
1312 void
 1313 StartRestoreLOs(ArchiveHandle *AH)
1314 {
1315  RestoreOptions *ropt = AH->public.ropt;
1316 
1317  if (!ropt->single_txn)
1318  {
1319  if (AH->connection)
1320  StartTransaction(&AH->public);
1321  else
1322  ahprintf(AH, "BEGIN;\n\n");
1323  }
1324 
1325  AH->loCount = 0;
1326 }
1327 
1328 /*
1329  * Called by a format handler after all LOs are restored
1330  */
1331 void
 1332 EndRestoreLOs(ArchiveHandle *AH)
1333 {
1334  RestoreOptions *ropt = AH->public.ropt;
1335 
1336  if (!ropt->single_txn)
1337  {
1338  if (AH->connection)
1339  CommitTransaction(&AH->public);
1340  else
1341  ahprintf(AH, "COMMIT;\n\n");
1342  }
1343 
1344  pg_log_info(ngettext("restored %d large object",
1345  "restored %d large objects",
1346  AH->loCount),
1347  AH->loCount);
1348 }
1349 
1350 
1351 /*
1352  * Called by a format handler to initiate restoration of a LO
1353  */
1354 void
1355 StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1356 {
1357  bool old_lo_style = (AH->version < K_VERS_1_12);
1358  Oid loOid;
1359 
1360  AH->loCount++;
1361 
1362  /* Initialize the LO Buffer */
1363  AH->lo_buf_used = 0;
1364 
1365  pg_log_info("restoring large object with OID %u", oid);
1366 
1367  /* With an old archive we must do drop and create logic here */
1368  if (old_lo_style && drop)
1369  DropLOIfExists(AH, oid);
1370 
1371  if (AH->connection)
1372  {
1373  if (old_lo_style)
1374  {
1375  loOid = lo_create(AH->connection, oid);
1376  if (loOid == 0 || loOid != oid)
1377  pg_fatal("could not create large object %u: %s",
1378  oid, PQerrorMessage(AH->connection));
1379  }
1380  AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1381  if (AH->loFd == -1)
1382  pg_fatal("could not open large object %u: %s",
1383  oid, PQerrorMessage(AH->connection));
1384  }
1385  else
1386  {
1387  if (old_lo_style)
1388  ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1389  oid, INV_WRITE);
1390  else
1391  ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1392  oid, INV_WRITE);
1393  }
1394 
1395  AH->writingLO = true;
1396 }
1397 
1398 void
 1399 EndRestoreLO(ArchiveHandle *AH, Oid oid)
1400 {
1401  if (AH->lo_buf_used > 0)
1402  {
1403  /* Write remaining bytes from the LO buffer */
1404  dump_lo_buf(AH);
1405  }
1406 
1407  AH->writingLO = false;
1408 
1409  if (AH->connection)
1410  {
1411  lo_close(AH->connection, AH->loFd);
1412  AH->loFd = -1;
1413  }
1414  else
1415  {
1416  ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1417  }
1418 }
1419 
1420 /***********
1421  * Sorting and Reordering
1422  ***********/
1423 
1424 void
 1425 SortTocFromFile(Archive *AHX)
1426 {
1427  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1428  RestoreOptions *ropt = AH->public.ropt;
1429  FILE *fh;
1430  StringInfoData linebuf;
1431 
1432  /* Allocate space for the 'wanted' array, and init it */
1433  ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1434 
1435  /* Setup the file */
1436  fh = fopen(ropt->tocFile, PG_BINARY_R);
1437  if (!fh)
1438  pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1439 
1440  initStringInfo(&linebuf);
1441 
1442  while (pg_get_line_buf(fh, &linebuf))
1443  {
1444  char *cmnt;
1445  char *endptr;
1446  DumpId id;
1447  TocEntry *te;
1448 
1449  /* Truncate line at comment, if any */
1450  cmnt = strchr(linebuf.data, ';');
1451  if (cmnt != NULL)
1452  {
1453  cmnt[0] = '\0';
1454  linebuf.len = cmnt - linebuf.data;
1455  }
1456 
1457  /* Ignore if all blank */
1458  if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1459  continue;
1460 
1461  /* Get an ID, check it's valid and not already seen */
1462  id = strtol(linebuf.data, &endptr, 10);
1463  if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1464  ropt->idWanted[id - 1])
1465  {
1466  pg_log_warning("line ignored: %s", linebuf.data);
1467  continue;
1468  }
1469 
1470  /* Find TOC entry */
1471  te = getTocEntryByDumpId(AH, id);
1472  if (!te)
1473  pg_fatal("could not find entry for ID %d",
1474  id);
1475 
1476  /* Mark it wanted */
1477  ropt->idWanted[id - 1] = true;
1478 
1479  /*
1480  * Move each item to the end of the list as it is selected, so that
1481  * they are placed in the desired order. Any unwanted items will end
1482  * up at the front of the list, which may seem unintuitive but it's
1483  * what we need. In an ordinary serial restore that makes no
1484  * difference, but in a parallel restore we need to mark unrestored
1485  * items' dependencies as satisfied before we start examining
1486  * restorable items. Otherwise they could have surprising
1487  * side-effects on the order in which restorable items actually get
1488  * restored.
1489  */
1490  _moveBefore(AH->toc, te);
1491  }
1492 
1493  pg_free(linebuf.data);
1494 
1495  if (fclose(fh) != 0)
1496  pg_fatal("could not close TOC file: %m");
1497 }
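The list-file parsing done in SortTocFromFile() boils down to comment stripping plus strtol() validation. A standalone sketch of just that step (not PostgreSQL code; demo_* names invented for illustration):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parse one line of a pg_restore -L list file: returns the dump ID, or 0
 * if the line is a comment, blank, or carries no leading ID.  Mirrors the
 * comment truncation and strtol() checks in SortTocFromFile() above. */
static long
demo_parse_list_line(char *line)
{
	char	   *cmnt = strchr(line, ';');
	char	   *endptr;
	long		id;

	if (cmnt != NULL)
		*cmnt = '\0';			/* truncate line at comment */
	if (strspn(line, " \t\r\n") == strlen(line))
		return 0;				/* ignore if all blank */
	id = strtol(line, &endptr, 10);
	if (endptr == line || id <= 0)
		return 0;				/* no valid leading ID */
	return id;
}

int
main(void)
{
	char		line1[] = "123; TABLE DATA public foo";
	char		line2[] = "; just a comment";

	printf("%ld %ld\n", demo_parse_list_line(line1),
		   demo_parse_list_line(line2));	/* prints "123 0" */
	return 0;
}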
1498 
1499 /**********************
1500  * Convenience functions that look like standard IO functions
1501  * for writing data when in dump mode.
1502  **********************/
1503 
1504 /* Public */
1505 void
1506 archputs(const char *s, Archive *AH)
1507 {
1508  WriteData(AH, s, strlen(s));
1509 }
1510 
1511 /* Public */
1512 int
1513 archprintf(Archive *AH, const char *fmt,...)
1514 {
1515  int save_errno = errno;
1516  char *p;
1517  size_t len = 128; /* initial assumption about buffer size */
1518  size_t cnt;
1519 
1520  for (;;)
1521  {
1522  va_list args;
1523 
1524  /* Allocate work buffer. */
1525  p = (char *) pg_malloc(len);
1526 
1527  /* Try to format the data. */
1528  errno = save_errno;
1529  va_start(args, fmt);
1530  cnt = pvsnprintf(p, len, fmt, args);
1531  va_end(args);
1532 
1533  if (cnt < len)
1534  break; /* success */
1535 
1536  /* Release buffer and loop around to try again with larger len. */
1537  free(p);
1538  len = cnt;
1539  }
1540 
1541  WriteData(AH, p, cnt);
1542  free(p);
1543  return (int) cnt;
1544 }
1545 
1546 
1547 /*******************************
1548  * Stuff below here should be 'private' to the archiver routines
1549  *******************************/
1550 
1551 static void
 1552 SetOutput(ArchiveHandle *AH, const char *filename,
1553  const pg_compress_specification compression_spec)
1554 {
1555  CompressFileHandle *CFH;
1556  const char *mode;
1557  int fn = -1;
1558 
1559  if (filename)
1560  {
1561  if (strcmp(filename, "-") == 0)
1562  fn = fileno(stdout);
1563  }
1564  else if (AH->FH)
1565  fn = fileno(AH->FH);
1566  else if (AH->fSpec)
1567  {
1568  filename = AH->fSpec;
1569  }
1570  else
1571  fn = fileno(stdout);
1572 
1573  if (AH->mode == archModeAppend)
1574  mode = PG_BINARY_A;
1575  else
1576  mode = PG_BINARY_W;
1577 
1578  CFH = InitCompressFileHandle(compression_spec);
1579 
1580  if (!CFH->open_func(filename, fn, mode, CFH))
1581  {
1582  if (filename)
1583  pg_fatal("could not open output file \"%s\": %m", filename);
1584  else
1585  pg_fatal("could not open output file: %m");
1586  }
1587 
1588  AH->OF = CFH;
1589 }
1590 
1591 static CompressFileHandle *
 1592 SaveOutput(ArchiveHandle *AH)
1593 {
1594  return (CompressFileHandle *) AH->OF;
1595 }
1596 
1597 static void
 1598 RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1599 {
1600  errno = 0;
1601  if (!EndCompressFileHandle(AH->OF))
1602  pg_fatal("could not close output file: %m");
1603 
1604  AH->OF = savedOutput;
1605 }
1606 
1607 
1608 
1609 /*
1610  * Print formatted text to the output file (usually stdout).
1611  */
1612 int
1613 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1614 {
1615  int save_errno = errno;
1616  char *p;
1617  size_t len = 128; /* initial assumption about buffer size */
1618  size_t cnt;
1619 
1620  for (;;)
1621  {
1622  va_list args;
1623 
1624  /* Allocate work buffer. */
1625  p = (char *) pg_malloc(len);
1626 
1627  /* Try to format the data. */
1628  errno = save_errno;
1629  va_start(args, fmt);
1630  cnt = pvsnprintf(p, len, fmt, args);
1631  va_end(args);
1632 
1633  if (cnt < len)
1634  break; /* success */
1635 
1636  /* Release buffer and loop around to try again with larger len. */
1637  free(p);
1638  len = cnt;
1639  }
1640 
1641  ahwrite(p, 1, cnt, AH);
1642  free(p);
1643  return (int) cnt;
1644 }
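ahprintf() relies on an allocate-format-retry loop around pvsnprintf(). The same pattern is shown standalone below with plain vsnprintf() (illustrative only; demo_format is an invented name, and error handling is reduced to the minimum):

#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>

/* Format into a malloc'd buffer, growing and retrying until the result
 * fits -- the same loop shape ahprintf() uses. */
static char *
demo_format(const char *fmt, ...)
{
	size_t		len = 128;		/* initial guess, as in ahprintf() */

	for (;;)
	{
		char	   *p = malloc(len);
		va_list		args;
		int			cnt;

		va_start(args, fmt);
		cnt = vsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < 0)
		{
			free(p);
			return NULL;		/* formatting error */
		}
		if ((size_t) cnt < len)
			return p;			/* success: result fit in the buffer */

		/* too small: release and retry with the exact size needed */
		free(p);
		len = (size_t) cnt + 1;
	}
}

int
main(void)
{
	char	   *s = demo_format("-- %s version %d\n", "archive", 16);

	fputs(s, stdout);
	free(s);
	return 0;
}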
1645 
1646 /*
1647  * Single place for logic which says 'We are restoring to a direct DB connection'.
1648  */
1649 static int
 1650 RestoringToDB(ArchiveHandle *AH)
1651 {
1652  RestoreOptions *ropt = AH->public.ropt;
1653 
1654  return (ropt && ropt->useDB && AH->connection);
1655 }
1656 
1657 /*
1658  * Dump the current contents of the LO data buffer while writing a LO
1659  */
1660 static void
 1661 dump_lo_buf(ArchiveHandle *AH)
1662 {
1663  if (AH->connection)
1664  {
1665  int res;
1666 
1667  res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1668  pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1669  "wrote %zu bytes of large object data (result = %d)",
1670  AH->lo_buf_used),
1671  AH->lo_buf_used, res);
1672  /* We assume there are no short writes, only errors */
1673  if (res != AH->lo_buf_used)
1674  warn_or_exit_horribly(AH, "could not write to large object: %s",
1675  PQerrorMessage(AH->connection));
1676  }
1677  else
1678  {
 1679  PQExpBuffer buf = createPQExpBuffer();
1680 
1682  (const unsigned char *) AH->lo_buf,
1683  AH->lo_buf_used,
1684  AH);
1685 
1686  /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1687  AH->writingLO = false;
1688  ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1689  AH->writingLO = true;
1690 
 1691  destroyPQExpBuffer(buf);
1692  }
1693  AH->lo_buf_used = 0;
1694 }
1695 
1696 
1697 /*
1698  * Write buffer to the output file (usually stdout). This is used for
1699  * outputting 'restore' scripts etc. It is even possible for an archive
1700  * format to create a custom output routine to 'fake' a restore if it
1701  * wants to generate a script (see TAR output).
1702  */
1703 void
1704 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1705 {
1706  int bytes_written = 0;
1707 
1708  if (AH->writingLO)
1709  {
1710  size_t remaining = size * nmemb;
1711 
1712  while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1713  {
1714  size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1715 
1716  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1717  ptr = (const void *) ((const char *) ptr + avail);
1718  remaining -= avail;
1719  AH->lo_buf_used += avail;
1720  dump_lo_buf(AH);
1721  }
1722 
1723  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1724  AH->lo_buf_used += remaining;
1725 
1726  bytes_written = size * nmemb;
1727  }
1728  else if (AH->CustomOutPtr)
1729  bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1730 
1731  /*
1732  * If we're doing a restore, and it's direct to DB, and we're connected
1733  * then send it to the DB.
1734  */
1735  else if (RestoringToDB(AH))
1736  bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1737  else
1738  {
1739  CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1740 
1741  if (CFH->write_func(ptr, size * nmemb, CFH))
1742  bytes_written = size * nmemb;
1743  }
1744 
1745  if (bytes_written != size * nmemb)
 1746  WRITE_ERROR_EXIT;
1747 }
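The large-object path of ahwrite() is a fixed-buffer copy-and-flush loop. A standalone sketch of that loop with an ordinary FILE as the sink (not PostgreSQL code; demo_* names invented, buffer size shrunk so the flushes are visible):

#include <stdio.h>
#include <string.h>

#define DEMO_BUF_SIZE 8

static char demo_buf[DEMO_BUF_SIZE];
static size_t demo_buf_used = 0;

static void
demo_flush(void)
{
	fwrite(demo_buf, 1, demo_buf_used, stdout);
	demo_buf_used = 0;
}

/* Append data, flushing whenever the fixed-size buffer would overflow --
 * the same split-copy-flush loop ahwrite() applies to AH->lo_buf. */
static void
demo_buffered_write(const char *ptr, size_t remaining)
{
	while (demo_buf_used + remaining > DEMO_BUF_SIZE)
	{
		size_t		avail = DEMO_BUF_SIZE - demo_buf_used;

		memcpy(demo_buf + demo_buf_used, ptr, avail);
		ptr += avail;
		remaining -= avail;
		demo_buf_used += avail;
		demo_flush();
	}
	memcpy(demo_buf + demo_buf_used, ptr, remaining);
	demo_buf_used += remaining;
}

int
main(void)
{
	demo_buffered_write("hello, large object", 19);
	demo_flush();
	return 0;
}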
1748 
1749 /* on some error, we may decide to go on... */
1750 void
 1751 warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1752 {
1753  va_list ap;
1754 
1755  switch (AH->stage)
1756  {
1757 
1758  case STAGE_NONE:
1759  /* Do nothing special */
1760  break;
1761 
1762  case STAGE_INITIALIZING:
1763  if (AH->stage != AH->lastErrorStage)
1764  pg_log_info("while INITIALIZING:");
1765  break;
1766 
1767  case STAGE_PROCESSING:
1768  if (AH->stage != AH->lastErrorStage)
1769  pg_log_info("while PROCESSING TOC:");
1770  break;
1771 
1772  case STAGE_FINALIZING:
1773  if (AH->stage != AH->lastErrorStage)
1774  pg_log_info("while FINALIZING:");
1775  break;
1776  }
1777  if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1778  {
1779  pg_log_info("from TOC entry %d; %u %u %s %s %s",
1780  AH->currentTE->dumpId,
 1781  AH->currentTE->catalogId.tableoid,
1782  AH->currentTE->catalogId.oid,
1783  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1784  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1785  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1786  }
1787  AH->lastErrorStage = AH->stage;
1788  AH->lastErrorTE = AH->currentTE;
1789 
1790  va_start(ap, fmt);
1792  va_end(ap);
1793 
1794  if (AH->public.exit_on_error)
1795  exit_nicely(1);
1796  else
1797  AH->public.n_errors++;
1798 }
1799 
1800 #ifdef NOT_USED
1801 
1802 static void
1803 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1804 {
1805  /* Unlink te from list */
1806  te->prev->next = te->next;
1807  te->next->prev = te->prev;
1808 
1809  /* and insert it after "pos" */
1810  te->prev = pos;
1811  te->next = pos->next;
1812  pos->next->prev = te;
1813  pos->next = te;
1814 }
1815 #endif
1816 
1817 static void
 1818 _moveBefore(TocEntry *pos, TocEntry *te)
1819 {
1820  /* Unlink te from list */
1821  te->prev->next = te->next;
1822  te->next->prev = te->prev;
1823 
1824  /* and insert it before "pos" */
1825  te->prev = pos->prev;
1826  te->next = pos;
1827  pos->prev->next = te;
1828  pos->prev = te;
1829 }
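_moveBefore() operates on the circular, header-anchored doubly-linked TOC list. A standalone sketch of the same relinking on a toy node type (not PostgreSQL code; demo_* names invented for illustration):

#include <stdio.h>

typedef struct demo_node
{
	int			id;
	struct demo_node *prev;
	struct demo_node *next;
} demo_node;

/* Unlink te and re-insert it just before pos, as _moveBefore() does for
 * TOC entries in the circular, header-anchored TOC list. */
static void
demo_move_before(demo_node *pos, demo_node *te)
{
	te->prev->next = te->next;
	te->next->prev = te->prev;

	te->prev = pos->prev;
	te->next = pos;
	pos->prev->next = te;
	pos->prev = te;
}

int
main(void)
{
	demo_node	head = {0, &head, &head};	/* header node of an empty list */
	demo_node	a = {1, NULL, NULL};
	demo_node	b = {2, NULL, NULL};
	demo_node  *p;

	/* append a, then b, by linking each in just before the header */
	a.prev = head.prev; a.next = &head; head.prev->next = &a; head.prev = &a;
	b.prev = head.prev; b.next = &head; head.prev->next = &b; head.prev = &b;

	demo_move_before(&a, &b);	/* move b so it precedes a */

	for (p = head.next; p != &head; p = p->next)
		printf("%d ", p->id);	/* prints: 2 1 */
	printf("\n");
	return 0;
}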
1830 
1831 /*
1832  * Build index arrays for the TOC list
1833  *
1834  * This should be invoked only after we have created or read in all the TOC
1835  * items.
1836  *
1837  * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1838  * array entries run only up to maxDumpId. We might see dependency dump IDs
1839  * beyond that (if the dump was partial); so always check the array bound
1840  * before trying to touch an array entry.
1841  */
1842 static void
 1843 buildTocEntryArrays(ArchiveHandle *AH)
1844 {
1845  DumpId maxDumpId = AH->maxDumpId;
1846  TocEntry *te;
1847 
1848  AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1849  AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1850 
1851  for (te = AH->toc->next; te != AH->toc; te = te->next)
1852  {
1853  /* this check is purely paranoia, maxDumpId should be correct */
1854  if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1855  pg_fatal("bad dumpId");
1856 
1857  /* tocsByDumpId indexes all TOCs by their dump ID */
1858  AH->tocsByDumpId[te->dumpId] = te;
1859 
1860  /*
1861  * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1862  * TOC entry that has a DATA item. We compute this by reversing the
1863  * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1864  * just one dependency and it is the TABLE item.
1865  */
1866  if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1867  {
1868  DumpId tableId = te->dependencies[0];
1869 
1870  /*
1871  * The TABLE item might not have been in the archive, if this was
1872  * a data-only dump; but its dump ID should be less than its data
1873  * item's dump ID, so there should be a place for it in the array.
1874  */
1875  if (tableId <= 0 || tableId > maxDumpId)
1876  pg_fatal("bad table dumpId for TABLE DATA item");
1877 
1878  AH->tableDataId[tableId] = te->dumpId;
1879  }
1880  }
1881 }
1882 
1883 TocEntry *
 1884 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1885 {
1886  /* build index arrays if we didn't already */
1887  if (AH->tocsByDumpId == NULL)
1888  buildTocEntryArrays(AH);
1889 
1890  if (id > 0 && id <= AH->maxDumpId)
1891  return AH->tocsByDumpId[id];
1892 
1893  return NULL;
1894 }
1895 
1896 int
 1897 TocIDRequired(ArchiveHandle *AH, DumpId id)
1898 {
1899  TocEntry *te = getTocEntryByDumpId(AH, id);
1900 
1901  if (!te)
1902  return 0;
1903 
1904  return te->reqs;
1905 }
1906 
1907 size_t
1908 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1909 {
1910  int off;
1911 
1912  /* Save the flag */
1913  AH->WriteBytePtr(AH, wasSet);
1914 
1915  /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1916  for (off = 0; off < sizeof(pgoff_t); off++)
1917  {
1918  AH->WriteBytePtr(AH, o & 0xFF);
1919  o >>= 8;
1920  }
1921  return sizeof(pgoff_t) + 1;
1922 }
1923 
1924 int
 1925 ReadOffset(ArchiveHandle *AH, pgoff_t *o)
1926 {
1927  int i;
1928  int off;
1929  int offsetFlg;
1930 
1931  /* Initialize to zero */
1932  *o = 0;
1933 
1934  /* Check for old version */
1935  if (AH->version < K_VERS_1_7)
1936  {
1937  /* Prior versions wrote offsets using WriteInt */
1938  i = ReadInt(AH);
1939  /* -1 means not set */
1940  if (i < 0)
1941  return K_OFFSET_POS_NOT_SET;
1942  else if (i == 0)
1943  return K_OFFSET_NO_DATA;
1944 
1945  /* Cast to pgoff_t because it was written as an int. */
1946  *o = (pgoff_t) i;
1947  return K_OFFSET_POS_SET;
1948  }
1949 
1950  /*
1951  * Read the flag indicating the state of the data pointer. Check if valid
1952  * and die if not.
1953  *
1954  * This used to be handled by a negative or zero pointer, now we use an
1955  * extra byte specifically for the state.
1956  */
1957  offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1958 
1959  switch (offsetFlg)
1960  {
1961  case K_OFFSET_POS_NOT_SET:
1962  case K_OFFSET_NO_DATA:
1963  case K_OFFSET_POS_SET:
1964 
1965  break;
1966 
1967  default:
1968  pg_fatal("unexpected data offset flag %d", offsetFlg);
1969  }
1970 
1971  /*
1972  * Read the bytes
1973  */
1974  for (off = 0; off < AH->offSize; off++)
1975  {
1976  if (off < sizeof(pgoff_t))
1977  *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1978  else
1979  {
1980  if (AH->ReadBytePtr(AH) != 0)
1981  pg_fatal("file offset in dump file is too large");
1982  }
1983  }
1984 
1985  return offsetFlg;
1986 }
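WriteOffset()/ReadOffset() store a one-byte state flag followed by the offset least-significant byte first, so archives do not depend on host endianness. A standalone round-trip sketch of that byte layout (not PostgreSQL code; a fixed 8-byte width is assumed and demo_* names are invented):

#include <stdint.h>
#include <stdio.h>

/* Encode a 64-bit offset least-significant byte first, preceded by a
 * one-byte flag, mirroring the on-disk layout used above. */
static void
demo_write_offset(unsigned char *out, uint64_t o, int flag)
{
	int			i;

	out[0] = (unsigned char) flag;
	for (i = 0; i < 8; i++)
	{
		out[1 + i] = o & 0xFF;
		o >>= 8;
	}
}

static uint64_t
demo_read_offset(const unsigned char *in, int *flag)
{
	uint64_t	o = 0;
	int			i;

	*flag = in[0];
	for (i = 0; i < 8; i++)
		o |= ((uint64_t) in[1 + i]) << (i * 8);
	return o;
}

int
main(void)
{
	unsigned char buf[9];
	int			flag;
	uint64_t	o;

	demo_write_offset(buf, 0x123456789AULL, 3);
	o = demo_read_offset(buf, &flag);
	printf("flag=%d offset=%llx\n", flag, (unsigned long long) o);
	return 0;
}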
1987 
1988 size_t
 1989 WriteInt(ArchiveHandle *AH, int i)
1990 {
1991  int b;
1992 
1993  /*
1994  * This is a bit yucky, but I don't want to make the binary format very
1995  * dependent on representation, and not knowing much about it, I write out
1996  * a sign byte. If you change this, don't forget to change the file
1997  * version #, and modify ReadInt to read the new format AS WELL AS the old
1998  * formats.
1999  */
2000 
2001  /* SIGN byte */
2002  if (i < 0)
2003  {
2004  AH->WriteBytePtr(AH, 1);
2005  i = -i;
2006  }
2007  else
2008  AH->WriteBytePtr(AH, 0);
2009 
2010  for (b = 0; b < AH->intSize; b++)
2011  {
2012  AH->WriteBytePtr(AH, i & 0xFF);
2013  i >>= 8;
2014  }
2015 
2016  return AH->intSize + 1;
2017 }
2018 
2019 int
 2020 ReadInt(ArchiveHandle *AH)
2021 {
2022  int res = 0;
2023  int bv,
2024  b;
2025  int sign = 0; /* Default positive */
2026  int bitShift = 0;
2027 
2028  if (AH->version > K_VERS_1_0)
2029  /* Read a sign byte */
2030  sign = AH->ReadBytePtr(AH);
2031 
2032  for (b = 0; b < AH->intSize; b++)
2033  {
2034  bv = AH->ReadBytePtr(AH) & 0xFF;
2035  if (bv != 0)
2036  res = res + (bv << bitShift);
2037  bitShift += 8;
2038  }
2039 
2040  if (sign)
2041  res = -res;
2042 
2043  return res;
2044 }
2045 
2046 size_t
2047 WriteStr(ArchiveHandle *AH, const char *c)
2048 {
2049  size_t res;
2050 
2051  if (c)
2052  {
2053  int len = strlen(c);
2054 
2055  res = WriteInt(AH, len);
2056  AH->WriteBufPtr(AH, c, len);
2057  res += len;
2058  }
2059  else
2060  res = WriteInt(AH, -1);
2061 
2062  return res;
2063 }
2064 
2065 char *
 2066 ReadStr(ArchiveHandle *AH)
2067 {
2068  char *buf;
2069  int l;
2070 
2071  l = ReadInt(AH);
2072  if (l < 0)
2073  buf = NULL;
2074  else
2075  {
2076  buf = (char *) pg_malloc(l + 1);
2077  AH->ReadBufPtr(AH, (void *) buf, l);
2078 
2079  buf[l] = '\0';
2080  }
2081 
2082  return buf;
2083 }
2084 
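Strings are stored by WriteStr()/ReadStr() as a length (in the integer format above) followed by the raw bytes with no terminating NUL; a NULL pointer is written as length -1, which is also how the TOC dependency list is terminated. A small sketch, not from the PostgreSQL sources, of the decode-side convention:

#include <stdlib.h>
#include <string.h>

/* Illustrative only: rebuild a C string from a counted, non-NUL-terminated payload. */
static char *
copy_counted_string(const char *payload, int len)
{
	char	   *buf;

	if (len < 0)
		return NULL;			/* the "NULL string" marker (length -1) */
	buf = malloc((size_t) len + 1);
	if (buf == NULL)
		return NULL;
	memcpy(buf, payload, (size_t) len);
	buf[len] = '\0';			/* ReadStr adds the NUL back the same way */
	return buf;
}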
2085 static bool
2086 _fileExistsInDirectory(const char *dir, const char *filename)
2087 {
2088  struct stat st;
2089  char buf[MAXPGPATH];
2090 
2091  if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2092  pg_fatal("directory name too long: \"%s\"", dir);
2093 
2094  return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2095 }
2096 
2097 static int
2098 _discoverArchiveFormat(ArchiveHandle *AH)
2099 {
2100  FILE *fh;
2101  char sig[6]; /* More than enough */
2102  size_t cnt;
2103  int wantClose = 0;
2104 
2105  pg_log_debug("attempting to ascertain archive format");
2106 
2107  free(AH->lookahead);
2108 
2109  AH->readHeader = 0;
2110  AH->lookaheadSize = 512;
2111  AH->lookahead = pg_malloc0(512);
2112  AH->lookaheadLen = 0;
2113  AH->lookaheadPos = 0;
2114 
2115  if (AH->fSpec)
2116  {
2117  struct stat st;
2118 
2119  wantClose = 1;
2120 
2121  /*
2122  * Check if the specified archive is a directory. If so, check if
2123  * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
2124  */
2125  if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2126  {
2127  AH->format = archDirectory;
2128  if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2129  return AH->format;
2130 #ifdef HAVE_LIBZ
2131  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2132  return AH->format;
2133 #endif
2134 #ifdef USE_LZ4
2135  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2136  return AH->format;
2137 #endif
2138  pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2139  AH->fSpec);
2140  fh = NULL; /* keep compiler quiet */
2141  }
2142  else
2143  {
2144  fh = fopen(AH->fSpec, PG_BINARY_R);
2145  if (!fh)
2146  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2147  }
2148  }
2149  else
2150  {
2151  fh = stdin;
2152  if (!fh)
2153  pg_fatal("could not open input file: %m");
2154  }
2155 
2156  if ((cnt = fread(sig, 1, 5, fh)) != 5)
2157  {
2158  if (ferror(fh))
2159  pg_fatal("could not read input file: %m");
2160  else
2161  pg_fatal("input file is too short (read %lu, expected 5)",
2162  (unsigned long) cnt);
2163  }
2164 
2165  /* Save it, just in case we need it later */
2166  memcpy(&AH->lookahead[0], sig, 5);
2167  AH->lookaheadLen = 5;
2168 
2169  if (strncmp(sig, "PGDMP", 5) == 0)
2170  {
2171  /* It's custom format, stop here */
2172  AH->format = archCustom;
2173  AH->readHeader = 1;
2174  }
2175  else
2176  {
2177  /*
2178  * *Maybe* we have a tar archive format file or a text dump ... So,
2179  * read first 512 byte header...
2180  */
2181  cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2182  /* read failure is checked below */
2183  AH->lookaheadLen += cnt;
2184 
2185  if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2186  (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2187  strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2188  {
2189  /*
2190  * looks like it's probably a text format dump. so suggest they
2191  * try psql
2192  */
2193  pg_fatal("input file appears to be a text format dump. Please use psql.");
2194  }
2195 
2196  if (AH->lookaheadLen != 512)
2197  {
2198  if (feof(fh))
2199  pg_fatal("input file does not appear to be a valid archive (too short?)");
2200  else
2201  READ_ERROR_EXIT(fh);
2202  }
2203 
2204  if (!isValidTarHeader(AH->lookahead))
2205  pg_fatal("input file does not appear to be a valid archive");
2206 
2207  AH->format = archTar;
2208  }
2209 
2210  /* Close the file if we opened it */
2211  if (wantClose)
2212  {
2213  if (fclose(fh) != 0)
2214  pg_fatal("could not close input file: %m");
2215  /* Forget lookahead, since we'll re-read header after re-opening */
2216  AH->readHeader = 0;
2217  AH->lookaheadLen = 0;
2218  }
2219 
2220  return AH->format;
2221 }
2222 
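_discoverArchiveFormat() effectively applies three tests in order: a directory containing toc.dat (possibly compressed) is the directory format, a file starting with the magic bytes PGDMP is the custom format, and anything else is either a plain-text dump (rejected with a hint to use psql) or, if the first 512 bytes form a valid tar header, the tar format. A rough standalone sniffer along the same lines, not part of this file and deliberately skipping the toc.dat and tar-header checks:

#include <stdio.h>
#include <string.h>

/* Illustrative only: classify a dump by its first five bytes. */
static const char *
sniff_dump_format(FILE *fh)
{
	char		sig[5];

	if (fread(sig, 1, sizeof(sig), fh) != sizeof(sig))
		return "unknown (file too short)";
	if (memcmp(sig, "PGDMP", 5) == 0)
		return "custom format";
	if (memcmp(sig, "--\n--", 5) == 0)
		return "probably a plain-text dump (feed it to psql)";
	return "possibly tar format (validate the 512-byte header)";
}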
2223 
2224 /*
2225  * Allocate an archive handle
2226  */
2227 static ArchiveHandle *
2228 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2229  const pg_compress_specification compression_spec,
2230  bool dosync, ArchiveMode mode,
2231  SetupWorkerPtrType setupWorkerPtr)
2232 {
2233  ArchiveHandle *AH;
2234  CompressFileHandle *CFH;
2235  pg_compress_specification out_compress_spec = {0};
2236 
2237  pg_log_debug("allocating AH for %s, format %d",
2238  FileSpec ? FileSpec : "(stdio)", fmt);
2239 
2240  AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2241 
2242  AH->version = K_VERS_SELF;
2243 
2244  /* initialize for backwards compatible string processing */
2245  AH->public.encoding = 0; /* PG_SQL_ASCII */
2246  AH->public.std_strings = false;
2247 
2248  /* sql error handling */
2249  AH->public.exit_on_error = true;
2250  AH->public.n_errors = 0;
2251 
2252  AH->archiveDumpVersion = PG_VERSION;
2253 
2254  AH->createDate = time(NULL);
2255 
2256  AH->intSize = sizeof(int);
2257  AH->offSize = sizeof(pgoff_t);
2258  if (FileSpec)
2259  {
2260  AH->fSpec = pg_strdup(FileSpec);
2261 
2262  /*
2263  * Not used; maybe later....
2264  *
2265  * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2266  * i--) if (AH->workDir[i-1] == '/')
2267  */
2268  }
2269  else
2270  AH->fSpec = NULL;
2271 
2272  AH->currUser = NULL; /* unknown */
2273  AH->currSchema = NULL; /* ditto */
2274  AH->currTablespace = NULL; /* ditto */
2275  AH->currTableAm = NULL; /* ditto */
2276 
2277  AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2278 
2279  AH->toc->next = AH->toc;
2280  AH->toc->prev = AH->toc;
2281 
2282  AH->mode = mode;
2283  AH->compression_spec = compression_spec;
2284  AH->dosync = dosync;
2285 
2286  memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2287 
2288  /* Open stdout with no compression for AH output handle */
2289  out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2290  CFH = InitCompressFileHandle(out_compress_spec);
2291  if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2292  pg_fatal("could not open stdout for appending: %m");
2293  AH->OF = CFH;
2294 
2295  /*
2296  * On Windows, we need to use binary mode to read/write non-text files,
2297  * which include all archive formats as well as compressed plain text.
2298  * Force stdin/stdout into binary mode if that is what we are using.
2299  */
2300 #ifdef WIN32
2301  if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2302  (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2303  {
2304  if (mode == archModeWrite)
2305  _setmode(fileno(stdout), O_BINARY);
2306  else
2307  _setmode(fileno(stdin), O_BINARY);
2308  }
2309 #endif
2310 
2311  AH->SetupWorkerPtr = setupWorkerPtr;
2312 
2313  if (fmt == archUnknown)
2314  AH->format = _discoverArchiveFormat(AH);
2315  else
2316  AH->format = fmt;
2317 
2318  switch (AH->format)
2319  {
2320  case archCustom:
2321  InitArchiveFmt_Custom(AH);
2322  break;
2323 
2324  case archNull:
2325  InitArchiveFmt_Null(AH);
2326  break;
2327 
2328  case archDirectory:
2329  InitArchiveFmt_Directory(AH);
2330  break;
2331 
2332  case archTar:
2333  InitArchiveFmt_Tar(AH);
2334  break;
2335 
2336  default:
2337  pg_fatal("unrecognized file format \"%d\"", fmt);
2338  }
2339 
2340  return AH;
2341 }
2342 
2343 /*
2344  * Write out all data (tables & LOs)
2345  */
2346 void
2347 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2348 {
2349  TocEntry *te;
2350 
2351  if (pstate && pstate->numWorkers > 1)
2352  {
2353  /*
2354  * In parallel mode, this code runs in the leader process. We
2355  * construct an array of candidate TEs, then sort it into decreasing
2356  * size order, then dispatch each TE to a data-transfer worker. By
2357  * dumping larger tables first, we avoid getting into a situation
2358  * where we're down to one job and it's big, losing parallelism.
2359  */
2360  TocEntry **tes;
2361  int ntes;
2362 
2363  tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2364  ntes = 0;
2365  for (te = AH->toc->next; te != AH->toc; te = te->next)
2366  {
2367  /* Consider only TEs with dataDumper functions ... */
2368  if (!te->dataDumper)
2369  continue;
2370  /* ... and ignore ones not enabled for dump */
2371  if ((te->reqs & REQ_DATA) == 0)
2372  continue;
2373 
2374  tes[ntes++] = te;
2375  }
2376 
2377  if (ntes > 1)
2378  qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
2379 
2380  for (int i = 0; i < ntes; i++)
2381  DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2382  mark_dump_job_done, NULL);
2383 
2384  pg_free(tes);
2385 
2386  /* Now wait for workers to finish. */
2387  WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2388  }
2389  else
2390  {
2391  /* Non-parallel mode: just dump all candidate TEs sequentially. */
2392  for (te = AH->toc->next; te != AH->toc; te = te->next)
2393  {
2394  /* Must have same filter conditions as above */
2395  if (!te->dataDumper)
2396  continue;
2397  if ((te->reqs & REQ_DATA) == 0)
2398  continue;
2399 
2400  WriteDataChunksForTocEntry(AH, te);
2401  }
2402  }
2403 }
2404 
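The qsort() call above relies on TocEntrySizeCompare (defined elsewhere in this file) to bring the largest dataLength entries to the front, so the biggest tables are dispatched to workers first. A comparator of the same general shape, shown only as an illustration and not as the authoritative implementation:

/* Illustrative only: order TocEntry pointers by decreasing dataLength. */
static int
compare_te_by_size_desc(const void *p1, const void *p2)
{
	const TocEntry *te1 = *(TocEntry *const *) p1;
	const TocEntry *te2 = *(TocEntry *const *) p2;

	if (te1->dataLength > te2->dataLength)
		return -1;
	if (te1->dataLength < te2->dataLength)
		return 1;
	return 0;
}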
2405 
2406 /*
2407  * Callback function that's invoked in the leader process after a step has
2408  * been parallel dumped.
2409  *
2410  * We don't need to do anything except check for worker failure.
2411  */
2412 static void
2413 mark_dump_job_done(ArchiveHandle *AH,
2414  TocEntry *te,
2415  int status,
2416  void *callback_data)
2417 {
2418  pg_log_info("finished item %d %s %s",
2419  te->dumpId, te->desc, te->tag);
2420 
2421  if (status != 0)
2422  pg_fatal("worker process failed: exit code %d",
2423  status);
2424 }
2425 
2426 
2427 void
2428 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2429 {
2430  StartDataPtrType startPtr;
2431  EndDataPtrType endPtr;
2432 
2433  AH->currToc = te;
2434 
2435  if (strcmp(te->desc, "BLOBS") == 0)
2436  {
2437  startPtr = AH->StartLOsPtr;
2438  endPtr = AH->EndLOsPtr;
2439  }
2440  else
2441  {
2442  startPtr = AH->StartDataPtr;
2443  endPtr = AH->EndDataPtr;
2444  }
2445 
2446  if (startPtr != NULL)
2447  (*startPtr) (AH, te);
2448 
2449  /*
2450  * The user-provided DataDumper routine needs to call AH->WriteData
2451  */
2452  te->dataDumper((Archive *) AH, te->dataDumperArg);
2453 
2454  if (endPtr != NULL)
2455  (*endPtr) (AH, te);
2456 
2457  AH->currToc = NULL;
2458 }
2459 
2460 void
2461 WriteToc(ArchiveHandle *AH)
2462 {
2463  TocEntry *te;
2464  char workbuf[32];
2465  int tocCount;
2466  int i;
2467 
2468  /* count entries that will actually be dumped */
2469  tocCount = 0;
2470  for (te = AH->toc->next; te != AH->toc; te = te->next)
2471  {
2472  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2473  tocCount++;
2474  }
2475 
2476  /* printf("%d TOC Entries to save\n", tocCount); */
2477 
2478  WriteInt(AH, tocCount);
2479 
2480  for (te = AH->toc->next; te != AH->toc; te = te->next)
2481  {
2482  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2483  continue;
2484 
2485  WriteInt(AH, te->dumpId);
2486  WriteInt(AH, te->dataDumper ? 1 : 0);
2487 
2488  /* OID is recorded as a string for historical reasons */
2489  sprintf(workbuf, "%u", te->catalogId.tableoid);
2490  WriteStr(AH, workbuf);
2491  sprintf(workbuf, "%u", te->catalogId.oid);
2492  WriteStr(AH, workbuf);
2493 
2494  WriteStr(AH, te->tag);
2495  WriteStr(AH, te->desc);
2496  WriteInt(AH, te->section);
2497  WriteStr(AH, te->defn);
2498  WriteStr(AH, te->dropStmt);
2499  WriteStr(AH, te->copyStmt);
2500  WriteStr(AH, te->namespace);
2501  WriteStr(AH, te->tablespace);
2502  WriteStr(AH, te->tableam);
2503  WriteStr(AH, te->owner);
2504  WriteStr(AH, "false");
2505 
2506  /* Dump list of dependencies */
2507  for (i = 0; i < te->nDeps; i++)
2508  {
2509  sprintf(workbuf, "%d", te->dependencies[i]);
2510  WriteStr(AH, workbuf);
2511  }
2512  WriteStr(AH, NULL); /* Terminate List */
2513 
2514  if (AH->WriteExtraTocPtr)
2515  AH->WriteExtraTocPtr(AH, te);
2516  }
2517 }
2518 
2519 void
2520 ReadToc(ArchiveHandle *AH)
2521 {
2522  int i;
2523  char *tmp;
2524  DumpId *deps;
2525  int depIdx;
2526  int depSize;
2527  TocEntry *te;
2528  bool is_supported;
2529 
2530  AH->tocCount = ReadInt(AH);
2531  AH->maxDumpId = 0;
2532 
2533  for (i = 0; i < AH->tocCount; i++)
2534  {
2535  te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2536  te->dumpId = ReadInt(AH);
2537 
2538  if (te->dumpId > AH->maxDumpId)
2539  AH->maxDumpId = te->dumpId;
2540 
2541  /* Sanity check */
2542  if (te->dumpId <= 0)
2543  pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2544  te->dumpId);
2545 
2546  te->hadDumper = ReadInt(AH);
2547 
2548  if (AH->version >= K_VERS_1_8)
2549  {
2550  tmp = ReadStr(AH);
2551  sscanf(tmp, "%u", &te->catalogId.tableoid);
2552  free(tmp);
2553  }
2554  else
2555  te->catalogId.tableoid = InvalidOid;
2556  tmp = ReadStr(AH);
2557  sscanf(tmp, "%u", &te->catalogId.oid);
2558  free(tmp);
2559 
2560  te->tag = ReadStr(AH);
2561  te->desc = ReadStr(AH);
2562 
2563  if (AH->version >= K_VERS_1_11)
2564  {
2565  te->section = ReadInt(AH);
2566  }
2567  else
2568  {
2569  /*
2570  * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2571  * the entries into sections. This list need not cover entry
2572  * types added later than 8.4.
2573  */
2574  if (strcmp(te->desc, "COMMENT") == 0 ||
2575  strcmp(te->desc, "ACL") == 0 ||
2576  strcmp(te->desc, "ACL LANGUAGE") == 0)
2577  te->section = SECTION_NONE;
2578  else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2579  strcmp(te->desc, "BLOBS") == 0 ||
2580  strcmp(te->desc, "BLOB COMMENTS") == 0)
2581  te->section = SECTION_DATA;
2582  else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2583  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2584  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2585  strcmp(te->desc, "INDEX") == 0 ||
2586  strcmp(te->desc, "RULE") == 0 ||
2587  strcmp(te->desc, "TRIGGER") == 0)
2588  te->section = SECTION_POST_DATA;
2589  else
2590  te->section = SECTION_PRE_DATA;
2591  }
2592 
2593  te->defn = ReadStr(AH);
2594  te->dropStmt = ReadStr(AH);
2595 
2596  if (AH->version >= K_VERS_1_3)
2597  te->copyStmt = ReadStr(AH);
2598 
2599  if (AH->version >= K_VERS_1_6)
2600  te->namespace = ReadStr(AH);
2601 
2602  if (AH->version >= K_VERS_1_10)
2603  te->tablespace = ReadStr(AH);
2604 
2605  if (AH->version >= K_VERS_1_14)
2606  te->tableam = ReadStr(AH);
2607 
2608  te->owner = ReadStr(AH);
2609  is_supported = true;
2610  if (AH->version < K_VERS_1_9)
2611  is_supported = false;
2612  else
2613  {
2614  tmp = ReadStr(AH);
2615 
2616  if (strcmp(tmp, "true") == 0)
2617  is_supported = false;
2618 
2619  free(tmp);
2620  }
2621 
2622  if (!is_supported)
2623  pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2624 
2625  /* Read TOC entry dependencies */
2626  if (AH->version >= K_VERS_1_5)
2627  {
2628  depSize = 100;
2629  deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2630  depIdx = 0;
2631  for (;;)
2632  {
2633  tmp = ReadStr(AH);
2634  if (!tmp)
2635  break; /* end of list */
2636  if (depIdx >= depSize)
2637  {
2638  depSize *= 2;
2639  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2640  }
2641  sscanf(tmp, "%d", &deps[depIdx]);
2642  free(tmp);
2643  depIdx++;
2644  }
2645 
2646  if (depIdx > 0) /* We have a non-null entry */
2647  {
2648  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2649  te->dependencies = deps;
2650  te->nDeps = depIdx;
2651  }
2652  else
2653  {
2654  free(deps);
2655  te->dependencies = NULL;
2656  te->nDeps = 0;
2657  }
2658  }
2659  else
2660  {
2661  te->dependencies = NULL;
2662  te->nDeps = 0;
2663  }
2664  te->dataLength = 0;
2665 
2666  if (AH->ReadExtraTocPtr)
2667  AH->ReadExtraTocPtr(AH, te);
2668 
2669  pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2670  i, te->dumpId, te->desc, te->tag);
2671 
2672  /* link completed entry into TOC circular list */
2673  te->prev = AH->toc->prev;
2674  AH->toc->prev->next = te;
2675  AH->toc->prev = te;
2676  te->next = AH->toc;
2677 
2678  /* special processing immediately upon read for some items */
2679  if (strcmp(te->desc, "ENCODING") == 0)
2680  processEncodingEntry(AH, te);
2681  else if (strcmp(te->desc, "STDSTRINGS") == 0)
2682  processStdStringsEntry(AH, te);
2683  else if (strcmp(te->desc, "SEARCHPATH") == 0)
2684  processSearchPathEntry(AH, te);
2685  }
2686 }
2687 
2688 static void
2689 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2690 {
2691  /* te->defn should have the form SET client_encoding = 'foo'; */
2692  char *defn = pg_strdup(te->defn);
2693  char *ptr1;
2694  char *ptr2 = NULL;
2695  int encoding;
2696 
2697  ptr1 = strchr(defn, '\'');
2698  if (ptr1)
2699  ptr2 = strchr(++ptr1, '\'');
2700  if (ptr2)
2701  {
2702  *ptr2 = '\0';
2703  encoding = pg_char_to_encoding(ptr1);
2704  if (encoding < 0)
2705  pg_fatal("unrecognized encoding \"%s\"",
2706  ptr1);
2707  AH->public.encoding = encoding;
2708  }
2709  else
2710  pg_fatal("invalid ENCODING item: %s",
2711  te->defn);
2712 
2713  free(defn);
2714 }
2715 
2716 static void
2717 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2718 {
2719  /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2720  char *ptr1;
2721 
2722  ptr1 = strchr(te->defn, '\'');
2723  if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2724  AH->public.std_strings = true;
2725  else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2726  AH->public.std_strings = false;
2727  else
2728  pg_fatal("invalid STDSTRINGS item: %s",
2729  te->defn);
2730 }
2731 
2732 static void
2733 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2734 {
2735  /*
2736  * te->defn should contain a command to set search_path. We just copy it
2737  * verbatim for use later.
2738  */
2739  AH->public.searchpath = pg_strdup(te->defn);
2740 }
2741 
2742 static void
2743 StrictNamesCheck(RestoreOptions *ropt)
2744 {
2745  const char *missing_name;
2746 
2747  Assert(ropt->strict_names);
2748 
2749  if (ropt->schemaNames.head != NULL)
2750  {
2751  missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2752  if (missing_name != NULL)
2753  pg_fatal("schema \"%s\" not found", missing_name);
2754  }
2755 
2756  if (ropt->tableNames.head != NULL)
2757  {
2758  missing_name = simple_string_list_not_touched(&ropt->tableNames);
2759  if (missing_name != NULL)
2760  pg_fatal("table \"%s\" not found", missing_name);
2761  }
2762 
2763  if (ropt->indexNames.head != NULL)
2764  {
2765  missing_name = simple_string_list_not_touched(&ropt->indexNames);
2766  if (missing_name != NULL)
2767  pg_fatal("index \"%s\" not found", missing_name);
2768  }
2769 
2770  if (ropt->functionNames.head != NULL)
2771  {
2772  missing_name = simple_string_list_not_touched(&ropt->functionNames);
2773  if (missing_name != NULL)
2774  pg_fatal("function \"%s\" not found", missing_name);
2775  }
2776 
2777  if (ropt->triggerNames.head != NULL)
2778  {
2779  missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2780  if (missing_name != NULL)
2781  pg_fatal("trigger \"%s\" not found", missing_name);
2782  }
2783 }
2784 
2785 /*
2786  * Determine whether we want to restore this TOC entry.
2787  *
2788  * Returns 0 if entry should be skipped, or some combination of the
2789  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2790  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2791  */
2792 static int
2793 _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2794 {
2795  int res = REQ_SCHEMA | REQ_DATA;
2796  RestoreOptions *ropt = AH->public.ropt;
2797 
2798  /* These items are treated specially */
2799  if (strcmp(te->desc, "ENCODING") == 0 ||
2800  strcmp(te->desc, "STDSTRINGS") == 0 ||
2801  strcmp(te->desc, "SEARCHPATH") == 0)
2802  return REQ_SPECIAL;
2803 
2804  /*
2805  * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2806  * restored in createDB mode, and not restored otherwise, independently of
2807  * all else.
2808  */
2809  if (strcmp(te->desc, "DATABASE") == 0 ||
2810  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2811  {
2812  if (ropt->createDB)
2813  return REQ_SCHEMA;
2814  else
2815  return 0;
2816  }
2817 
2818  /*
2819  * Process exclusions that affect certain classes of TOC entries.
2820  */
2821 
2822  /* If it's an ACL, maybe ignore it */
2823  if (ropt->aclsSkip && _tocEntryIsACL(te))
2824  return 0;
2825 
2826  /* If it's a comment, maybe ignore it */
2827  if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2828  return 0;
2829 
2830  /*
2831  * If it's a publication or a table part of a publication, maybe ignore
2832  * it.
2833  */
2834  if (ropt->no_publications &&
2835  (strcmp(te->desc, "PUBLICATION") == 0 ||
2836  strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2837  strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2838  return 0;
2839 
2840  /* If it's a security label, maybe ignore it */
2841  if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2842  return 0;
2843 
2844  /* If it's a subscription, maybe ignore it */
2845  if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2846  return 0;
2847 
2848  /* Ignore it if section is not to be dumped/restored */
2849  switch (curSection)
2850  {
2851  case SECTION_PRE_DATA:
2852  if (!(ropt->dumpSections & DUMP_PRE_DATA))
2853  return 0;
2854  break;
2855  case SECTION_DATA:
2856  if (!(ropt->dumpSections & DUMP_DATA))
2857  return 0;
2858  break;
2859  case SECTION_POST_DATA:
2860  if (!(ropt->dumpSections & DUMP_POST_DATA))
2861  return 0;
2862  break;
2863  default:
2864  /* shouldn't get here, really, but ignore it */
2865  return 0;
2866  }
2867 
2868  /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2869  if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2870  return 0;
2871 
2872  /*
2873  * Check options for selective dump/restore.
2874  */
2875  if (strcmp(te->desc, "ACL") == 0 ||
2876  strcmp(te->desc, "COMMENT") == 0 ||
2877  strcmp(te->desc, "SECURITY LABEL") == 0)
2878  {
2879  /* Database properties react to createDB, not selectivity options. */
2880  if (strncmp(te->tag, "DATABASE ", 9) == 0)
2881  {
2882  if (!ropt->createDB)
2883  return 0;
2884  }
2885  else if (ropt->schemaNames.head != NULL ||
2886  ropt->schemaExcludeNames.head != NULL ||
2887  ropt->selTypes)
2888  {
2889  /*
2890  * In a selective dump/restore, we want to restore these dependent
2891  * TOC entry types only if their parent object is being restored.
2892  * Without selectivity options, we let through everything in the
2893  * archive. Note there may be such entries with no parent, eg
2894  * non-default ACLs for built-in objects.
2895  *
2896  * This code depends on the parent having been marked already,
2897  * which should be the case; if it isn't, perhaps due to
2898  * SortTocFromFile rearrangement, skipping the dependent entry
2899  * seems prudent anyway.
2900  *
2901  * Ideally we'd handle, eg, table CHECK constraints this way too.
2902  * But it's hard to tell which of their dependencies is the one to
2903  * consult.
2904  */
2905  if (te->nDeps != 1 ||
2906  TocIDRequired(AH, te->dependencies[0]) == 0)
2907  return 0;
2908  }
2909  }
2910  else
2911  {
2912  /* Apply selective-restore rules for standalone TOC entries. */
2913  if (ropt->schemaNames.head != NULL)
2914  {
2915  /* If no namespace is specified, it means all. */
2916  if (!te->namespace)
2917  return 0;
2918  if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2919  return 0;
2920  }
2921 
2922  if (ropt->schemaExcludeNames.head != NULL &&
2923  te->namespace &&
2924  simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2925  return 0;
2926 
2927  if (ropt->selTypes)
2928  {
2929  if (strcmp(te->desc, "TABLE") == 0 ||
2930  strcmp(te->desc, "TABLE DATA") == 0 ||
2931  strcmp(te->desc, "VIEW") == 0 ||
2932  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2933  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2934  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2935  strcmp(te->desc, "SEQUENCE") == 0 ||
2936  strcmp(te->desc, "SEQUENCE SET") == 0)
2937  {
2938  if (!ropt->selTable)
2939  return 0;
2940  if (ropt->tableNames.head != NULL &&
2941  !simple_string_list_member(&ropt->tableNames, te->tag))
2942  return 0;
2943  }
2944  else if (strcmp(te->desc, "INDEX") == 0)
2945  {
2946  if (!ropt->selIndex)
2947  return 0;
2948  if (ropt->indexNames.head != NULL &&
2949  !simple_string_list_member(&ropt->indexNames, te->tag))
2950  return 0;
2951  }
2952  else if (strcmp(te->desc, "FUNCTION") == 0 ||
2953  strcmp(te->desc, "AGGREGATE") == 0 ||
2954  strcmp(te->desc, "PROCEDURE") == 0)
2955  {
2956  if (!ropt->selFunction)
2957  return 0;
2958  if (ropt->functionNames.head != NULL &&
2959  !simple_string_list_member(&ropt->functionNames, te->tag))
2960  return 0;
2961  }
2962  else if (strcmp(te->desc, "TRIGGER") == 0)
2963  {
2964  if (!ropt->selTrigger)
2965  return 0;
2966  if (ropt->triggerNames.head != NULL &&
2967  !simple_string_list_member(&ropt->triggerNames, te->tag))
2968  return 0;
2969  }
2970  else
2971  return 0;
2972  }
2973  }
2974 
2975  /*
2976  * Determine whether the TOC entry contains schema and/or data components,
2977  * and mask off inapplicable REQ bits. If it had a dataDumper, assume
2978  * it's both schema and data. Otherwise it's probably schema-only, but
2979  * there are exceptions.
2980  */
2981  if (!te->hadDumper)
2982  {
2983  /*
2984  * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then
2985  * it is considered a data entry. We don't need to check for the
2986  * BLOBS entry or old-style BLOB COMMENTS, because they will have
2987  * hadDumper = true ... but we do need to check new-style BLOB ACLs,
2988  * comments, etc.
2989  */
2990  if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2991  strcmp(te->desc, "BLOB") == 0 ||
2992  (strcmp(te->desc, "ACL") == 0 &&
2993  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2994  (strcmp(te->desc, "COMMENT") == 0 &&
2995  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2996  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2997  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2998  res = res & REQ_DATA;
2999  else
3000  res = res & ~REQ_DATA;
3001  }
3002 
3003  /*
3004  * If there's no definition command, there's no schema component. Treat
3005  * "load via partition root" comments as not schema.
3006  */
3007  if (!te->defn || !te->defn[0] ||
3008  strncmp(te->defn, "-- load via partition root ", 27) == 0)
3009  res = res & ~REQ_SCHEMA;
3010 
3011  /*
3012  * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3013  * always ignore it.
3014  */
3015  if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3016  return 0;
3017 
3018  /* Mask it if we only want schema */
3019  if (ropt->schemaOnly)
3020  {
3021  /*
3022  * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3023  *
3024  * In binary-upgrade mode, even with schemaOnly set, we do not mask
3025  * out large objects. (Only large object definitions, comments and
3026  * other metadata should be generated in binary-upgrade mode, not the
3027  * actual data, but that need not concern us here.)
3028  */
3029  if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3030  !(ropt->binary_upgrade &&
3031  (strcmp(te->desc, "BLOB") == 0 ||
3032  (strcmp(te->desc, "ACL") == 0 &&
3033  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3034  (strcmp(te->desc, "COMMENT") == 0 &&
3035  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3036  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3037  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
3038  res = res & REQ_SCHEMA;
3039  }
3040 
3041  /* Mask it if we only want data */
3042  if (ropt->dataOnly)
3043  res = res & REQ_DATA;
3044 
3045  return res;
3046 }
3047 
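Callers treat the return value as a bitmask, so the schema and data decisions stay independent. A hypothetical caller, not taken from the PostgreSQL sources but using the real REQ_* flags and the signature above, would look like this:

/* Illustrative only: how a caller interprets _tocEntryRequired()'s bitmask. */
static void
demo_reqs_usage(ArchiveHandle *AH, TocEntry *te, teSection curSection)
{
	int			reqs = _tocEntryRequired(te, curSection, AH);

	if (reqs == REQ_SPECIAL)
		return;					/* ENCODING/STDSTRINGS/SEARCHPATH handled separately */
	if (reqs == 0)
		return;					/* entry filtered out entirely */
	if ((reqs & REQ_SCHEMA) != 0)
	{
		/* emit the object's definition (CREATE/ALTER statements) */
	}
	if ((reqs & REQ_DATA) != 0)
	{
		/* emit or dispatch the table data / large-object data */
	}
}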
3048 /*
3049  * Identify which pass we should restore this TOC entry in.
3050  *
3051  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3052  */
3053 static RestorePass
3054 _tocEntryRestorePass(TocEntry *te)
3055 {
3056  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3057  if (strcmp(te->desc, "ACL") == 0 ||
3058  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3059  strcmp(te->desc, "DEFAULT ACL") == 0)
3060  return RESTORE_PASS_ACL;
3061  if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3062  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3063  return RESTORE_PASS_POST_ACL;
3064 
3065  /*
3066  * Comments need to be emitted in the same pass as their parent objects.
3067  * ACLs haven't got comments, and neither do matview data objects, but
3068  * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3069  * we'd need yet another weird special case.)
3070  */
3071  if (strcmp(te->desc, "COMMENT") == 0 &&
3072  strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3073  return RESTORE_PASS_POST_ACL;
3074 
3075  /* All else can be handled in the main pass. */
3076  return RESTORE_PASS_MAIN;
3077 }
3078 
3079 /*
3080  * Identify TOC entries that are ACLs.
3081  *
3082  * Note: it seems worth duplicating some code here to avoid a hard-wired
3083  * assumption that these are exactly the same entries that we restore during
3084  * the RESTORE_PASS_ACL phase.
3085  */
3086 static bool
3087 _tocEntryIsACL(TocEntry *te)
3088 {
3089  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3090  if (strcmp(te->desc, "ACL") == 0 ||
3091  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3092  strcmp(te->desc, "DEFAULT ACL") == 0)
3093  return true;
3094  return false;
3095 }
3096 
3097 /*
3098  * Issue SET commands for parameters that we want to have set the same way
3099  * at all times during execution of a restore script.
3100  */
3101 static void
3102 _doSetFixedOutputState(ArchiveHandle *AH)
3103 {
3104  RestoreOptions *ropt = AH->public.ropt;
3105 
3106  /*
3107  * Disable timeouts to allow for slow commands, idle parallel workers, etc
3108  */
3109  ahprintf(AH, "SET statement_timeout = 0;\n");
3110  ahprintf(AH, "SET lock_timeout = 0;\n");
3111  ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3112 
3113  /* Select the correct character set encoding */
3114  ahprintf(AH, "SET client_encoding = '%s';\n",
3115  pg_encoding_to_char(AH->public.encoding));
3116 
3117  /* Select the correct string literal syntax */
3118  ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3119  AH->public.std_strings ? "on" : "off");
3120 
3121  /* Select the role to be used during restore */
3122  if (ropt && ropt->use_role)
3123  ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3124 
3125  /* Select the dump-time search_path */
3126  if (AH->public.searchpath)
3127  ahprintf(AH, "%s", AH->public.searchpath);
3128 
3129  /* Make sure function checking is disabled */
3130  ahprintf(AH, "SET check_function_bodies = false;\n");
3131 
3132  /* Ensure that all valid XML data will be accepted */
3133  ahprintf(AH, "SET xmloption = content;\n");
3134 
3135  /* Avoid annoying notices etc */
3136  ahprintf(AH, "SET client_min_messages = warning;\n");
3137  if (!AH->public.std_strings)
3138  ahprintf(AH, "SET escape_string_warning = off;\n");
3139 
3140  /* Adjust row-security state */
3141  if (ropt && ropt->enable_row_security)
3142  ahprintf(AH, "SET row_security = on;\n");
3143  else
3144  ahprintf(AH, "SET row_security = off;\n");
3145 
3146  ahprintf(AH, "\n");
3147 }
3148 
3149 /*
3150  * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3151  * for updating state if appropriate. If user is NULL or an empty string,
3152  * the specification DEFAULT will be used.
3153  */
3154 static void
3155 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3156 {
3157  PQExpBuffer cmd = createPQExpBuffer();
3158 
3159  appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3160 
3161  /*
3162  * SQL requires a string literal here. Might as well be correct.
3163  */
3164  if (user && *user)
3165  appendStringLiteralAHX(cmd, user, AH);
3166  else
3167  appendPQExpBufferStr(cmd, "DEFAULT");
3168  appendPQExpBufferChar(cmd, ';');
3169 
3170  if (RestoringToDB(AH))
3171  {
3172  PGresult *res;
3173 
3174  res = PQexec(AH->connection, cmd->data);
3175 
3176  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3177  /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3178  pg_fatal("could not set session user to \"%s\": %s",
3179  user, PQerrorMessage(AH->connection));
3180 
3181  PQclear(res);
3182  }
3183  else
3184  ahprintf(AH, "%s\n\n", cmd->data);
3185 
3186  destroyPQExpBuffer(cmd);
3187 }
3188 
3189 
3190 /*
3191  * Issue the commands to connect to the specified database.
3192  *
3193  * If we're currently restoring right into a database, this will
3194  * actually establish a connection. Otherwise it puts a \connect into
3195  * the script output.
3196  */
3197 static void
3198 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3199 {
3200  if (RestoringToDB(AH))
3201  ReconnectToServer(AH, dbname);
3202  else
3203  {
3204  PQExpBufferData connectbuf;
3205 
3206  initPQExpBuffer(&connectbuf);
3207  appendPsqlMetaConnect(&connectbuf, dbname);
3208  ahprintf(AH, "%s\n", connectbuf.data);
3209  termPQExpBuffer(&connectbuf);
3210  }
3211 
3212  /*
3213  * NOTE: currUser keeps track of what the imaginary session user in our
3214  * script is. It's now effectively reset to the original userID.
3215  */
3216  free(AH->currUser);
3217  AH->currUser = NULL;
3218 
3219  /* don't assume we still know the output schema, tablespace, etc either */
3220  free(AH->currSchema);
3221  AH->currSchema = NULL;
3222 
3223  free(AH->currTableAm);
3224  AH->currTableAm = NULL;
3225 
3226  free(AH->currTablespace);
3227  AH->currTablespace = NULL;
3228 
3229  /* re-establish fixed state */
3230  _doSetFixedOutputState(AH);
3231 }
3232 
3233 /*
3234  * Become the specified user, and update state to avoid redundant commands
3235  *
3236  * NULL or empty argument is taken to mean restoring the session default
3237  */
3238 static void
3239 _becomeUser(ArchiveHandle *AH, const char *user)
3240 {
3241  if (!user)
3242  user = ""; /* avoid null pointers */
3243 
3244  if (AH->currUser && strcmp(AH->currUser, user) == 0)
3245  return; /* no need to do anything */
3246 
3247  _doSetSessionAuth(AH, user);
3248 
3249  /*
3250  * NOTE: currUser keeps track of what the imaginary session user in our
3251  * script is
3252  */
3253  free(AH->currUser);
3254  AH->currUser = pg_strdup(user);
3255 }
3256 
3257 /*
3258  * Become the owner of the given TOC entry object. If
3259  * changes in ownership are not allowed, this doesn't do anything.
3260  */
3261 static void
3262 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3263 {
3264  RestoreOptions *ropt = AH->public.ropt;
3265 
3266  if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3267  return;
3268 
3269  _becomeUser(AH, te->owner);
3270 }
3271 
3272 
3273 /*
3274  * Issue the commands to select the specified schema as the current schema
3275  * in the target database.
3276  */
3277 static void
3278 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3279 {
3280  PQExpBuffer qry;
3281 
3282  /*
3283  * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3284  * that search_path rather than switching to entry-specific paths.
3285  * Otherwise, it's an old archive that will not restore correctly unless
3286  * we set the search_path as it's expecting.
3287  */
3288  if (AH->public.searchpath)
3289  return;
3290 
3291  if (!schemaName || *schemaName == '\0' ||
3292  (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3293  return; /* no need to do anything */
3294 
3295  qry = createPQExpBuffer();
3296 
3297  appendPQExpBuffer(qry, "SET search_path = %s",
3298  fmtId(schemaName));
3299  if (strcmp(schemaName, "pg_catalog") != 0)
3300  appendPQExpBufferStr(qry, ", pg_catalog");
3301 
3302  if (RestoringToDB(AH))
3303  {
3304  PGresult *res;
3305 
3306  res = PQexec(AH->connection, qry->data);
3307 
3308  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3309  warn_or_exit_horribly(AH,
3310  "could not set search_path to \"%s\": %s",
3311  schemaName, PQerrorMessage(AH->connection));
3312 
3313  PQclear(res);
3314  }
3315  else
3316  ahprintf(AH, "%s;\n\n", qry->data);
3317 
3318  free(AH->currSchema);
3319  AH->currSchema = pg_strdup(schemaName);
3320 
3321  destroyPQExpBuffer(qry);
3322 }
3323 
3324 /*
3325  * Issue the commands to select the specified tablespace as the current one
3326  * in the target database.
3327  */
3328 static void
3329 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3330 {
3331  RestoreOptions *ropt = AH->public.ropt;
3332  PQExpBuffer qry;
3333  const char *want,
3334  *have;
3335 
3336  /* do nothing in --no-tablespaces mode */
3337  if (ropt->noTablespace)
3338  return;
3339 
3340  have = AH->currTablespace;
3341  want = tablespace;
3342 
3343  /* no need to do anything for non-tablespace object */
3344  if (!want)
3345  return;
3346 
3347  if (have && strcmp(want, have) == 0)
3348  return; /* no need to do anything */
3349 
3350  qry = createPQExpBuffer();
3351 
3352  if (strcmp(want, "") == 0)
3353  {
3354  /* We want the tablespace to be the database's default */
3355  appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3356  }
3357  else
3358  {
3359  /* We want an explicit tablespace */
3360  appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3361  }
3362 
3363  if (RestoringToDB(AH))
3364  {
3365  PGresult *res;
3366 
3367  res = PQexec(AH->connection, qry->data);
3368 
3369  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3370  warn_or_exit_horribly(AH,
3371  "could not set default_tablespace to %s: %s",
3372  fmtId(want), PQerrorMessage(AH->connection));
3373 
3374  PQclear(res);
3375  }
3376  else
3377  ahprintf(AH, "%s;\n\n", qry->data);
3378 
3379  free(AH->currTablespace);
3380  AH->currTablespace = pg_strdup(want);
3381 
3382  destroyPQExpBuffer(qry);
3383 }
3384 
3385 /*
3386  * Set the proper default_table_access_method value for the table.
3387  */
3388 static void
3389 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3390 {
3391  RestoreOptions *ropt = AH->public.ropt;
3392  PQExpBuffer cmd;
3393  const char *want,
3394  *have;
3395 
3396  /* do nothing in --no-table-access-method mode */
3397  if (ropt->noTableAm)
3398  return;
3399 
3400  have = AH->currTableAm;
3401  want = tableam;
3402 
3403  if (!want)
3404  return;
3405 
3406  if (have && strcmp(want, have) == 0)
3407  return;
3408 
3409  cmd = createPQExpBuffer();
3410  appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3411 
3412  if (RestoringToDB(AH))
3413  {
3414  PGresult *res;
3415 
3416  res = PQexec(AH->connection, cmd->data);
3417 
3418  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3419  warn_or_exit_horribly(AH,
3420  "could not set default_table_access_method: %s",
3421  PQerrorMessage(AH->connection));
3422 
3423  PQclear(res);
3424  }
3425  else
3426  ahprintf(AH, "%s\n\n", cmd->data);
3427 
3428  destroyPQExpBuffer(cmd);
3429 
3430  free(AH->currTableAm);
3431  AH->currTableAm = pg_strdup(want);
3432 }
3433 
3434 /*
3435  * Extract an object description for a TOC entry, and append it to buf.
3436  *
3437  * This is used for ALTER ... OWNER TO.
3438  *
3439  * If the object type has no owner, do nothing.
3440  */
3441 static void
3442 _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3443 {
3444  const char *type = te->desc;
3445 
3446  /* objects that don't require special decoration */
3447  if (strcmp(type, "COLLATION") == 0 ||
3448  strcmp(type, "CONVERSION") == 0 ||
3449  strcmp(type, "DOMAIN") == 0 ||
3450  strcmp(type, "FOREIGN TABLE") == 0 ||
3451  strcmp(type, "MATERIALIZED VIEW") == 0 ||
3452  strcmp(type, "SEQUENCE") == 0 ||
3453  strcmp(type, "STATISTICS") == 0 ||
3454  strcmp(type, "TABLE") == 0 ||
3455  strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3456  strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3457  strcmp(type, "TYPE") == 0 ||
3458  strcmp(type, "VIEW") == 0 ||
3459  /* non-schema-specified objects */
3460  strcmp(type, "DATABASE") == 0 ||
3461  strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3462  strcmp(type, "SCHEMA") == 0 ||
3463  strcmp(type, "EVENT TRIGGER") == 0 ||
3464  strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3465  strcmp(type, "SERVER") == 0 ||
3466  strcmp(type, "PUBLICATION") == 0 ||
3467  strcmp(type, "SUBSCRIPTION") == 0)
3468  {
3469  appendPQExpBuffer(buf, "%s ", type);
3470  if (te->namespace && *te->namespace)
3471  appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3472  appendPQExpBufferStr(buf, fmtId(te->tag));
3473  }
3474  /* LOs just have a name, but it's numeric so must not use fmtId */
3475  else if (strcmp(type, "BLOB") == 0)
3476  {
3477  appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3478  }
3479  /*
3480  * These object types require additional decoration. Fortunately, the
3481  * information needed is exactly what's in the DROP command.
3482  */
3483  else if (strcmp(type, "AGGREGATE") == 0 ||
3484  strcmp(type, "FUNCTION") == 0 ||
3485  strcmp(type, "OPERATOR") == 0 ||
3486  strcmp(type, "OPERATOR CLASS") == 0 ||
3487  strcmp(type, "OPERATOR FAMILY") == 0 ||
3488  strcmp(type, "PROCEDURE") == 0)
3489  {
3490  /* Chop "DROP " off the front and make a modifiable copy */
3491  char *first = pg_strdup(te->dropStmt + 5);
3492  char *last;
3493 
3494  /* point to last character in string */
3495  last = first + strlen(first) - 1;
3496 
3497  /* Strip off any ';' or '\n' at the end */
3498  while (last >= first && (*last == '\n' || *last == ';'))
3499  last--;
3500  *(last + 1) = '\0';
3501 
3502  appendPQExpBufferStr(buf, first);
3503 
3504  free(first);
3505  return;
3506  }
3507  /* these object types don't have separate owners */
3508  else if (strcmp(type, "CAST") == 0 ||
3509  strcmp(type, "CHECK CONSTRAINT") == 0 ||
3510  strcmp(type, "CONSTRAINT") == 0 ||
3511  strcmp(type, "DATABASE PROPERTIES") == 0 ||
3512  strcmp(type, "DEFAULT") == 0 ||
3513  strcmp(type, "FK CONSTRAINT") == 0 ||
3514  strcmp(type, "INDEX") == 0 ||
3515  strcmp(type, "RULE") == 0 ||
3516  strcmp(type, "TRIGGER") == 0 ||
3517  strcmp(type, "ROW SECURITY") == 0 ||
3518  strcmp(type, "POLICY") == 0 ||
3519  strcmp(type, "USER MAPPING") == 0)
3520  {
3521  /* do nothing */
3522  }
3523  else
3524  pg_fatal("don't know how to set owner for object type \"%s\"", type);
3525 }
3526 
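To see how _getObjectDescription() feeds the ALTER ... OWNER TO machinery in _printTocEntry() below, consider this sketch (a hypothetical helper, not part of the file): for a TABLE entry in schema public tagged accounts, the buffer ends up holding "TABLE public.accounts"; for a BLOB entry it holds "LARGE OBJECT <oid>"; for entry types without separate owners it stays empty and no ALTER is emitted.

/* Illustrative only: exercise _getObjectDescription() for one entry. */
static void
demo_getObjectDescription(const TocEntry *te)
{
	PQExpBufferData buf;

	initPQExpBuffer(&buf);
	_getObjectDescription(&buf, te);
	if (buf.data[0])
	{
		/* e.g. buf.data == "TABLE public.accounts"; splice it into
		 * "ALTER %s OWNER TO ..." exactly as _printTocEntry() does. */
	}
	termPQExpBuffer(&buf);
}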
3527 /*
3528  * Emit the SQL commands to create the object represented by a TOC entry
3529  *
3530  * This now also includes issuing an ALTER OWNER command to restore the
3531  * object's ownership, if wanted. But note that the object's permissions
3532  * will remain at default, until the matching ACL TOC entry is restored.
3533  */
3534 static void
3535 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3536 {
3537  RestoreOptions *ropt = AH->public.ropt;
3538 
3539  /* Select owner, schema, tablespace and default AM as necessary */
3540  _becomeOwner(AH, te);
3541  _selectOutputSchema(AH, te->namespace);
3542  _selectTablespace(AH, te->tablespace);
3543  _selectTableAccessMethod(AH, te->tableam);
3544 
3545  /* Emit header comment for item */
3546  if (!AH->noTocComments)
3547  {
3548  const char *pfx;
3549  char *sanitized_name;
3550  char *sanitized_schema;
3551  char *sanitized_owner;
3552 
3553  if (isData)
3554  pfx = "Data for ";
3555  else
3556  pfx = "";
3557 
3558  ahprintf(AH, "--\n");
3559  if (AH->public.verbose)
3560  {
3561  ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3562  te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3563  if (te->nDeps > 0)
3564  {
3565  int i;
3566 
3567  ahprintf(AH, "-- Dependencies:");
3568  for (i = 0; i < te->nDeps; i++)
3569  ahprintf(AH, " %d", te->dependencies[i]);
3570  ahprintf(AH, "\n");
3571  }
3572  }
3573 
3574  sanitized_name = sanitize_line(te->tag, false);
3575  sanitized_schema = sanitize_line(te->namespace, true);
3576  sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3577 
3578  ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3579  pfx, sanitized_name, te->desc, sanitized_schema,
3580  sanitized_owner);
3581 
3582  free(sanitized_name);
3583  free(sanitized_schema);
3584  free(sanitized_owner);
3585 
3586  if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3587  {
3588  char *sanitized_tablespace;
3589 
3590  sanitized_tablespace = sanitize_line(te->tablespace, false);
3591  ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3592  free(sanitized_tablespace);
3593  }
3594  ahprintf(AH, "\n");
3595 
3596  if (AH->PrintExtraTocPtr != NULL)
3597  AH->PrintExtraTocPtr(AH, te);
3598  ahprintf(AH, "--\n\n");
3599  }
3600 
3601  /*
3602  * Actually print the definition.
3603  *
3604  * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3605  * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3606  * "public" that is a comment. We have to do this when --no-owner mode is
3607  * selected. This is ugly, but I see no other good way ...
3608  */
3609  if (ropt->noOwner &&
3610  strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3611  {
3612  ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3613  }
3614  else
3615  {
3616  if (te->defn && strlen(te->defn) > 0)
3617  ahprintf(AH, "%s\n\n", te->defn);
3618  }
3619 
3620  /*
3621  * If we aren't using SET SESSION AUTH to determine ownership, we must
3622  * instead issue an ALTER OWNER command. Schema "public" is special; when
3623  * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3624  * when using SET SESSION for all other objects. We assume that anything
3625  * without a DROP command is not a separately ownable object.
3626  */
3627  if (!ropt->noOwner &&
3628  (!ropt->use_setsessauth ||
3629  (strcmp(te->desc, "SCHEMA") == 0 &&
3630  strncmp(te->defn, "--", 2) == 0)) &&
3631  te->owner && strlen(te->owner) > 0 &&
3632  te->dropStmt && strlen(te->dropStmt) > 0)
3633  {
3634  PQExpBufferData temp;
3635 
3636  initPQExpBuffer(&temp);
3637  _getObjectDescription(&temp, te);
3638  /*
3639  * If _getObjectDescription() didn't fill the buffer, then there is no
3640  * owner.
3641  */
3642  if (temp.data[0])
3643  ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp.data, fmtId(te->owner));
3644  termPQExpBuffer(&temp);
3645  }
3646 
3647  /*
3648  * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3649  * commands, so we can no longer assume we know the current auth setting.
3650  */
3651  if (_tocEntryIsACL(te))
3652  {
3653  free(AH->currUser);
3654  AH->currUser = NULL;
3655  }
3656 }
3657 
3658 /*
3659  * Sanitize a string to be included in an SQL comment or TOC listing, by
3660  * replacing any newlines with spaces. This ensures each logical output line
3661  * is in fact one physical output line, to prevent corruption of the dump
3662  * (which could, in the worst case, present an SQL injection vulnerability
3663  * if someone were to incautiously load a dump containing objects with
3664  * maliciously crafted names).
3665  *
3666  * The result is a freshly malloc'd string. If the input string is NULL,
3667  * return a malloc'ed empty string, unless want_hyphen, in which case return a
3668  * malloc'ed hyphen.
3669  *
3670  * Note that we currently don't bother to quote names, meaning that the name
3671  * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3672  * it only examines the dumpId field, but someday we might want to try harder.
3673  */
3674 static char *
3675 sanitize_line(const char *str, bool want_hyphen)
3676 {
3677  char *result;
3678  char *s;
3679 
3680  if (!str)
3681  return pg_strdup(want_hyphen ? "-" : "");
3682 
3683  result = pg_strdup(str);
3684 
3685  for (s = result; *s != '\0'; s++)
3686  {
3687  if (*s == '\n' || *s == '\r')
3688  *s = ' ';
3689  }
3690 
3691  return result;
3692 }
3693 
3694 /*
3695  * Write the file header for a custom-format archive
3696  */
3697 void
3698 WriteHead(ArchiveHandle *AH)
3699 {
3700  struct tm crtm;
3701 
3702  AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3703  AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3704  AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3705  AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3706  AH->WriteBytePtr(AH, AH->intSize);
3707  AH->WriteBytePtr(AH, AH->offSize);
3708  AH->WriteBytePtr(AH, AH->format);
3709  AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
3710  crtm = *localtime(&AH->createDate);
3711  WriteInt(AH, crtm.tm_sec);
3712  WriteInt(AH, crtm.tm_min);
3713  WriteInt(AH, crtm.tm_hour);
3714  WriteInt(AH, crtm.tm_mday);
3715  WriteInt(AH, crtm.tm_mon);
3716  WriteInt(AH, crtm.tm_year);
3717  WriteInt(AH, crtm.tm_isdst);
3718  WriteStr(AH, PQdb(AH->connection));
3719  WriteStr(AH, AH->public.remoteVersionStr);
3720  WriteStr(AH, PG_VERSION);
3721 }
3722 
3723 void
3724 ReadHead(ArchiveHandle *AH)
3725 {
3726  char *errmsg;
3727  char vmaj,
3728  vmin,
3729  vrev;
3730  int fmt;
3731 
3732  /*
3733  * If we haven't already read the header, do so.
3734  *
3735  * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3736  * way to unify the cases?
3737  */
3738  if (!AH->readHeader)
3739  {
3740  char tmpMag[7];
3741 
3742  AH->ReadBufPtr(AH, tmpMag, 5);
3743 
3744  if (strncmp(tmpMag, "PGDMP", 5) != 0)
3745  pg_fatal("did not find magic string in file header");
3746  }
3747 
3748  vmaj = AH->ReadBytePtr(AH);
3749  vmin = AH->ReadBytePtr(AH);
3750 
3751  if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3752  vrev = AH->ReadBytePtr(AH);
3753  else
3754  vrev = 0;
3755 
3756  AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3757 
3758  if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3759  pg_fatal("unsupported version (%d.%d) in file header",
3760  vmaj, vmin);
3761 
3762  AH->intSize = AH->ReadBytePtr(AH);
3763  if (AH->intSize > 32)
3764  pg_fatal("sanity check on integer size (%lu) failed",
3765  (unsigned long) AH->intSize);
3766 
3767  if (AH->intSize > sizeof(int))
3768  pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3769 
3770  if (AH->version >= K_VERS_1_7)
3771  AH->offSize = AH->ReadBytePtr(AH);
3772  else
3773  AH->offSize = AH->intSize;
3774 
3775  fmt = AH->ReadBytePtr(AH);
3776 
3777  if (AH->format != fmt)
3778  pg_fatal("expected format (%d) differs from format found in file (%d)",
3779  AH->format, fmt);
3780 
3781  if (AH->version >= K_VERS_1_15)
3782  AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
3783  else if (AH->version >= K_VERS_1_2)
3784  {
3785  /* Guess the compression method based on the level */
3786  if (AH->version < K_VERS_1_4)
3787  AH->compression_spec.level = AH->ReadBytePtr(AH);
3788  else
3789  AH->compression_spec.level = ReadInt(AH);
3790 
3791  if (AH->compression_spec.level != 0)
3792  AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
3793  }
3794  else
3795  AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
3796 
3797  errmsg = supports_compression(AH->compression_spec);
3798  if (errmsg)
3799  {
3800  pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
3801  errmsg);
3802  pg_free(errmsg);
3803  }
3804 
3805  if (AH->version >= K_VERS_1_4)
3806  {
3807  struct tm crtm;
3808 
3809  crtm.tm_sec = ReadInt(AH);
3810  crtm.tm_min = ReadInt(AH);
3811  crtm.tm_hour = ReadInt(AH);
3812  crtm.tm_mday = ReadInt(AH);
3813  crtm.tm_mon = ReadInt(AH);
3814  crtm.tm_year = ReadInt(AH);
3815  crtm.tm_isdst = ReadInt(AH);
3816 
3817  /*
3818  * Newer versions of glibc have mktime() report failure if tm_isdst is
3819  * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3820  * TZ=UTC. This is problematic when restoring an archive under a
3821  * different timezone setting. If we get a failure, try again with
3822  * tm_isdst set to -1 ("don't know").
3823  *
3824  * XXX with or without this hack, we reconstruct createDate
3825  * incorrectly when the prevailing timezone is different from
3826  * pg_dump's. Next time we bump the archive version, we should flush
3827  * this representation and store a plain seconds-since-the-Epoch
3828  * timestamp instead.
3829  */
3830  AH->createDate = mktime(&crtm);
3831  if (AH->createDate == (time_t) -1)
3832  {
3833  crtm.tm_isdst = -1;
3834  AH->createDate = mktime(&crtm);
3835  if (AH->createDate == (time_t) -1)
3836  pg_log_warning("invalid creation date in header");
3837  }
3838  }
3839 
3840  if (AH->version >= K_VERS_1_4)
3841  {
3842  AH->archdbname = ReadStr(AH);
3843  }
3844 
3845  if (AH->version >= K_VERS_1_10)
3846  {
3847  AH->archiveRemoteVersion = ReadStr(AH);
3848  AH->archiveDumpVersion = ReadStr(AH);
3849  }
3850 }
3851 
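The fixed prefix that WriteHead() emits and ReadHead() consumes is the 5-byte magic string followed by six single-byte fields: the major, minor, and revision parts of the archive version, intSize, offSize, and the format code (with K_VERS_1_15 and later adding a compression-algorithm byte, then the timestamp, database name, and version strings). As an illustration only, a standalone reader of just that fixed prefix might look like this:

#include <stdio.h>
#include <string.h>

/* Illustrative only: read and report the fixed custom-format header prefix. */
static int
read_custom_header_prefix(FILE *fh)
{
	unsigned char hdr[5 + 6];

	if (fread(hdr, 1, sizeof(hdr), fh) != sizeof(hdr))
		return -1;
	if (memcmp(hdr, "PGDMP", 5) != 0)
		return -1;				/* not a custom-format archive */
	printf("archive version %d.%d-%d, intSize %d, offSize %d, format %d\n",
		   hdr[5], hdr[6], hdr[7], hdr[8], hdr[9], hdr[10]);
	return 0;
}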
3852 
3853 /*
3854  * checkSeek
3855  * check to see if ftell/fseek can be performed.
3856  */
3857 bool
3858 checkSeek(FILE *fp)
3859 {
3860  pgoff_t tpos;
3861 
3862  /* Check that ftello works on this file */
3863  tpos = ftello(fp);
3864  if (tpos < 0)
3865  return false;
3866 
3867  /*
3868  * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3869  * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3870  * successful no-op even on files that are otherwise unseekable.
3871  */
3872  if (fseeko(fp, tpos, SEEK_SET) != 0)
3873  return false;
3874 
3875  return true;
3876 }
3877 
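A typical use of checkSeek(), sketched here as a hypothetical helper rather than code from this file, is to decide up front whether the writer will later be able to seek back and patch data offsets into the TOC; when the output is a pipe, callers fall back to the "offset not set" path handled by ReadOffset() above.

/* Illustrative only: probe the output stream before promising seekable offsets. */
static void
demo_checkSeek(FILE *out)
{
	bool		seekable = checkSeek(out);

	if (!seekable)
		fprintf(stderr, "output is not seekable; data offsets will be left unset\n");
}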
3878 
3879 /*
3880  * dumpTimestamp
3881  */
3882 static void
3883 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3884 {
3885  char buf[64];
3886 
3887  if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3888  ahprintf(AH, "-- %s %s\n\n", msg, buf);
3889 }
3890 
3891 /*
3892  * Main engine for parallel restore.
3893  *
3894  * Parallel restore is done in three phases. In this first phase,
3895  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3896  * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3897  * PRE_DATA items other than ACLs.) Entries we can't process now are
3898  * added to the pending_list for later phases to deal with.
3899  */
3900 static void
3901 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3902 {
3903  bool skipped_some;
3904  TocEntry *next_work_item;
3905 
3906  pg_log_debug("entering restore_toc_entries_prefork");
3907 
3908  /* Adjust dependency information */
3909  fix_dependencies(AH);
3910 
3911  /*
3912  * Do all the early stuff in a single connection in the parent. There's no
3913  * great point in running it in parallel, in fact it will actually run
3914  * faster in a single connection because we avoid all the connection and
3915  * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3916  * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3917  * not risk trying to process them out-of-order.
3918  *
3919  * Stuff that we can't do immediately gets added to the pending_list.
3920  * Note: we don't yet filter out entries that aren't going to be restored.
3921  * They might participate in dependency chains connecting entries that
3922  * should be restored, so we treat them as live until we actually process
3923  * them.
3924  *
3925  * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3926  * before DATA items, and all DATA items before POST_DATA items. That is
3927  * not certain to be true in older archives, though, and in any case use
3928  * of a list file would destroy that ordering (cf. SortTocFromFile). So
3929  * this loop cannot assume that it holds.
3930  */
3931  AH->restorePass = RESTORE_PASS_MAIN;
3932  skipped_some = false;
3933  for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3934  {
3935  bool do_now = true;
3936 
3937  if (next_work_item->section != SECTION_PRE_DATA)
3938  {
3939  /* DATA and POST_DATA items are just ignored for now */
3940  if (next_work_item->section == SECTION_DATA ||
3941  next_work_item->section == SECTION_POST_DATA)
3942  {
3943  do_now = false;
3944  skipped_some = true;
3945  }
3946  else
3947  {
3948  /*
3949  * SECTION_NONE items, such as comments, can be processed now
3950  * if we are still in the PRE_DATA part of the archive. Once
3951  * we've skipped any items, we have to consider whether the
3952  * comment's dependencies are satisfied, so skip it for now.
3953  */
3954  if (skipped_some)
3955  do_now = false;
3956  }
3957  }
3958 
3959  /*
3960  * Also skip items that need to be forced into later passes. We need
3961  * not set skipped_some in this case, since by assumption no main-pass
3962  * items could depend on these.
3963  */
3964  if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3965  do_now = false;
3966 
3967  if (do_now)
3968  {
3969  /* OK, restore the item and update its dependencies */
3970  pg_log_info("processing item %d %s %s",
3971  next_work_item->dumpId,
3972  next_work_item->desc, next_work_item->tag);
3973 
3974  (void) restore_toc_entry(AH, next_work_item, false);
3975 
3976  /* Reduce dependencies, but don't move anything to ready_list */
3977  reduce_dependencies(AH, next_work_item, NULL);
3978  }
3979  else
3980  {
3981  /* Nope, so add it to pending_list */
3982  pending_list_append(pending_list, next_work_item);
3983  }
3984  }
3985 
3986  /*
3987  * Now close parent connection in prep for parallel steps. We do this
3988  * mainly to ensure that we don't exceed the specified number of parallel
3989  * connections.
3990  */
3991  DisconnectDatabase(&AH->public);
3992 
3993  /* blow away any transient state from the old connection */
3994  free(AH->currUser);
3995  AH->currUser = NULL;
3996  free(AH->currSchema);
3997  AH->currSchema = NULL;
3998  free(AH->currTablespace);
3999  AH->currTablespace = NULL;
4000  free(AH->currTableAm);
4001  AH->currTableAm = NULL;
4002 }
4003 
4004 /*
4005  * Main engine for parallel restore.
4006  *
4007  * Parallel restore is done in three phases. In this second phase,
4008  * we process entries by dispatching them to parallel worker children
4009  * (processes on Unix, threads on Windows), each of which connects
4010  * separately to the database. Inter-entry dependencies are respected,
4011  * and so is the RestorePass multi-pass structure. When we can no longer
4012  * make any entries ready to process, we exit. Normally, there will be
4013  * nothing left to do; but if there is, the third phase will mop up.
4014  */
4015 static void
4016 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4017  TocEntry *pending_list)
4018 {
4019  ParallelReadyList ready_list;
4020  TocEntry *next_work_item;
4021 
4022  pg_log_debug("entering restore_toc_entries_parallel");
4023 
4024  /* Set up ready_list with enough room for all known TocEntrys */
4025  ready_list_init(&ready_list, AH->tocCount);
4026 
4027  /*
4028  * The pending_list contains all items that we need to restore. Move all
4029  * items that are available to process immediately into the ready_list.
4030  * After this setup, the pending list is everything that needs to be done
4031  * but is blocked by one or more dependencies, while the ready list
4032  * contains items that have no remaining dependencies and are OK to
4033  * process in the current restore pass.
4034  */
4035  AH->restorePass = RESTORE_PASS_MAIN;
4036  move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4037 
4038  /*
4039  * main parent loop
4040  *
4041  * Keep going until there is no worker still running AND there is no work
4042  * left to be done. Note invariant: at top of loop, there should always
4043  * be at least one worker available to dispatch a job to.
4044  */
4045  pg_log_info("entering main parallel loop");
4046 
4047  for (;;)
4048  {
4049  /* Look for an item ready to be dispatched to a worker */
4050  next_work_item = pop_next_work_item(&ready_list, pstate);
4051  if (next_work_item != NULL)
4052  {
4053  /* If not to be restored, don't waste time launching a worker */
4054  if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4055  {
4056  pg_log_info("skipping item %d %s %s",
4057  next_work_item->dumpId,
4058  next_work_item->desc, next_work_item->tag);
4059  /* Update its dependencies as though we'd completed it */
4060  reduce_dependencies(AH, next_work_item, &ready_list);
4061  /* Loop around to see if anything else can be dispatched */
4062  continue;
4063  }
4064 
4065  pg_log_info("launching item %d %s %s",
4066  next_work_item->dumpId,
4067  next_work_item->desc, next_work_item->tag);
4068 
4069  /* Dispatch to some worker */
4070  DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4071  mark_restore_job_done, &ready_list);
4072  }
4073  else if (IsEveryWorkerIdle(pstate))
4074  {
4075  /*
4076  * Nothing is ready and no worker is running, so we're done with
4077  * the current pass or maybe with the whole process.
4078  */
4079  if (AH->restorePass == RESTORE_PASS_LAST)
4080  break; /* No more parallel processing is possible */
4081 
4082  /* Advance to next restore pass */
4083  AH->restorePass++;
4084  /* That probably allows some stuff to be made ready */
4085  move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4086  /* Loop around to see if anything's now ready */
4087  continue;
4088  }
4089  else
4090  {
4091  /*
4092  * We have nothing ready, but at least one child is working, so
4093  * wait for some subjob to finish.
4094  */
4095  }
4096 
4097  /*
4098  * Before dispatching another job, check to see if anything has
4099  * finished. We should check every time through the loop so as to
4100  * reduce dependencies as soon as possible. If we were unable to
4101  * dispatch any job this time through, wait until some worker finishes
4102  * (and, hopefully, unblocks some pending item). If we did dispatch
4103  * something, continue as soon as there's at least one idle worker.
4104  * Note that in either case, there's guaranteed to be at least one
4105  * idle worker when we return to the top of the loop. This ensures we
4106  * won't block inside DispatchJobForTocEntry, which would be
4107  * undesirable: we'd rather postpone dispatching until we see what's
4108  * been unblocked by finished jobs.
4109  */
4110  WaitForWorkers(AH, pstate,
4111  next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4112  }
4113 
4114  /* There should now be nothing in ready_list. */
4115  Assert(ready_list.first_te > ready_list.last_te);
4116 
4117  ready_list_free(&ready_list);
4118 
4119  pg_log_info("finished main parallel loop");
4120 }
4121 
4122 /*
4123  * Main engine for parallel restore.
4124  *
4125  * Parallel restore is done in three phases. In this third phase,
4126  * we mop up any remaining TOC entries by processing them serially.
4127  * This phase normally should have nothing to do, but if we've somehow
4128  * gotten stuck due to circular dependencies or some such, this provides
4129  * at least some chance of completing the restore successfully.
4130  */
4131 static void
4132 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4133 {
4134  RestoreOptions *ropt = AH->public.ropt;
4135  TocEntry *te;
4136 
4137  pg_log_debug("entering restore_toc_entries_postfork");
4138 
4139  /*
4140  * Now reconnect the single parent connection.
4141  */
4142  ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4143 
4144  /* re-establish fixed state */
4145  _doSetFixedOutputState(AH);
4146 
4147  /*
4148  * Make sure there is no work left due to, say, circular dependencies, or
4149  * some other pathological condition. If so, do it in the single parent
4150  * connection. We don't sweat about RestorePass ordering; it's likely we
4151  * already violated that.
4152  */
4153  for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4154  {
4155  pg_log_info("processing missed item %d %s %s",
4156  te->dumpId, te->desc, te->tag);
4157  (void) restore_toc_entry(AH, te, false);
4158  }
4159 }
4160 
4161 /*
4162  * Check if te1 has an exclusive lock requirement for an item that te2 also
4163  * requires, whether or not te2's requirement is for an exclusive lock.
4164  */
4165 static bool
4166 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4167 {
4168  int j,
4169  k;
4170 
4171  for (j = 0; j < te1->nLockDeps; j++)
4172  {
4173  for (k = 0; k < te2->nDeps; k++)
4174  {
4175  if (te1->lockDeps[j] == te2->dependencies[k])
4176  return true;
4177  }
4178  }
4179  return false;
4180 }
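
The pairwise test above can be exercised on its own. A minimal standalone sketch with a simplified entry struct and hypothetical dump ids (10 and 20); pop_next_work_item below refuses to schedule two items together if the check fires in either direction:

#include <stdbool.h>
#include <stdio.h>

typedef struct                  /* simplified stand-in for TocEntry's lock/dep fields */
{
    const int  *lockDeps;       /* dump ids needing exclusive lock */
    int         nLockDeps;
    const int  *deps;           /* all dependency dump ids */
    int         nDeps;
} entry_t;

/* same test as above: does any exclusive-lock target of e1 appear among e2's deps? */
static bool
lock_conflict(const entry_t *e1, const entry_t *e2)
{
    for (int j = 0; j < e1->nLockDeps; j++)
        for (int k = 0; k < e2->nDeps; k++)
            if (e1->lockDeps[j] == e2->deps[k])
                return true;
    return false;
}

int
main(void)
{
    const int   fk_locks[] = {10}, fk_deps[] = {10, 20};
    const int   idx_deps[] = {10};
    entry_t     fk = {fk_locks, 1, fk_deps, 2};   /* e.g. a constraint locking item 10 */
    entry_t     idx = {NULL, 0, idx_deps, 1};     /* e.g. an index build on the same item */

    /* the scheduler rejects the pair if either direction conflicts */
    printf("conflict: %d\n", lock_conflict(&fk, &idx) || lock_conflict(&idx, &fk));
    return 0;
}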
4181 
4182 
4183 /*
4184  * Initialize the header of the pending-items list.
4185  *
4186  * This is a circular list with a dummy TocEntry as header, just like the
4187  * main TOC list; but we use separate list links so that an entry can be in
4188  * the main TOC list as well as in the pending list.
4189  */
4190 static void
4191 pending_list_header_init(TocEntry *l)
4192 {
4193  l->pending_prev = l->pending_next = l;
4194 }
4195 
4196 /* Append te to the end of the pending-list headed by l */
4197 static void
4198 pending_list_append(TocEntry *l, TocEntry *te)
4199 {
4200  te->pending_prev = l->pending_prev;
4201  l->pending_prev->pending_next = te;
4202  l->pending_prev = te;
4203  te->pending_next = l;
4204 }
4205 
4206 /* Remove te from the pending-list */
4207 static void
4208 pending_list_remove(TocEntry *te)
4209 {
4210  te->pending_prev->pending_next = te->pending_next;
4211  te->pending_next->pending_prev = te->pending_prev;
4212  te->pending_prev = NULL;
4213  te->pending_next = NULL;
4214 }
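
Taken together, these three primitives are a circular doubly-linked list with a dummy header node. A minimal standalone sketch with a simplified node type (an id plus the two links; the names are illustrative):

#include <stddef.h>
#include <stdio.h>

typedef struct node_t            /* stand-in for TocEntry: just the links plus an id */
{
    int             id;
    struct node_t  *pending_prev;
    struct node_t  *pending_next;
} node_t;

static void
list_init(node_t *l)             /* dummy header points at itself when empty */
{
    l->pending_prev = l->pending_next = l;
}

static void
list_append(node_t *l, node_t *n)
{
    n->pending_prev = l->pending_prev;
    l->pending_prev->pending_next = n;
    l->pending_prev = n;
    n->pending_next = l;
}

static void
list_remove(node_t *n)
{
    n->pending_prev->pending_next = n->pending_next;
    n->pending_next->pending_prev = n->pending_prev;
    n->pending_prev = n->pending_next = NULL;       /* "no longer pending" marker */
}

int
main(void)
{
    node_t      head, a = {.id = 1}, b = {.id = 2};

    list_init(&head);
    list_append(&head, &a);
    list_append(&head, &b);
    list_remove(&a);
    for (node_t *n = head.pending_next; n != &head; n = n->pending_next)
        printf("still pending: %d\n", n->id);       /* prints only 2 */
    return 0;
}

Leaving both links NULL after removal is what later lets pending_prev != NULL serve as the "still a member of the pending list" test in reduce_dependencies.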
4215 
4216 
4217 /*
4218  * Initialize the ready_list with enough room for up to tocCount entries.
4219  */
4220 static void
4221 ready_list_init(ParallelReadyList *ready_list, int tocCount)
4222 {
4223  ready_list->tes = (TocEntry **)
4224  pg_malloc(tocCount * sizeof(TocEntry *));
4225  ready_list->first_te = 0;
4226  ready_list->last_te = -1;
4227  ready_list->sorted = false;
4228 }
4229 
4230 /*
4231  * Free storage for a ready_list.
4232  */
4233 static void
4234 ready_list_free(ParallelReadyList *ready_list)
4235 {
4236  pg_free(ready_list->tes);
4237 }
4238 
4239 /* Add te to the ready_list */
4240 static void
4241 ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4242 {
4243  ready_list->tes[++ready_list->last_te] = te;
4244  /* List is (probably) not sorted anymore. */
4245  ready_list->sorted = false;
4246 }
4247 
4248 /* Remove the i'th entry in the ready_list */
4249 static void
4250 ready_list_remove(ParallelReadyList *ready_list, int i)
4251 {
4252  int f = ready_list->first_te;
4253 
4254  Assert(i >= f && i <= ready_list->last_te);
4255 
4256  /*
4257  * In the typical case where the item to be removed is the first ready
4258  * entry, we need only increment first_te to remove it. Otherwise, move
4259  * the entries before it to compact the list. (This preserves sortedness,
4260  * if any.) We could alternatively move the entries after i, but there
4261  * are typically many more of those.
4262  */
4263  if (i > f)
4264  {
4265  TocEntry **first_te_ptr = &ready_list->tes[f];
4266 
4267  memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4268  }
4269  ready_list->first_te++;
4270 }
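
A standalone sketch of the same removal strategy on a plain int array (illustrative values only): the entries before position i are shifted up one slot and the window start advances, so the surviving entries keep their relative order:

#include <stdio.h>
#include <string.h>

int
main(void)
{
    int     tes[] = {40, 30, 20, 10};   /* already sorted: biggest first */
    int     first = 0, last = 3;
    int     i = 2;                      /* remove the third entry (value 20) */

    /* shift the entries before i up one slot, then advance the window start */
    if (i > first)
        memmove(&tes[first + 1], &tes[first], (i - first) * sizeof(int));
    first++;

    for (int k = first; k <= last; k++)
        printf("%d ", tes[k]);          /* prints "40 30 10": order preserved */
    printf("\n");
    return 0;
}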
4271 
4272 /* Sort the ready_list into the desired order */
4273 static void
4274 ready_list_sort(ParallelReadyList *ready_list)
4275 {
4276  if (!ready_list->sorted)
4277  {
4278  int n = ready_list->last_te - ready_list->first_te + 1;
4279 
4280  if (n > 1)
4281  qsort(ready_list->tes + ready_list->first_te, n,
4282  sizeof(TocEntry *),
4283  TocEntrySizeCompare);
4284  ready_list->sorted = true;
4285  }
4286 }
4287 
4288 /* qsort comparator for sorting TocEntries by dataLength */
4289 static int
4290 TocEntrySizeCompare(const void *p1, const void *p2)
4291 {
4292  const TocEntry *te1 = *(const TocEntry *const *) p1;
4293  const TocEntry *te2 = *(const TocEntry *const *) p2;
4294 
4295  /* Sort by decreasing dataLength */
4296  if (te1->dataLength > te2->dataLength)
4297  return -1;
4298  if (te1->dataLength < te2->dataLength)
4299  return 1;
4300 
4301  /* For equal dataLengths, sort by dumpId, just to be stable */
4302  if (te1->dumpId < te2->dumpId)
4303  return -1;
4304  if (te1->dumpId > te2->dumpId)
4305  return 1;
4306 
4307  return 0;
4308 }
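
A minimal sketch of the ordering this comparator produces, using a simplified item struct and made-up sizes; after qsort the largest jobs sit at the front of the array, with equal sizes broken by dumpId:

#include <stdio.h>
#include <stdlib.h>

typedef struct                  /* stand-in for TocEntry */
{
    int     dumpId;
    long    dataLength;
} item_t;

/* same ordering rule: larger dataLength first, dumpId as the tiebreaker */
static int
size_compare(const void *p1, const void *p2)
{
    const item_t *a = *(const item_t *const *) p1;
    const item_t *b = *(const item_t *const *) p2;

    if (a->dataLength != b->dataLength)
        return (a->dataLength > b->dataLength) ? -1 : 1;
    return (a->dumpId < b->dumpId) ? -1 : (a->dumpId > b->dumpId);
}

int
main(void)
{
    item_t  t1 = {1, 100}, t2 = {2, 900}, t3 = {3, 900};
    item_t *ready[] = {&t1, &t2, &t3};

    qsort(ready, 3, sizeof(item_t *), size_compare);
    for (int i = 0; i < 3; i++)
        printf("dumpId %d (len %ld)\n", ready[i]->dumpId, ready[i]->dataLength);
    /* prints 2, 3, 1: biggest jobs first, ties broken by dumpId */
    return 0;
}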
4309 
4310 
4311 /*
4312  * Move all immediately-ready items from pending_list to ready_list.
4313  *
4314  * Items are considered ready if they have no remaining dependencies and
4315  * they belong in the current restore pass. (See also reduce_dependencies,
4316  * which applies the same logic one-at-a-time.)
4317  */
4318 static void
4319 move_to_ready_list(TocEntry *pending_list,
4320  ParallelReadyList *ready_list,
4321  RestorePass pass)
4322 {
4323  TocEntry *te;
4324  TocEntry *next_te;
4325 
4326  for (te = pending_list->pending_next; te != pending_list; te = next_te)
4327  {
4328  /* must save list link before possibly removing te from list */
4329  next_te = te->pending_next;
4330 
4331  if (te->depCount == 0 &&
4332  _tocEntryRestorePass(te) == pass)
4333  {
4334  /* Remove it from pending_list ... */
4335  pending_list_remove(te);
4336  /* ... and add to ready_list */
4337  ready_list_insert(ready_list, te);
4338  }
4339  }
4340 }
4341 
4342 /*
4343  * Find the next work item (if any) that is capable of being run now,
4344  * and remove it from the ready_list.
4345  *
4346  * Returns the item, or NULL if nothing is runnable.
4347  *
4348  * To qualify, the item must have no remaining dependencies
4349  * and no requirements for locks that are incompatible with
4350  * items currently running. Items in the ready_list are known to have
4351  * no remaining dependencies, but we have to check for lock conflicts.
4352  */
4353 static TocEntry *
4354 pop_next_work_item(ParallelReadyList *ready_list,
4355  ParallelState *pstate)
4356 {
4357  /*
4358  * Sort the ready_list so that we'll tackle larger jobs first.
4359  */
4360  ready_list_sort(ready_list);
4361 
4362  /*
4363  * Search the ready_list until we find a suitable item.
4364  */
4365  for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4366  {
4367  TocEntry *te = ready_list->tes[i];
4368  bool conflicts = false;
4369 
4370  /*
4371  * Check to see if the item would need exclusive lock on something
4372  * that a currently running item also needs lock on, or vice versa. If
4373  * so, we don't want to schedule them together.
4374  */
4375  for (int k = 0; k < pstate->numWorkers; k++)
4376  {
4377  TocEntry *running_te = pstate->te[k];
4378 
4379  if (running_te == NULL)
4380  continue;
4381  if (has_lock_conflicts(te, running_te) ||
4382  has_lock_conflicts(running_te, te))
4383  {
4384  conflicts = true;
4385  break;
4386  }
4387  }
4388 
4389  if (conflicts)
4390  continue;
4391 
4392  /* passed all tests, so this item can run */
4393  ready_list_remove(ready_list, i);
4394  return te;
4395  }
4396 
4397  pg_log_debug("no item ready");
4398  return NULL;
4399 }
4400 
4401 
4402 /*
4403  * Restore a single TOC item in parallel with others
4404  *
4405  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4406  * (everything else). A worker process executes several such work items during
4407  * a parallel backup or restore. Once we terminate here and report back that
4408  * our work is finished, the leader process will assign us a new work item.
4409  */
4410 int
4411 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4412 {
4413  int status;
4414 
4415  Assert(AH->connection != NULL);
4416 
4417  /* Count only errors associated with this TOC entry */
4418  AH->public.n_errors = 0;
4419 
4420  /* Restore the TOC item */
4421  status = restore_toc_entry(AH, te, true);
4422 
4423  return status;
4424 }
4425 
4426 
4427 /*
4428  * Callback function that's invoked in the leader process after a step has
4429  * been parallel restored.
4430  *
4431  * Update status and reduce the dependency count of any dependent items.
4432  */
4433 static void
4434 mark_restore_job_done(ArchiveHandle *AH,
4435  TocEntry *te,
4436  int status,
4437  void *callback_data)
4438 {
4439  ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4440 
4441  pg_log_info("finished item %d %s %s",
4442  te->dumpId, te->desc, te->tag);
4443 
4444  if (status == WORKER_CREATE_DONE)
4445  mark_create_done(AH, te);
4446  else if (status == WORKER_INHIBIT_DATA)
4447  {
4448  inhibit_data_for_failed_table(AH, te);
4449  AH->public.n_errors++;
4450  }
4451  else if (status == WORKER_IGNORED_ERRORS)
4452  AH->public.n_errors++;
4453  else if (status != 0)
4454  pg_fatal("worker process failed: exit code %d",
4455  status);
4456 
4457  reduce_dependencies(AH, te, ready_list);
4458 }
4459 
4460 
4461 /*
4462  * Process the dependency information into a form useful for parallel restore.
4463  *
4464  * This function takes care of fixing up some missing or badly designed
4465  * dependencies, and then prepares subsidiary data structures that will be
4466  * used in the main parallel-restore logic, including:
4467  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4468  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4469  * dependencies for each TOC entry.
4470  *
4471  * We also identify locking dependencies so that we can avoid trying to
4472  * schedule conflicting items at the same time.
4473  */
4474 static void
4475 fix_dependencies(ArchiveHandle *AH)
4476 {
4477  TocEntry *te;
4478  int i;
4479 
4480  /*
4481  * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4482  * items are marked as not being in any parallel-processing list.
4483  */
4484  for (te = AH->toc->next; te != AH->toc; te = te->next)
4485  {
4486  te->depCount = te->nDeps;
4487  te->revDeps = NULL;
4488  te->nRevDeps = 0;
4489  te->pending_prev = NULL;
4490  te->pending_next = NULL;
4491  }
4492 
4493  /*
4494  * POST_DATA items that are shown as depending on a table need to be
4495  * re-pointed to depend on that table's data, instead. This ensures they
4496  * won't get scheduled until the data has been loaded.
4497  */
4498  repoint_table_dependencies(AH);
4499 
4500  /*
4501  * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4502  * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4503  * one BLOB COMMENTS in such files.)
4504  */
4505  if (AH->version < K_VERS_1_11)
4506  {
4507  for (te = AH->toc->next; te != AH->toc; te = te->next)
4508  {
4509  if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4510  {
4511  TocEntry *te2;
4512 
4513  for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4514  {
4515  if (strcmp(te2->desc, "BLOBS") == 0)
4516  {
4517  te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4518  te->dependencies[0] = te2->dumpId;
4519  te->nDeps++;
4520  te->depCount++;
4521  break;
4522  }
4523  }
4524  break;
4525  }
4526  }
4527  }
4528 
4529  /*
4530  * At this point we start to build the revDeps reverse-dependency arrays,
4531  * so all changes of dependencies must be complete.
4532  */
4533 
4534  /*
4535  * Count the incoming dependencies for each item. Also, it is possible
4536  * that the dependencies list items that are not in the archive at all
4537  * (that should not happen in 9.2 and later, but is highly likely in older
4538  * archives). Subtract such items from the depCounts.
4539  */
4540  for (te = AH->toc->next; te != AH->toc; te = te->next)
4541  {
4542  for (i = 0; i < te->nDeps; i++)
4543  {
4544  DumpId depid = te->dependencies[i];
4545 
4546  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4547  AH->tocsByDumpId[depid]->nRevDeps++;
4548  else
4549  te->depCount--;
4550  }
4551  }
4552 
4553  /*
4554  * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4555  * it as a counter below.
4556  */
4557  for (te = AH->toc->next; te != AH->toc; te = te->next)
4558  {
4559  if (te->nRevDeps > 0)
4560  te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4561  te->nRevDeps = 0;
4562  }
4563 
4564  /*
4565  * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4566  * better agree with the loops above.
4567  */
4568  for (te = AH->toc->next; te != AH->toc; te = te->next)
4569  {
4570  for (i = 0; i < te->nDeps; i++)
4571  {
4572  DumpId depid = te->dependencies[i];
4573 
4574  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4575  {
4576  TocEntry *otherte = AH->tocsByDumpId[depid];
4577 
4578  otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4579  }
4580  }
4581  }
4582 
4583  /*
4584  * Lastly, work out the locking dependencies.
4585  */
4586  for (te = AH->toc->next; te != AH->toc; te = te->next)
4587  {
4588  te->lockDeps = NULL;
4589  te->nLockDeps = 0;
4590  identify_locking_dependencies(AH, te);
4591  }
4592 }
4593 
4594 /*
4595  * Change dependencies on table items to depend on table data items instead,
4596  * but only in POST_DATA items.
4597  *
4598  * Also, for any item having such dependency(s), set its dataLength to the
4599  * largest dataLength of the table data items it depends on. This ensures
4600  * that parallel restore will prioritize larger jobs (index builds, FK
4601  * constraint checks, etc) over smaller ones, avoiding situations where we
4602  * end a restore with only one active job working on a large table.
4603  */
4604 static void
4605 repoint_table_dependencies(ArchiveHandle *AH)
4606 {
4607  TocEntry *te;
4608  int i;
4609  DumpId olddep;
4610 
4611  for (te = AH->toc->next; te != AH->toc; te = te->next)
4612  {
4613  if (te->section != SECTION_POST_DATA)
4614  continue;
4615  for (i = 0; i < te->nDeps; i++)
4616  {
4617  olddep = te->dependencies[i];
4618  if (olddep <= AH->maxDumpId &&
4619  AH->tableDataId[olddep] != 0)
4620  {
4621  DumpId tabledataid = AH->tableDataId[olddep];
4622  TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4623 
4624  te->dependencies[i] = tabledataid;
4625  te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4626  pg_log_debug("transferring dependency %d -> %d to %d",
4627  te->dumpId, olddep, tabledataid);
4628  }
4629  }
4630  }
4631 }
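
A standalone sketch of the repointing rule with hypothetical dump ids (TABLE = 5, its TABLE DATA = 8): the dependency is swapped over to the data item, and the depending item's dataLength is raised to at least the data item's size:

#include <stdio.h>

#define Max(x, y) ((x) > (y) ? (x) : (y))

int
main(void)
{
    int     tableDataId[10] = {0};      /* indexed by dump id; 0 = no data item */
    long    dataLength[10] = {0};
    int     dep = 5;                    /* a POST_DATA item depends on TABLE 5 */
    long    item_len = 0;

    tableDataId[5] = 8;                 /* TABLE 5's data is dump id 8 (hypothetical) */
    dataLength[8] = 123456;

    if (tableDataId[dep] != 0)
    {
        dep = tableDataId[dep];                     /* now depends on TABLE DATA 8 */
        item_len = Max(item_len, dataLength[dep]);  /* inherit the data item's size */
    }
    printf("dep=%d len=%ld\n", dep, item_len);      /* dep=8 len=123456 */
    return 0;
}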
4632 
4633 /*
4634  * Identify which objects we'll need exclusive lock on in order to restore
4635  * the given TOC entry (*other* than the one identified by the TOC entry
4636  * itself). Record their dump IDs in the entry's lockDeps[] array.
4637  */
4638 static void
4639 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4640 {
4641  DumpId *lockids;
4642  int nlockids;
4643  int i;
4644 
4645  /*
4646  * We only care about this for POST_DATA items. PRE_DATA items are not
4647  * run in parallel, and DATA items are all independent by assumption.
4648  */
4649  if (te->section != SECTION_POST_DATA)
4650  return;
4651 
4652  /* Quick exit if no dependencies at all */
4653  if (te->nDeps == 0)
4654  return;
4655 
4656  /*
4657  * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4658  * and hence require exclusive lock. However, we know that CREATE INDEX
4659  * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4660  * category too ... but today is not that day.)
4661  */
4662  if (strcmp(te->desc, "INDEX") == 0)
4663  return;
4664 
4665  /*
4666  * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4667  * item listed among its dependencies. Originally all of these would have
4668  * been TABLE items, but repoint_table_dependencies would have repointed
4669  * them to the TABLE DATA items if those are present (which they might not
4670  * be, eg in a schema-only dump). Note that all of the entries we are
4671  * processing here are POST_DATA; otherwise there might be a significant
4672  * difference between a dependency on a table and a dependency on its
4673  * data, so that closer analysis would be needed here.
4674  */
4675  lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4676  nlockids = 0;
4677  for (i = 0; i < te->nDeps; i++)
4678  {
4679  DumpId depid = te->dependencies[i];
4680 
4681  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4682  ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4683  strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4684  lockids[nlockids++] = depid;
4685  }
4686 
4687  if (nlockids == 0)
4688  {
4689  free(lockids);
4690  return;
4691  }
4692 
4693  te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4694  te->nLockDeps = nlockids;
4695 }
4696 
4697 /*
4698  * Remove the specified TOC entry from the depCounts of items that depend on
4699  * it, thereby possibly making them ready-to-run. Any pending item that
4700  * becomes ready should be moved to the ready_list, if that's provided.
4701  */
4702 static void
4703 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4704  ParallelReadyList *ready_list)
4705 {
4706  int i;
4707 
4708  pg_log_debug("reducing dependencies for %d", te->dumpId);
4709 
4710  for (i = 0; i < te->nRevDeps; i++)
4711  {
4712  TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4713 
4714  Assert(otherte->depCount > 0);
4715  otherte->depCount--;
4716 
4717  /*
4718  * It's ready if it has no remaining dependencies, and it belongs in
4719  * the current restore pass, and it is currently a member of the
4720  * pending list (that check is needed to prevent double restore in
4721  * some cases where a list-file forces out-of-order restoring).
4722  * However, if ready_list == NULL then caller doesn't want any list
4723  * memberships changed.
4724  */
4725  if (otherte->depCount == 0 &&
4726  _tocEntryRestorePass(otherte) == AH->restorePass &&
4727  otherte->pending_prev != NULL &&
4728  ready_list != NULL)
4729  {
4730  /* Remove it from pending list ... */
4731  pending_list_remove(otherte);
4732  /* ... and add to ready_list */
4733  ready_list_insert(ready_list, otherte);
4734  }
4735  }
4736 }
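
A minimal sketch of the depCount/revDeps bookkeeping with four hypothetical items (item 0 has no dependencies; 1 and 2 depend on 0; 3 depends on 0 and 1). The restore-pass and pending-list membership checks are left out for brevity:

#include <stdio.h>

#define NITEMS 4

/*
 * depCount[i] = number of unfinished dependencies of item i;
 * revDeps[i] = items that depend on i, terminated by -1 (made-up data).
 */
static int  depCount[NITEMS] = {0, 1, 1, 2};
static int  revDeps[NITEMS][NITEMS] = {
    {1, 2, 3, -1},      /* items 1, 2 and 3 depend on 0 */
    {3, -1},            /* item 3 also depends on 1 */
    {-1},
    {-1},
};

static void
finish(int id)
{
    printf("finished %d\n", id);
    for (int *d = revDeps[id]; *d != -1; d++)
        if (--depCount[*d] == 0)
            printf("  item %d is now ready\n", *d);
}

int
main(void)
{
    finish(0);          /* unblocks 1 and 2; 3 still waits on 1 */
    finish(1);          /* now 3 becomes ready */
    finish(2);
    return 0;
}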
4737 
4738 /*
4739  * Set the created flag on the DATA member corresponding to the given
4740  * TABLE member
4741  */
4742 static void
4743 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4744 {
4745  if (AH->tableDataId[te->dumpId] != 0)
4746  {
4747  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4748 
4749  ted->created = true;
4750  }
4751 }
4752 
4753 /*
4754  * Mark the DATA member corresponding to the given TABLE member
4755  * as not wanted
4756  */
4757 static void
4758 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4759 {
4760  pg_log_info("table \"%s\" could not be created, will not restore its data",
4761  te->tag);
4762 
4763  if (AH->tableDataId[te->dumpId] != 0)
4764  {
4765  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4766 
4767  ted->reqs = 0;
4768  }
4769 }
4770 
4771 /*
4772  * Clone and de-clone routines used in parallel restoration.
4773  *
4774  * Enough of the structure is cloned to ensure that there is no
4775  * conflict between different threads each with their own clone.
4776  */
4777 ArchiveHandle *
4778 CloneArchive(ArchiveHandle *AH)
4779 {
4780  ArchiveHandle *clone;
4781 
4782  /* Make a "flat" copy */
4783  clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4784  memcpy(clone, AH, sizeof(ArchiveHandle));
4785 
4786  /* Handle format-independent fields */
4787  memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4788 
4789  /* The clone will have its own connection, so disregard connection state */
4790  clone->connection = NULL;
4791  clone->connCancel = NULL;
4792  clone->currUser = NULL;
4793  clone->currSchema = NULL;
4794  clone->currTableAm = NULL;
4795  clone->currTablespace = NULL;
4796 
4797  /* savedPassword must be local in case we change it while connecting */
4798  if (clone->savedPassword)
4799  clone->savedPassword = pg_strdup(clone->savedPassword);
4800 
4801  /* clone has its own error count, too */
4802  clone->public.n_errors = 0;
4803 
4804  /*
4805  * Connect our new clone object to the database, using the same connection
4806  * parameters used for the original connection.
4807  */
4808  ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4809 
4810  /* re-establish fixed state */
4811  if (AH->mode == archModeRead)
4812  _doSetFixedOutputState(clone);
4813  /* in write case, setupDumpWorker will fix up connection state */
4814 
4815  /* Let the format-specific code have a chance too */
4816  clone->ClonePtr(clone);
4817 
4818  Assert(clone->connection != NULL);
4819  return clone;
4820 }
4821 
4822 /*
4823  * Release clone-local storage.
4824  *
4825  * Note: we assume any clone-local connection was already closed.
4826  */
4827 void
4828 DeCloneArchive(ArchiveHandle *AH)
4829 {
4830  /* Should not have an open database connection */
4831  Assert(AH->connection == NULL);
4832 
4833  /* Clear format-specific state */
4834  AH->DeClonePtr(AH);
4835 
4836  /* Clear state allocated by CloneArchive */
4837  if (AH->sqlparse.curCmd)
4838  destroyPQExpBuffer(AH->sqlparse.curCmd);
4839 
4840  /* Clear any connection-local state */
4841  free(AH->currUser);
4842  free(AH->currSchema);
4843  free(AH->currTablespace);
4844  free(AH->currTableAm);
4845  free(AH->savedPassword);
4846 
4847  free(AH);
4848 }
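
The clone pattern used above (flat copy, then reset anything connection-local and duplicate anything the clone must own) can be shown with a much smaller structure. A standalone sketch with a hypothetical handle_t type:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct                  /* hypothetical, much-reduced handle */
{
    char   *savedPassword;      /* must be private to each clone */
    void   *connection;         /* per-connection state, reset in the clone */
    int     n_errors;           /* per-clone error count */
} handle_t;

static handle_t *
clone_handle(const handle_t *orig)
{
    handle_t   *clone = malloc(sizeof(handle_t));

    memcpy(clone, orig, sizeof(handle_t));          /* flat copy first */
    clone->connection = NULL;                       /* clone opens its own connection */
    clone->n_errors = 0;
    if (clone->savedPassword)
        clone->savedPassword = strdup(clone->savedPassword);
    return clone;
}

static void
declone_handle(handle_t *h)                         /* assumes its connection is closed */
{
    free(h->savedPassword);
    free(h);
}

int
main(void)
{
    handle_t    parent = {.savedPassword = strdup("secret")};
    handle_t   *worker = clone_handle(&parent);

    printf("password storage is separate: %d\n",
           worker->savedPassword != parent.savedPassword);     /* prints 1 */
    declone_handle(worker);
    free(parent.savedPassword);
    return 0;
}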