pg_backup_archiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  * Private implementation of the archiver routines.
6  *
7  * See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  * Rights are granted to use this software in any way so long
11  * as this notice is not removed.
12  *
13  * The author is not responsible for loss or damages that may
14  * result from its use.
15  *
16  *
17  * IDENTIFICATION
18  * src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "common/string.h"
34 #include "dumputils.h"
35 #include "fe_utils/string_utils.h"
36 #include "lib/stringinfo.h"
37 #include "libpq/libpq-fs.h"
38 #include "parallel.h"
39 #include "pg_backup_archiver.h"
40 #include "pg_backup_db.h"
41 #include "pg_backup_utils.h"
42 
43 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
44 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
45 
46 /* state needed to save/restore an archive's output target */
47 typedef struct _outputContext
48 {
49  void *OF;
50  int gzOut;
51 } OutputContext;
52 
53 /*
54  * State for tracking TocEntrys that are ready to process during a parallel
55  * restore. (This used to be a list, and we still call it that, though now
56  * it's really an array so that we can apply qsort to it.)
57  *
58  * tes[] is sized large enough that we can't overrun it.
59  * The valid entries are indexed first_te .. last_te inclusive.
60  * We periodically sort the array to bring larger-by-dataLength entries to
61  * the front; "sorted" is true if the valid entries are known sorted.
62  */
63 typedef struct _parallelReadyList
64 {
65  TocEntry **tes; /* Ready-to-dump TocEntrys */
66  int first_te; /* index of first valid entry in tes[] */
67  int last_te; /* index of last valid entry in tes[] */
68  bool sorted; /* are valid entries currently sorted? */
69 } ParallelReadyList;
70 
71 
72 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
73  const int compression, bool dosync, ArchiveMode mode,
74  SetupWorkerPtrType setupWorkerPtr);
76 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
77 static char *sanitize_line(const char *str, bool want_hyphen);
78 static void _doSetFixedOutputState(ArchiveHandle *AH);
79 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
80 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
81 static void _becomeUser(ArchiveHandle *AH, const char *user);
82 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
83 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
84 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
85 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
86 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
87 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
88 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
89 static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
90 static RestorePass _tocEntryRestorePass(TocEntry *te);
91 static bool _tocEntryIsACL(TocEntry *te);
92 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
93 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
94 static void buildTocEntryArrays(ArchiveHandle *AH);
95 static void _moveBefore(TocEntry *pos, TocEntry *te);
96 static int _discoverArchiveFormat(ArchiveHandle *AH);
97 
98 static int RestoringToDB(ArchiveHandle *AH);
99 static void dump_lo_buf(ArchiveHandle *AH);
100 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
101 static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
102 static OutputContext SaveOutput(ArchiveHandle *AH);
103 static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
104 
105 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
106 static void restore_toc_entries_prefork(ArchiveHandle *AH,
107  TocEntry *pending_list);
108 static void restore_toc_entries_parallel(ArchiveHandle *AH,
109  ParallelState *pstate,
110  TocEntry *pending_list);
111 static void restore_toc_entries_postfork(ArchiveHandle *AH,
112  TocEntry *pending_list);
113 static void pending_list_header_init(TocEntry *l);
114 static void pending_list_append(TocEntry *l, TocEntry *te);
115 static void pending_list_remove(TocEntry *te);
116 static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
117 static void ready_list_free(ParallelReadyList *ready_list);
118 static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
119 static void ready_list_remove(ParallelReadyList *ready_list, int i);
120 static void ready_list_sort(ParallelReadyList *ready_list);
121 static int TocEntrySizeCompare(const void *p1, const void *p2);
122 static void move_to_ready_list(TocEntry *pending_list,
123  ParallelReadyList *ready_list,
124  RestorePass pass);
125 static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
126  ParallelState *pstate);
127 static void mark_dump_job_done(ArchiveHandle *AH,
128  TocEntry *te,
129  int status,
130  void *callback_data);
131 static void mark_restore_job_done(ArchiveHandle *AH,
132  TocEntry *te,
133  int status,
134  void *callback_data);
135 static void fix_dependencies(ArchiveHandle *AH);
136 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
137 static void repoint_table_dependencies(ArchiveHandle *AH);
138 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
139 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
140  ParallelReadyList *ready_list);
141 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
142 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
143 
144 static void StrictNamesCheck(RestoreOptions *ropt);
145 
146 
147 /*
148  * Allocate a new DumpOptions block containing all default values.
149  */
150 DumpOptions *
151 NewDumpOptions(void)
152 {
153  DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
154 
155  InitDumpOptions(opts);
156  return opts;
157 }
158 
159 /*
160  * Initialize a DumpOptions struct to all default values
161  */
162 void
163 InitDumpOptions(DumpOptions *opts)
164 {
165  memset(opts, 0, sizeof(DumpOptions));
166  /* set any fields that shouldn't default to zeroes */
167  opts->include_everything = true;
168  opts->cparams.promptPassword = TRI_DEFAULT;
169  opts->dumpSections = DUMP_UNSECTIONED;
170 }
171 
172 /*
173  * Create a freshly allocated DumpOptions with options equivalent to those
174  * found in the given RestoreOptions.
175  */
176 DumpOptions *
177 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
178 {
179  DumpOptions *dopt = NewDumpOptions();
180 
181  /* this is the inverse of what's at the end of pg_dump.c's main() */
182  dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
183  dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
184  dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
185  dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
186  dopt->cparams.promptPassword = ropt->cparams.promptPassword;
187  dopt->outputClean = ropt->dropSchema;
188  dopt->dataOnly = ropt->dataOnly;
189  dopt->schemaOnly = ropt->schemaOnly;
190  dopt->if_exists = ropt->if_exists;
191  dopt->column_inserts = ropt->column_inserts;
192  dopt->dumpSections = ropt->dumpSections;
193  dopt->aclsSkip = ropt->aclsSkip;
194  dopt->outputSuperuser = ropt->superuser;
195  dopt->outputCreateDB = ropt->createDB;
196  dopt->outputNoOwner = ropt->noOwner;
197  dopt->outputNoTablespaces = ropt->noTablespace;
198  dopt->disable_triggers = ropt->disable_triggers;
199  dopt->use_setsessauth = ropt->use_setsessauth;
200  dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
201  dopt->dump_inserts = ropt->dump_inserts;
202  dopt->no_comments = ropt->no_comments;
203  dopt->no_publications = ropt->no_publications;
204  dopt->no_security_labels = ropt->no_security_labels;
205  dopt->no_subscriptions = ropt->no_subscriptions;
206  dopt->lockWaitTimeout = ropt->lockWaitTimeout;
207  dopt->include_everything = ropt->include_everything;
208  dopt->enable_row_security = ropt->enable_row_security;
209  dopt->sequence_data = ropt->sequence_data;
210 
211  return dopt;
212 }
213 
214 
215 /*
216  * Wrapper functions.
217  *
218  * The objective is to make writing new formats and dumpers as simple
219  * as possible, if necessary at the expense of extra function calls etc.
220  *
221  */
222 
223 /*
224  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
225  * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
226  * setup doesn't need to know anything much, so it's defined here.
227  */
228 static void
229 setupRestoreWorker(Archive *AHX)
230 {
231  ArchiveHandle *AH = (ArchiveHandle *) AHX;
232 
233  AH->ReopenPtr(AH);
234 }
235 
236 
237 /* Create a new archive */
238 /* Public */
239 Archive *
240 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
241  const int compression, bool dosync, ArchiveMode mode,
242  SetupWorkerPtrType setupDumpWorker)
243 
244 {
245  ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
246  mode, setupDumpWorker);
247 
248  return (Archive *) AH;
249 }
250 
251 /* Open an existing archive */
252 /* Public */
253 Archive *
254 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
255 {
256  ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
257 
258  return (Archive *) AH;
259 }
260 
261 /* Public */
262 void
263 CloseArchive(Archive *AHX)
264 {
265  int res = 0;
266  ArchiveHandle *AH = (ArchiveHandle *) AHX;
267 
268  AH->ClosePtr(AH);
269 
270  /* Close the output */
271  errno = 0; /* in case gzclose() doesn't set it */
272  if (AH->gzOut)
273  res = GZCLOSE(AH->OF);
274  else if (AH->OF != stdout)
275  res = fclose(AH->OF);
276 
277  if (res != 0)
278  fatal("could not close output file: %m");
279 }
280 
281 /* Public */
282 void
283 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
284 {
285  /* Caller can omit dump options, in which case we synthesize them */
286  if (dopt == NULL && ropt != NULL)
287  dopt = dumpOptionsFromRestoreOptions(ropt);
288 
289  /* Save options for later access */
290  AH->dopt = dopt;
291  AH->ropt = ropt;
292 }
293 
294 /* Public */
295 void
296 ProcessArchiveRestoreOptions(Archive *AHX)
297 {
298  ArchiveHandle *AH = (ArchiveHandle *) AHX;
299  RestoreOptions *ropt = AH->public.ropt;
300  TocEntry *te;
301  teSection curSection;
302 
303  /* Decide which TOC entries will be dumped/restored, and mark them */
304  curSection = SECTION_PRE_DATA;
305  for (te = AH->toc->next; te != AH->toc; te = te->next)
306  {
307  /*
308  * When writing an archive, we also take this opportunity to check
309  * that we have generated the entries in a sane order that respects
310  * the section divisions. When reading, don't complain, since buggy
311  * old versions of pg_dump might generate out-of-order archives.
312  */
313  if (AH->mode != archModeRead)
314  {
315  switch (te->section)
316  {
317  case SECTION_NONE:
318  /* ok to be anywhere */
319  break;
320  case SECTION_PRE_DATA:
321  if (curSection != SECTION_PRE_DATA)
322  pg_log_warning("archive items not in correct section order");
323  break;
324  case SECTION_DATA:
325  if (curSection == SECTION_POST_DATA)
326  pg_log_warning("archive items not in correct section order");
327  break;
328  case SECTION_POST_DATA:
329  /* ok no matter which section we were in */
330  break;
331  default:
332  fatal("unexpected section code %d",
333  (int) te->section);
334  break;
335  }
336  }
337 
338  if (te->section != SECTION_NONE)
339  curSection = te->section;
340 
341  te->reqs = _tocEntryRequired(te, curSection, AH);
342  }
343 
344  /* Enforce strict names checking */
345  if (ropt->strict_names)
346  StrictNamesCheck(ropt);
347 }
348 
349 /* Public */
350 void
351 RestoreArchive(Archive *AHX)
352 {
353  ArchiveHandle *AH = (ArchiveHandle *) AHX;
354  RestoreOptions *ropt = AH->public.ropt;
355  bool parallel_mode;
356  TocEntry *te;
357  OutputContext sav;
358 
359  AH->stage = STAGE_INITIALIZING;
360 
361  /*
362  * If we're going to do parallel restore, there are some restrictions.
363  */
364  parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
365  if (parallel_mode)
366  {
367  /* We haven't got round to making this work for all archive formats */
368  if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
369  fatal("parallel restore is not supported with this archive file format");
370 
371  /* Doesn't work if the archive represents dependencies as OIDs */
372  if (AH->version < K_VERS_1_8)
373  fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
374 
375  /*
376  * It's also not gonna work if we can't reopen the input file, so
377  * let's try that immediately.
378  */
379  AH->ReopenPtr(AH);
380  }
381 
382  /*
383  * Make sure we won't need (de)compression we haven't got
384  */
385 #ifndef HAVE_LIBZ
386  if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
387  {
388  for (te = AH->toc->next; te != AH->toc; te = te->next)
389  {
390  if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
391  fatal("cannot restore from compressed archive (compression not supported in this installation)");
392  }
393  }
394 #endif
395 
396  /*
397  * Prepare index arrays, so we can assume we have them throughout restore.
398  * It's possible we already did this, though.
399  */
400  if (AH->tocsByDumpId == NULL)
401  buildTocEntryArrays(AH);
402 
403  /*
404  * If we're using a DB connection, then connect it.
405  */
406  if (ropt->useDB)
407  {
408  pg_log_info("connecting to database for restore");
409  if (AH->version < K_VERS_1_3)
410  fatal("direct database connections are not supported in pre-1.3 archives");
411 
412  /*
413  * We don't want to guess at whether the dump will successfully
414  * restore; allow the attempt regardless of the version of the restore
415  * target.
416  */
417  AHX->minRemoteVersion = 0;
418  AHX->maxRemoteVersion = 9999999;
419 
420  ConnectDatabase(AHX, &ropt->cparams, false);
421 
422  /*
423  * If we're talking to the DB directly, don't send comments since they
424  * obscure SQL when displaying errors
425  */
426  AH->noTocComments = 1;
427  }
428 
429  /*
430  * Work out if we have an implied data-only restore. This can happen if
431  * the dump was data only or if the user has used a toc list to exclude
432  * all of the schema data. All we do is look for schema entries - if none
433  * are found then we set the dataOnly flag.
434  *
435  * We could scan for wanted TABLE entries, but that is not the same as
436  * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
437  */
438  if (!ropt->dataOnly)
439  {
440  int impliedDataOnly = 1;
441 
442  for (te = AH->toc->next; te != AH->toc; te = te->next)
443  {
444  if ((te->reqs & REQ_SCHEMA) != 0)
445  { /* It's schema, and it's wanted */
446  impliedDataOnly = 0;
447  break;
448  }
449  }
450  if (impliedDataOnly)
451  {
452  ropt->dataOnly = impliedDataOnly;
453  pg_log_info("implied data-only restore");
454  }
455  }
456 
457  /*
458  * Setup the output file if necessary.
459  */
460  sav = SaveOutput(AH);
461  if (ropt->filename || ropt->compression)
462  SetOutput(AH, ropt->filename, ropt->compression);
463 
464  ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
465 
466  if (AH->archiveRemoteVersion)
467  ahprintf(AH, "-- Dumped from database version %s\n",
468  AH->archiveRemoteVersion);
469  if (AH->archiveDumpVersion)
470  ahprintf(AH, "-- Dumped by pg_dump version %s\n",
471  AH->archiveDumpVersion);
472 
473  ahprintf(AH, "\n");
474 
475  if (AH->public.verbose)
476  dumpTimestamp(AH, "Started on", AH->createDate);
477 
478  if (ropt->single_txn)
479  {
480  if (AH->connection)
481  StartTransaction(AHX);
482  else
483  ahprintf(AH, "BEGIN;\n\n");
484  }
485 
486  /*
487  * Establish important parameter values right away.
488  */
489  _doSetFixedOutputState(AH);
490 
491  AH->stage = STAGE_PROCESSING;
492 
493  /*
494  * Drop the items at the start, in reverse order
495  */
496  if (ropt->dropSchema)
497  {
498  for (te = AH->toc->prev; te != AH->toc; te = te->prev)
499  {
500  AH->currentTE = te;
501 
502  /*
503  * In createDB mode, issue a DROP *only* for the database as a
504  * whole. Issuing drops against anything else would be wrong,
505  * because at this point we're connected to the wrong database.
506  * (The DATABASE PROPERTIES entry, if any, should be treated like
507  * the DATABASE entry.)
508  */
509  if (ropt->createDB)
510  {
511  if (strcmp(te->desc, "DATABASE") != 0 &&
512  strcmp(te->desc, "DATABASE PROPERTIES") != 0)
513  continue;
514  }
515 
516  /* Otherwise, drop anything that's selected and has a dropStmt */
517  if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
518  {
519  pg_log_info("dropping %s %s", te->desc, te->tag);
520  /* Select owner and schema as necessary */
521  _becomeOwner(AH, te);
522  _selectOutputSchema(AH, te->namespace);
523 
524  /*
525  * Now emit the DROP command, if the object has one. Note we
526  * don't necessarily emit it verbatim; at this point we add an
527  * appropriate IF EXISTS clause, if the user requested it.
528  */
529  if (*te->dropStmt != '\0')
530  {
531  if (!ropt->if_exists)
532  {
533  /* No --if-exists? Then just use the original */
534  ahprintf(AH, "%s", te->dropStmt);
535  }
536  else
537  {
538  /*
539  * Inject an appropriate spelling of "if exists". For
540  * large objects, we have a separate routine that
541  * knows how to do it, without depending on
542  * te->dropStmt; use that. For other objects we need
543  * to parse the command.
544  */
545  if (strncmp(te->desc, "BLOB", 4) == 0)
546  {
547  DropBlobIfExists(AH, te->catalogId.oid);
548  }
549  else
550  {
551  char *dropStmt = pg_strdup(te->dropStmt);
552  char *dropStmtOrig = dropStmt;
553  PQExpBuffer ftStmt = createPQExpBuffer();
554 
555  /*
556  * Need to inject IF EXISTS clause after ALTER
557  * TABLE part in ALTER TABLE .. DROP statement
558  */
559  if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
560  {
561  appendPQExpBufferStr(ftStmt,
562  "ALTER TABLE IF EXISTS");
563  dropStmt = dropStmt + 11;
564  }
565 
566  /*
567  * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
568  * not support the IF EXISTS clause, and therefore
569  * we simply emit the original command for DEFAULT
570  * objects (modulo the adjustment made above).
571  *
572  * Likewise, don't mess with DATABASE PROPERTIES.
573  *
574  * If we used CREATE OR REPLACE VIEW as a means of
575  * quasi-dropping an ON SELECT rule, that should
576  * be emitted unchanged as well.
577  *
578  * For other object types, we need to extract the
579  * first part of the DROP which includes the
580  * object type. Most of the time this matches
581  * te->desc, so search for that; however for the
582  * different kinds of CONSTRAINTs, we know to
583  * search for hardcoded "DROP CONSTRAINT" instead.
584  */
585  if (strcmp(te->desc, "DEFAULT") == 0 ||
586  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
587  strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
588  appendPQExpBufferStr(ftStmt, dropStmt);
589  else
590  {
591  char buffer[40];
592  char *mark;
593 
594  if (strcmp(te->desc, "CONSTRAINT") == 0 ||
595  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
596  strcmp(te->desc, "FK CONSTRAINT") == 0)
597  strcpy(buffer, "DROP CONSTRAINT");
598  else
599  snprintf(buffer, sizeof(buffer), "DROP %s",
600  te->desc);
601 
602  mark = strstr(dropStmt, buffer);
603 
604  if (mark)
605  {
606  *mark = '\0';
607  appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
608  dropStmt, buffer,
609  mark + strlen(buffer));
610  }
611  else
612  {
613  /* complain and emit unmodified command */
614  pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
615  dropStmtOrig);
616  appendPQExpBufferStr(ftStmt, dropStmt);
617  }
618  }
619 
620  ahprintf(AH, "%s", ftStmt->data);
621 
622  destroyPQExpBuffer(ftStmt);
623  pg_free(dropStmtOrig);
624  }
625  }
626  }
627  }
628  }
629 
630  /*
631  * _selectOutputSchema may have set currSchema to reflect the effect
632  * of a "SET search_path" command it emitted. However, by now we may
633  * have dropped that schema; or it might not have existed in the first
634  * place. In either case the effective value of search_path will not
635  * be what we think. Forcibly reset currSchema so that we will
636  * re-establish the search_path setting when needed (after creating
637  * the schema).
638  *
639  * If we treated users as pg_dump'able objects then we'd need to reset
640  * currUser here too.
641  */
642  if (AH->currSchema)
643  free(AH->currSchema);
644  AH->currSchema = NULL;
645  }
646 
647  if (parallel_mode)
648  {
649  /*
650  * In parallel mode, turn control over to the parallel-restore logic.
651  */
652  ParallelState *pstate;
653  TocEntry pending_list;
654 
655  /* The archive format module may need some setup for this */
656  if (AH->PrepParallelRestorePtr)
657  AH->PrepParallelRestorePtr(AH);
658 
659  pending_list_header_init(&pending_list);
660 
661  /* This runs PRE_DATA items and then disconnects from the database */
662  restore_toc_entries_prefork(AH, &pending_list);
663  Assert(AH->connection == NULL);
664 
665  /* ParallelBackupStart() will actually fork the processes */
666  pstate = ParallelBackupStart(AH);
667  restore_toc_entries_parallel(AH, pstate, &pending_list);
668  ParallelBackupEnd(AH, pstate);
669 
670  /* reconnect the leader and see if we missed something */
671  restore_toc_entries_postfork(AH, &pending_list);
672  Assert(AH->connection != NULL);
673  }
674  else
675  {
676  /*
677  * In serial mode, process everything in three phases: normal items,
678  * then ACLs, then post-ACL items. We might be able to skip one or
679  * both extra phases in some cases, eg data-only restores.
680  */
681  bool haveACL = false;
682  bool havePostACL = false;
683 
684  for (te = AH->toc->next; te != AH->toc; te = te->next)
685  {
686  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
687  continue; /* ignore if not to be dumped at all */
688 
689  switch (_tocEntryRestorePass(te))
690  {
691  case RESTORE_PASS_MAIN:
692  (void) restore_toc_entry(AH, te, false);
693  break;
694  case RESTORE_PASS_ACL:
695  haveACL = true;
696  break;
697  case RESTORE_PASS_POST_ACL:
698  havePostACL = true;
699  break;
700  }
701  }
702 
703  if (haveACL)
704  {
705  for (te = AH->toc->next; te != AH->toc; te = te->next)
706  {
707  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
708  _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
709  (void) restore_toc_entry(AH, te, false);
710  }
711  }
712 
713  if (havePostACL)
714  {
715  for (te = AH->toc->next; te != AH->toc; te = te->next)
716  {
717  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
718  _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
719  (void) restore_toc_entry(AH, te, false);
720  }
721  }
722  }
723 
724  if (ropt->single_txn)
725  {
726  if (AH->connection)
727  CommitTransaction(AHX);
728  else
729  ahprintf(AH, "COMMIT;\n\n");
730  }
731 
732  if (AH->public.verbose)
733  dumpTimestamp(AH, "Completed on", time(NULL));
734 
735  ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
736 
737  /*
738  * Clean up & we're done.
739  */
740  AH->stage = STAGE_FINALIZING;
741 
742  if (ropt->filename || ropt->compression)
743  RestoreOutput(AH, sav);
744 
745  if (ropt->useDB)
746  DisconnectDatabase(&AH->public);
747 }
748 
749 /*
750  * Restore a single TOC item. Used in both parallel and non-parallel restore;
751  * is_parallel is true if we are in a worker child process.
752  *
753  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
754  * the parallel parent has to make the corresponding status update.
755  */
756 static int
757 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
758 {
759  RestoreOptions *ropt = AH->public.ropt;
760  int status = WORKER_OK;
761  int reqs;
762  bool defnDumped;
763 
764  AH->currentTE = te;
765 
766  /* Dump any relevant dump warnings to stderr */
767  if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
768  {
769  if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
770  pg_log_warning("warning from original dump file: %s", te->defn);
771  else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
772  pg_log_warning("warning from original dump file: %s", te->copyStmt);
773  }
774 
775  /* Work out what, if anything, we want from this entry */
776  reqs = te->reqs;
777 
778  defnDumped = false;
779 
780  /*
781  * If it has a schema component that we want, then process that
782  */
783  if ((reqs & REQ_SCHEMA) != 0)
784  {
785  /* Show namespace in log message if available */
786  if (te->namespace)
787  pg_log_info("creating %s \"%s.%s\"",
788  te->desc, te->namespace, te->tag);
789  else
790  pg_log_info("creating %s \"%s\"",
791  te->desc, te->tag);
792 
793  _printTocEntry(AH, te, false);
794  defnDumped = true;
795 
796  if (strcmp(te->desc, "TABLE") == 0)
797  {
798  if (AH->lastErrorTE == te)
799  {
800  /*
801  * We failed to create the table. If
802  * --no-data-for-failed-tables was given, mark the
803  * corresponding TABLE DATA to be ignored.
804  *
805  * In the parallel case this must be done in the parent, so we
806  * just set the return value.
807  */
808  if (ropt->noDataForFailedTables)
809  {
810  if (is_parallel)
811  status = WORKER_INHIBIT_DATA;
812  else
813  inhibit_data_for_failed_table(AH, te);
814  }
815  }
816  else
817  {
818  /*
819  * We created the table successfully. Mark the corresponding
820  * TABLE DATA for possible truncation.
821  *
822  * In the parallel case this must be done in the parent, so we
823  * just set the return value.
824  */
825  if (is_parallel)
826  status = WORKER_CREATE_DONE;
827  else
828  mark_create_done(AH, te);
829  }
830  }
831 
832  /*
833  * If we created a DB, connect to it. Also, if we changed DB
834  * properties, reconnect to ensure that relevant GUC settings are
835  * applied to our session.
836  */
837  if (strcmp(te->desc, "DATABASE") == 0 ||
838  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
839  {
840  pg_log_info("connecting to new database \"%s\"", te->tag);
841  _reconnectToDB(AH, te->tag);
842  }
843  }
844 
845  /*
846  * If it has a data component that we want, then process that
847  */
848  if ((reqs & REQ_DATA) != 0)
849  {
850  /*
851  * hadDumper will be set if there is genuine data component for this
852  * node. Otherwise, we need to check the defn field for statements
853  * that need to be executed in data-only restores.
854  */
855  if (te->hadDumper)
856  {
857  /*
858  * If we can output the data, then restore it.
859  */
860  if (AH->PrintTocDataPtr != NULL)
861  {
862  _printTocEntry(AH, te, true);
863 
864  if (strcmp(te->desc, "BLOBS") == 0 ||
865  strcmp(te->desc, "BLOB COMMENTS") == 0)
866  {
867  pg_log_info("processing %s", te->desc);
868 
869  _selectOutputSchema(AH, "pg_catalog");
870 
871  /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
872  if (strcmp(te->desc, "BLOB COMMENTS") == 0)
873  AH->outputKind = OUTPUT_OTHERDATA;
874 
875  AH->PrintTocDataPtr(AH, te);
876 
877  AH->outputKind = OUTPUT_SQLCMDS;
878  }
879  else
880  {
881  _disableTriggersIfNecessary(AH, te);
882 
883  /* Select owner and schema as necessary */
884  _becomeOwner(AH, te);
885  _selectOutputSchema(AH, te->namespace);
886 
887  pg_log_info("processing data for table \"%s.%s\"",
888  te->namespace, te->tag);
889 
890  /*
891  * In parallel restore, if we created the table earlier in
892  * the run then we wrap the COPY in a transaction and
893  * precede it with a TRUNCATE. If archiving is not on
894  * this prevents WAL-logging the COPY. This obtains a
895  * speedup similar to that from using single_txn mode in
896  * non-parallel restores.
897  */
898  if (is_parallel && te->created)
899  {
900  /*
901  * Parallel restore is always talking directly to a
902  * server, so no need to see if we should issue BEGIN.
903  */
904  StartTransaction(&AH->public);
905 
906  /*
907  * If the server version is >= 8.4, make sure we issue
908  * TRUNCATE with ONLY so that child tables are not
909  * wiped.
910  */
911  ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
912  (PQserverVersion(AH->connection) >= 80400 ?
913  "ONLY " : ""),
914  fmtQualifiedId(te->namespace, te->tag));
915  }
916 
917  /*
918  * If we have a copy statement, use it.
919  */
920  if (te->copyStmt && strlen(te->copyStmt) > 0)
921  {
922  ahprintf(AH, "%s", te->copyStmt);
923  AH->outputKind = OUTPUT_COPYDATA;
924  }
925  else
926  AH->outputKind = OUTPUT_OTHERDATA;
927 
928  AH->PrintTocDataPtr(AH, te);
929 
930  /*
931  * Terminate COPY if needed.
932  */
933  if (AH->outputKind == OUTPUT_COPYDATA &&
934  RestoringToDB(AH))
935  EndDBCopyMode(&AH->public, te->tag);
936  AH->outputKind = OUTPUT_SQLCMDS;
937 
938  /* close out the transaction started above */
939  if (is_parallel && te->created)
940  CommitTransaction(&AH->public);
941 
942  _enableTriggersIfNecessary(AH, te);
943  }
944  }
945  }
946  else if (!defnDumped)
947  {
948  /* If we haven't already dumped the defn part, do so now */
949  pg_log_info("executing %s %s", te->desc, te->tag);
950  _printTocEntry(AH, te, false);
951  }
952  }
953 
954  if (AH->public.n_errors > 0 && status == WORKER_OK)
955  status = WORKER_IGNORED_ERRORS;
956 
957  return status;
958 }
959 
960 /*
961  * Allocate a new RestoreOptions block.
962  * This is mainly so we can initialize it, but also for future expansion,
963  */
964 RestoreOptions *
965 NewRestoreOptions(void)
966 {
967  RestoreOptions *opts;
968 
969  opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
970 
971  /* set any fields that shouldn't default to zeroes */
972  opts->format = archUnknown;
973  opts->cparams.promptPassword = TRI_DEFAULT;
974  opts->dumpSections = DUMP_UNSECTIONED;
975 
976  return opts;
977 }
978 
979 static void
980 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
981 {
982  RestoreOptions *ropt = AH->public.ropt;
983 
984  /* This hack is only needed in a data-only restore */
985  if (!ropt->dataOnly || !ropt->disable_triggers)
986  return;
987 
988  pg_log_info("disabling triggers for %s", te->tag);
989 
990  /*
991  * Become superuser if possible, since they are the only ones who can
992  * disable constraint triggers. If -S was not given, assume the initial
993  * user identity is a superuser. (XXX would it be better to become the
994  * table owner?)
995  */
996  _becomeUser(AH, ropt->superuser);
997 
998  /*
999  * Disable them.
1000  */
1001  ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1002  fmtQualifiedId(te->namespace, te->tag));
1003 }
1004 
1005 static void
1006 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1007 {
1008  RestoreOptions *ropt = AH->public.ropt;
1009 
1010  /* This hack is only needed in a data-only restore */
1011  if (!ropt->dataOnly || !ropt->disable_triggers)
1012  return;
1013 
1014  pg_log_info("enabling triggers for %s", te->tag);
1015 
1016  /*
1017  * Become superuser if possible, since they are the only ones who can
1018  * disable constraint triggers. If -S was not given, assume the initial
1019  * user identity is a superuser. (XXX would it be better to become the
1020  * table owner?)
1021  */
1022  _becomeUser(AH, ropt->superuser);
1023 
1024  /*
1025  * Enable them.
1026  */
1027  ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1028  fmtQualifiedId(te->namespace, te->tag));
1029 }
1030 
1031 /*
1032  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1033  */
1034 
1035 /* Public */
1036 void
1037 WriteData(Archive *AHX, const void *data, size_t dLen)
1038 {
1039  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1040 
1041  if (!AH->currToc)
1042  fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1043 
1044  AH->WriteDataPtr(AH, data, dLen);
1045 }
1046 
1047 /*
1048  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1049  * repository for all metadata. But the name has stuck.
1050  *
1051  * The new entry is added to the Archive's TOC list. Most callers can ignore
1052  * the result value because nothing else need be done, but a few want to
1053  * manipulate the TOC entry further.
1054  */
1055 
1056 /* Public */
1057 TocEntry *
1058 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1059  ArchiveOpts *opts)
1060 {
1061  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1062  TocEntry *newToc;
1063 
1064  newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1065 
1066  AH->tocCount++;
1067  if (dumpId > AH->maxDumpId)
1068  AH->maxDumpId = dumpId;
1069 
1070  newToc->prev = AH->toc->prev;
1071  newToc->next = AH->toc;
1072  AH->toc->prev->next = newToc;
1073  AH->toc->prev = newToc;
1074 
1075  newToc->catalogId = catalogId;
1076  newToc->dumpId = dumpId;
1077  newToc->section = opts->section;
1078 
1079  newToc->tag = pg_strdup(opts->tag);
1080  newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1081  newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1082  newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1083  newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1084  newToc->desc = pg_strdup(opts->description);
1085  newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1086  newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1087  newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1088 
1089  if (opts->nDeps > 0)
1090  {
1091  newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1092  memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1093  newToc->nDeps = opts->nDeps;
1094  }
1095  else
1096  {
1097  newToc->dependencies = NULL;
1098  newToc->nDeps = 0;
1099  }
1100 
1101  newToc->dataDumper = opts->dumpFn;
1102  newToc->dataDumperArg = opts->dumpArg;
1103  newToc->hadDumper = opts->dumpFn ? true : false;
1104 
1105  newToc->formatData = NULL;
1106  newToc->dataLength = 0;
1107 
1108  if (AH->ArchiveEntryPtr != NULL)
1109  AH->ArchiveEntryPtr(AH, newToc);
1110 
1111  return newToc;
1112 }
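/*
 * A minimal sketch of how a dumper typically creates a TOC entry, modelled on
 * pg_dump.c callers and assuming the ARCHIVE_OPTS() convenience macro from
 * pg_backup.h; the tag and statement below are illustrative, not a verbatim
 * excerpt:
 *
 *	te = ArchiveEntry(fout, nilCatalogId, createDumpId(),
 *					  ARCHIVE_OPTS(.tag = "ENCODING",
 *								   .description = "ENCODING",
 *								   .section = SECTION_PRE_DATA,
 *								   .createStmt = "SET client_encoding = 'UTF8';\n"));
 */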
1113 
1114 /* Public */
1115 void
1116 PrintTOCSummary(Archive *AHX)
1117 {
1118  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1119  RestoreOptions *ropt = AH->public.ropt;
1120  TocEntry *te;
1121  teSection curSection;
1122  OutputContext sav;
1123  const char *fmtName;
1124  char stamp_str[64];
1125 
1126  sav = SaveOutput(AH);
1127  if (ropt->filename)
1128  SetOutput(AH, ropt->filename, 0 /* no compression */ );
1129 
1130  if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1131  localtime(&AH->createDate)) == 0)
1132  strcpy(stamp_str, "[unknown]");
1133 
1134  ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1135  ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
1136  sanitize_line(AH->archdbname, false),
1137  AH->tocCount, AH->compression);
1138 
1139  switch (AH->format)
1140  {
1141  case archCustom:
1142  fmtName = "CUSTOM";
1143  break;
1144  case archDirectory:
1145  fmtName = "DIRECTORY";
1146  break;
1147  case archTar:
1148  fmtName = "TAR";
1149  break;
1150  default:
1151  fmtName = "UNKNOWN";
1152  }
1153 
1154  ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1155  AH->vmaj, AH->vmin, AH->vrev);
1156  ahprintf(AH, "; Format: %s\n", fmtName);
1157  ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1158  ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1159  if (AH->archiveRemoteVersion)
1160  ahprintf(AH, "; Dumped from database version: %s\n",
1161  AH->archiveRemoteVersion);
1162  if (AH->archiveDumpVersion)
1163  ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1164  AH->archiveDumpVersion);
1165 
1166  ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1167 
1168  curSection = SECTION_PRE_DATA;
1169  for (te = AH->toc->next; te != AH->toc; te = te->next)
1170  {
1171  if (te->section != SECTION_NONE)
1172  curSection = te->section;
1173  if (ropt->verbose ||
1174  (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1175  {
1176  char *sanitized_name;
1177  char *sanitized_schema;
1178  char *sanitized_owner;
1179 
1180  /*
1181  */
1182  sanitized_name = sanitize_line(te->tag, false);
1183  sanitized_schema = sanitize_line(te->namespace, true);
1184  sanitized_owner = sanitize_line(te->owner, false);
1185 
1186  ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1187  te->catalogId.tableoid, te->catalogId.oid,
1188  te->desc, sanitized_schema, sanitized_name,
1189  sanitized_owner);
1190 
1191  free(sanitized_name);
1192  free(sanitized_schema);
1193  free(sanitized_owner);
1194  }
1195  if (ropt->verbose && te->nDeps > 0)
1196  {
1197  int i;
1198 
1199  ahprintf(AH, ";\tdepends on:");
1200  for (i = 0; i < te->nDeps; i++)
1201  ahprintf(AH, " %d", te->dependencies[i]);
1202  ahprintf(AH, "\n");
1203  }
1204  }
1205 
1206  /* Enforce strict names checking */
1207  if (ropt->strict_names)
1208  StrictNamesCheck(ropt);
1209 
1210  if (ropt->filename)
1211  RestoreOutput(AH, sav);
1212 }
1213 
1214 /***********
1215  * BLOB Archival
1216  ***********/
1217 
1218 /* Called by a dumper to signal start of a BLOB */
1219 int
1220 StartBlob(Archive *AHX, Oid oid)
1221 {
1222  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1223 
1224  if (!AH->StartBlobPtr)
1225  fatal("large-object output not supported in chosen format");
1226 
1227  AH->StartBlobPtr(AH, AH->currToc, oid);
1228 
1229  return 1;
1230 }
1231 
1232 /* Called by a dumper to signal end of a BLOB */
1233 int
1234 EndBlob(Archive *AHX, Oid oid)
1235 {
1236  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1237 
1238  if (AH->EndBlobPtr)
1239  AH->EndBlobPtr(AH, AH->currToc, oid);
1240 
1241  return 1;
1242 }
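/*
 * A minimal sketch of the dumper-side calling sequence for one large object,
 * modelled on pg_dump's blob dumper; the buffer, connection and error
 * handling here are illustrative assumptions, not original code:
 *
 *	StartBlob(AHX, oid);
 *	while ((cnt = lo_read(conn, loFd, buf, sizeof(buf))) > 0)
 *		WriteData(AHX, buf, cnt);
 *	EndBlob(AHX, oid);
 */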
1243 
1244 /**********
1245  * BLOB Restoration
1246  **********/
1247 
1248 /*
1249  * Called by a format handler before any blobs are restored
1250  */
1251 void
1252 StartRestoreBlobs(ArchiveHandle *AH)
1253 {
1254  RestoreOptions *ropt = AH->public.ropt;
1255 
1256  if (!ropt->single_txn)
1257  {
1258  if (AH->connection)
1259  StartTransaction(&AH->public);
1260  else
1261  ahprintf(AH, "BEGIN;\n\n");
1262  }
1263 
1264  AH->blobCount = 0;
1265 }
1266 
1267 /*
1268  * Called by a format handler after all blobs are restored
1269  */
1270 void
1271 EndRestoreBlobs(ArchiveHandle *AH)
1272 {
1273  RestoreOptions *ropt = AH->public.ropt;
1274 
1275  if (!ropt->single_txn)
1276  {
1277  if (AH->connection)
1278  CommitTransaction(&AH->public);
1279  else
1280  ahprintf(AH, "COMMIT;\n\n");
1281  }
1282 
1283  pg_log_info(ngettext("restored %d large object",
1284  "restored %d large objects",
1285  AH->blobCount),
1286  AH->blobCount);
1287 }
1288 
1289 
1290 /*
1291  * Called by a format handler to initiate restoration of a blob
1292  */
1293 void
1294 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
1295 {
1296  bool old_blob_style = (AH->version < K_VERS_1_12);
1297  Oid loOid;
1298 
1299  AH->blobCount++;
1300 
1301  /* Initialize the LO Buffer */
1302  AH->lo_buf_used = 0;
1303 
1304  pg_log_info("restoring large object with OID %u", oid);
1305 
1306  /* With an old archive we must do drop and create logic here */
1307  if (old_blob_style && drop)
1308  DropBlobIfExists(AH, oid);
1309 
1310  if (AH->connection)
1311  {
1312  if (old_blob_style)
1313  {
1314  loOid = lo_create(AH->connection, oid);
1315  if (loOid == 0 || loOid != oid)
1316  fatal("could not create large object %u: %s",
1317  oid, PQerrorMessage(AH->connection));
1318  }
1319  AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1320  if (AH->loFd == -1)
1321  fatal("could not open large object %u: %s",
1322  oid, PQerrorMessage(AH->connection));
1323  }
1324  else
1325  {
1326  if (old_blob_style)
1327  ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1328  oid, INV_WRITE);
1329  else
1330  ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1331  oid, INV_WRITE);
1332  }
1333 
1334  AH->writingBlob = 1;
1335 }
1336 
1337 void
1338 EndRestoreBlob(ArchiveHandle *AH, Oid oid)
1339 {
1340  if (AH->lo_buf_used > 0)
1341  {
1342  /* Write remaining bytes from the LO buffer */
1343  dump_lo_buf(AH);
1344  }
1345 
1346  AH->writingBlob = 0;
1347 
1348  if (AH->connection)
1349  {
1350  lo_close(AH->connection, AH->loFd);
1351  AH->loFd = -1;
1352  }
1353  else
1354  {
1355  ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1356  }
1357 }
1358 
1359 /***********
1360  * Sorting and Reordering
1361  ***********/
1362 
1363 void
1364 SortTocFromFile(Archive *AHX)
1365 {
1366  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1367  RestoreOptions *ropt = AH->public.ropt;
1368  FILE *fh;
1369  StringInfoData linebuf;
1370 
1371  /* Allocate space for the 'wanted' array, and init it */
1372  ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1373 
1374  /* Setup the file */
1375  fh = fopen(ropt->tocFile, PG_BINARY_R);
1376  if (!fh)
1377  fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1378 
1379  initStringInfo(&linebuf);
1380 
1381  while (pg_get_line_buf(fh, &linebuf))
1382  {
1383  char *cmnt;
1384  char *endptr;
1385  DumpId id;
1386  TocEntry *te;
1387 
1388  /* Truncate line at comment, if any */
1389  cmnt = strchr(linebuf.data, ';');
1390  if (cmnt != NULL)
1391  {
1392  cmnt[0] = '\0';
1393  linebuf.len = cmnt - linebuf.data;
1394  }
1395 
1396  /* Ignore if all blank */
1397  if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1398  continue;
1399 
1400  /* Get an ID, check it's valid and not already seen */
1401  id = strtol(linebuf.data, &endptr, 10);
1402  if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1403  ropt->idWanted[id - 1])
1404  {
1405  pg_log_warning("line ignored: %s", linebuf.data);
1406  continue;
1407  }
1408 
1409  /* Find TOC entry */
1410  te = getTocEntryByDumpId(AH, id);
1411  if (!te)
1412  fatal("could not find entry for ID %d",
1413  id);
1414 
1415  /* Mark it wanted */
1416  ropt->idWanted[id - 1] = true;
1417 
1418  /*
1419  * Move each item to the end of the list as it is selected, so that
1420  * they are placed in the desired order. Any unwanted items will end
1421  * up at the front of the list, which may seem unintuitive but it's
1422  * what we need. In an ordinary serial restore that makes no
1423  * difference, but in a parallel restore we need to mark unrestored
1424  * items' dependencies as satisfied before we start examining
1425  * restorable items. Otherwise they could have surprising
1426  * side-effects on the order in which restorable items actually get
1427  * restored.
1428  */
1429  _moveBefore(AH->toc, te);
1430  }
1431 
1432  pg_free(linebuf.data);
1433 
1434  if (fclose(fh) != 0)
1435  fatal("could not close TOC file: %m");
1436 }
1437 
1438 /**********************
1439  * Convenience functions that look like standard IO functions
1440  * for writing data when in dump mode.
1441  **********************/
1442 
1443 /* Public */
1444 void
1445 archputs(const char *s, Archive *AH)
1446 {
1447  WriteData(AH, s, strlen(s));
1448 }
1449 
1450 /* Public */
1451 int
1452 archprintf(Archive *AH, const char *fmt,...)
1453 {
1454  int save_errno = errno;
1455  char *p;
1456  size_t len = 128; /* initial assumption about buffer size */
1457  size_t cnt;
1458 
1459  for (;;)
1460  {
1461  va_list args;
1462 
1463  /* Allocate work buffer. */
1464  p = (char *) pg_malloc(len);
1465 
1466  /* Try to format the data. */
1467  errno = save_errno;
1468  va_start(args, fmt);
1469  cnt = pvsnprintf(p, len, fmt, args);
1470  va_end(args);
1471 
1472  if (cnt < len)
1473  break; /* success */
1474 
1475  /* Release buffer and loop around to try again with larger len. */
1476  free(p);
1477  len = cnt;
1478  }
1479 
1480  WriteData(AH, p, cnt);
1481  free(p);
1482  return (int) cnt;
1483 }
1484 
1485 
1486 /*******************************
1487  * Stuff below here should be 'private' to the archiver routines
1488  *******************************/
1489 
1490 static void
1491 SetOutput(ArchiveHandle *AH, const char *filename, int compression)
1492 {
1493  int fn;
1494 
1495  if (filename)
1496  {
1497  if (strcmp(filename, "-") == 0)
1498  fn = fileno(stdout);
1499  else
1500  fn = -1;
1501  }
1502  else if (AH->FH)
1503  fn = fileno(AH->FH);
1504  else if (AH->fSpec)
1505  {
1506  fn = -1;
1507  filename = AH->fSpec;
1508  }
1509  else
1510  fn = fileno(stdout);
1511 
1512  /* If compression explicitly requested, use gzopen */
1513 #ifdef HAVE_LIBZ
1514  if (compression != 0)
1515  {
1516  char fmode[14];
1517 
1518  /* Don't use PG_BINARY_x since this is zlib */
1519  sprintf(fmode, "wb%d", compression);
1520  if (fn >= 0)
1521  AH->OF = gzdopen(dup(fn), fmode);
1522  else
1523  AH->OF = gzopen(filename, fmode);
1524  AH->gzOut = 1;
1525  }
1526  else
1527 #endif
1528  { /* Use fopen */
1529  if (AH->mode == archModeAppend)
1530  {
1531  if (fn >= 0)
1532  AH->OF = fdopen(dup(fn), PG_BINARY_A);
1533  else
1534  AH->OF = fopen(filename, PG_BINARY_A);
1535  }
1536  else
1537  {
1538  if (fn >= 0)
1539  AH->OF = fdopen(dup(fn), PG_BINARY_W);
1540  else
1541  AH->OF = fopen(filename, PG_BINARY_W);
1542  }
1543  AH->gzOut = 0;
1544  }
1545 
1546  if (!AH->OF)
1547  {
1548  if (filename)
1549  fatal("could not open output file \"%s\": %m", filename);
1550  else
1551  fatal("could not open output file: %m");
1552  }
1553 }
1554 
1555 static OutputContext
1556 SaveOutput(ArchiveHandle *AH)
1557 {
1558  OutputContext sav;
1559 
1560  sav.OF = AH->OF;
1561  sav.gzOut = AH->gzOut;
1562 
1563  return sav;
1564 }
1565 
1566 static void
1567 RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
1568 {
1569  int res;
1570 
1571  errno = 0; /* in case gzclose() doesn't set it */
1572  if (AH->gzOut)
1573  res = GZCLOSE(AH->OF);
1574  else
1575  res = fclose(AH->OF);
1576 
1577  if (res != 0)
1578  fatal("could not close output file: %m");
1579 
1580  AH->gzOut = savedContext.gzOut;
1581  AH->OF = savedContext.OF;
1582 }
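/*
 * Taken together, SaveOutput()/SetOutput()/RestoreOutput() implement a simple
 * save-and-restore of the archive's output target. A minimal sketch of the
 * calling pattern, as used by RestoreArchive() and PrintTOCSummary() above:
 *
 *	sav = SaveOutput(AH);
 *	if (ropt->filename || ropt->compression)
 *		SetOutput(AH, ropt->filename, ropt->compression);
 *	... emit the script via ahprintf()/ahwrite() ...
 *	if (ropt->filename || ropt->compression)
 *		RestoreOutput(AH, sav);
 */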
1583 
1584 
1585 
1586 /*
1587  * Print formatted text to the output file (usually stdout).
1588  */
1589 int
1590 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1591 {
1592  int save_errno = errno;
1593  char *p;
1594  size_t len = 128; /* initial assumption about buffer size */
1595  size_t cnt;
1596 
1597  for (;;)
1598  {
1599  va_list args;
1600 
1601  /* Allocate work buffer. */
1602  p = (char *) pg_malloc(len);
1603 
1604  /* Try to format the data. */
1605  errno = save_errno;
1606  va_start(args, fmt);
1607  cnt = pvsnprintf(p, len, fmt, args);
1608  va_end(args);
1609 
1610  if (cnt < len)
1611  break; /* success */
1612 
1613  /* Release buffer and loop around to try again with larger len. */
1614  free(p);
1615  len = cnt;
1616  }
1617 
1618  ahwrite(p, 1, cnt, AH);
1619  free(p);
1620  return (int) cnt;
1621 }
1622 
1623 /*
1624  * Single place for logic which says 'We are restoring to a direct DB connection'.
1625  */
1626 static int
1627 RestoringToDB(ArchiveHandle *AH)
1628 {
1629  RestoreOptions *ropt = AH->public.ropt;
1630 
1631  return (ropt && ropt->useDB && AH->connection);
1632 }
1633 
1634 /*
1635  * Dump the current contents of the LO data buffer while writing a BLOB
1636  */
1637 static void
1638 dump_lo_buf(ArchiveHandle *AH)
1639 {
1640  if (AH->connection)
1641  {
1642  int res;
1643 
1644  res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1645  pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1646  "wrote %zu bytes of large object data (result = %d)",
1647  AH->lo_buf_used),
1648  AH->lo_buf_used, res);
1649  /* We assume there are no short writes, only errors */
1650  if (res != AH->lo_buf_used)
1651  warn_or_exit_horribly(AH, "could not write to large object: %s",
1652  PQerrorMessage(AH->connection));
1653  }
1654  else
1655  {
1656  PQExpBuffer buf = createPQExpBuffer();
1657 
1658  appendByteaLiteralAHX(buf,
1659  (const unsigned char *) AH->lo_buf,
1660  AH->lo_buf_used,
1661  AH);
1662 
1663  /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
1664  AH->writingBlob = 0;
1665  ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1666  AH->writingBlob = 1;
1667 
1668  destroyPQExpBuffer(buf);
1669  }
1670  AH->lo_buf_used = 0;
1671 }
1672 
1673 
1674 /*
1675  * Write buffer to the output file (usually stdout). This is used for
1676  * outputting 'restore' scripts etc. It is even possible for an archive
1677  * format to create a custom output routine to 'fake' a restore if it
1678  * wants to generate a script (see TAR output).
1679  */
1680 void
1681 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1682 {
1683  int bytes_written = 0;
1684 
1685  if (AH->writingBlob)
1686  {
1687  size_t remaining = size * nmemb;
1688 
1689  while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1690  {
1691  size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1692 
1693  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1694  ptr = (const void *) ((const char *) ptr + avail);
1695  remaining -= avail;
1696  AH->lo_buf_used += avail;
1697  dump_lo_buf(AH);
1698  }
1699 
1700  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1701  AH->lo_buf_used += remaining;
1702 
1703  bytes_written = size * nmemb;
1704  }
1705  else if (AH->gzOut)
1706  bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
1707  else if (AH->CustomOutPtr)
1708  bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1709 
1710  else
1711  {
1712  /*
1713  * If we're doing a restore, and it's direct to DB, and we're
1714  * connected then send it to the DB.
1715  */
1716  if (RestoringToDB(AH))
1717  bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1718  else
1719  bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
1720  }
1721 
1722  if (bytes_written != size * nmemb)
1723  WRITE_ERROR_EXIT;
1724 }
1725 
1726 /* on some error, we may decide to go on... */
1727 void
1728 warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1729 {
1730  va_list ap;
1731 
1732  switch (AH->stage)
1733  {
1734 
1735  case STAGE_NONE:
1736  /* Do nothing special */
1737  break;
1738 
1739  case STAGE_INITIALIZING:
1740  if (AH->stage != AH->lastErrorStage)
1741  pg_log_generic(PG_LOG_INFO, "while INITIALIZING:");
1742  break;
1743 
1744  case STAGE_PROCESSING:
1745  if (AH->stage != AH->lastErrorStage)
1746  pg_log_generic(PG_LOG_INFO, "while PROCESSING TOC:");
1747  break;
1748 
1749  case STAGE_FINALIZING:
1750  if (AH->stage != AH->lastErrorStage)
1751  pg_log_generic(PG_LOG_INFO, "while FINALIZING:");
1752  break;
1753  }
1754  if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1755  {
1756  pg_log_generic(PG_LOG_INFO, "from TOC entry %d; %u %u %s %s %s",
1757  AH->currentTE->dumpId,
1758  AH->currentTE->catalogId.tableoid,
1759  AH->currentTE->catalogId.oid,
1760  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1761  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1762  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1763  }
1764  AH->lastErrorStage = AH->stage;
1765  AH->lastErrorTE = AH->currentTE;
1766 
1767  va_start(ap, fmt);
1768  pg_log_generic_v(PG_LOG_ERROR, fmt, ap);
1769  va_end(ap);
1770 
1771  if (AH->public.exit_on_error)
1772  exit_nicely(1);
1773  else
1774  AH->public.n_errors++;
1775 }
1776 
1777 #ifdef NOT_USED
1778 
1779 static void
1780 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1781 {
1782  /* Unlink te from list */
1783  te->prev->next = te->next;
1784  te->next->prev = te->prev;
1785 
1786  /* and insert it after "pos" */
1787  te->prev = pos;
1788  te->next = pos->next;
1789  pos->next->prev = te;
1790  pos->next = te;
1791 }
1792 #endif
1793 
1794 static void
1795 _moveBefore(TocEntry *pos, TocEntry *te)
1796 {
1797  /* Unlink te from list */
1798  te->prev->next = te->next;
1799  te->next->prev = te->prev;
1800 
1801  /* and insert it before "pos" */
1802  te->prev = pos->prev;
1803  te->next = pos;
1804  pos->prev->next = te;
1805  pos->prev = te;
1806 }
1807 
1808 /*
1809  * Build index arrays for the TOC list
1810  *
1811  * This should be invoked only after we have created or read in all the TOC
1812  * items.
1813  *
1814  * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1815  * array entries run only up to maxDumpId. We might see dependency dump IDs
1816  * beyond that (if the dump was partial); so always check the array bound
1817  * before trying to touch an array entry.
1818  */
1819 static void
1820 buildTocEntryArrays(ArchiveHandle *AH)
1821 {
1822  DumpId maxDumpId = AH->maxDumpId;
1823  TocEntry *te;
1824 
1825  AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1826  AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1827 
1828  for (te = AH->toc->next; te != AH->toc; te = te->next)
1829  {
1830  /* this check is purely paranoia, maxDumpId should be correct */
1831  if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1832  fatal("bad dumpId");
1833 
1834  /* tocsByDumpId indexes all TOCs by their dump ID */
1835  AH->tocsByDumpId[te->dumpId] = te;
1836 
1837  /*
1838  * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1839  * TOC entry that has a DATA item. We compute this by reversing the
1840  * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1841  * just one dependency and it is the TABLE item.
1842  */
1843  if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1844  {
1845  DumpId tableId = te->dependencies[0];
1846 
1847  /*
1848  * The TABLE item might not have been in the archive, if this was
1849  * a data-only dump; but its dump ID should be less than its data
1850  * item's dump ID, so there should be a place for it in the array.
1851  */
1852  if (tableId <= 0 || tableId > maxDumpId)
1853  fatal("bad table dumpId for TABLE DATA item");
1854 
1855  AH->tableDataId[tableId] = te->dumpId;
1856  }
1857  }
1858 }
1859 
1860 TocEntry *
1861 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1862 {
1863  /* build index arrays if we didn't already */
1864  if (AH->tocsByDumpId == NULL)
1865  buildTocEntryArrays(AH);
1866 
1867  if (id > 0 && id <= AH->maxDumpId)
1868  return AH->tocsByDumpId[id];
1869 
1870  return NULL;
1871 }
1872 
1873 int
1874 TocIDRequired(ArchiveHandle *AH, DumpId id)
1875 {
1876  TocEntry *te = getTocEntryByDumpId(AH, id);
1877 
1878  if (!te)
1879  return 0;
1880 
1881  return te->reqs;
1882 }
1883 
1884 size_t
1885 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1886 {
1887  int off;
1888 
1889  /* Save the flag */
1890  AH->WriteBytePtr(AH, wasSet);
1891 
1892  /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1893  for (off = 0; off < sizeof(pgoff_t); off++)
1894  {
1895  AH->WriteBytePtr(AH, o & 0xFF);
1896  o >>= 8;
1897  }
1898  return sizeof(pgoff_t) + 1;
1899 }
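/*
 * Worked example of the offset encoding above, assuming sizeof(pgoff_t) == 8:
 * WriteOffset(AH, 0x0102, flag) emits the flag byte followed by the offset
 * least-significant byte first:
 *
 *	02 01 00 00 00 00 00 00
 *
 * ReadOffset() below reverses this by OR-ing each byte shifted by (off * 8).
 */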
1900 
1901 int
1902 ReadOffset(ArchiveHandle *AH, pgoff_t *o)
1903 {
1904  int i;
1905  int off;
1906  int offsetFlg;
1907 
1908  /* Initialize to zero */
1909  *o = 0;
1910 
1911  /* Check for old version */
1912  if (AH->version < K_VERS_1_7)
1913  {
1914  /* Prior versions wrote offsets using WriteInt */
1915  i = ReadInt(AH);
1916  /* -1 means not set */
1917  if (i < 0)
1918  return K_OFFSET_POS_NOT_SET;
1919  else if (i == 0)
1920  return K_OFFSET_NO_DATA;
1921 
1922  /* Cast to pgoff_t because it was written as an int. */
1923  *o = (pgoff_t) i;
1924  return K_OFFSET_POS_SET;
1925  }
1926 
1927  /*
1928  * Read the flag indicating the state of the data pointer. Check if valid
1929  * and die if not.
1930  *
1931  * This used to be handled by a negative or zero pointer, now we use an
1932  * extra byte specifically for the state.
1933  */
1934  offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1935 
1936  switch (offsetFlg)
1937  {
1938  case K_OFFSET_POS_NOT_SET:
1939  case K_OFFSET_NO_DATA:
1940  case K_OFFSET_POS_SET:
1941 
1942  break;
1943 
1944  default:
1945  fatal("unexpected data offset flag %d", offsetFlg);
1946  }
1947 
1948  /*
1949  * Read the bytes
1950  */
1951  for (off = 0; off < AH->offSize; off++)
1952  {
1953  if (off < sizeof(pgoff_t))
1954  *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1955  else
1956  {
1957  if (AH->ReadBytePtr(AH) != 0)
1958  fatal("file offset in dump file is too large");
1959  }
1960  }
1961 
1962  return offsetFlg;
1963 }
1964 
1965 size_t
1966 WriteInt(ArchiveHandle *AH, int i)
1967 {
1968  int b;
1969 
1970  /*
1971  * This is a bit yucky, but I don't want to make the binary format very
1972  * dependent on representation, and not knowing much about it, I write out
1973  * a sign byte. If you change this, don't forget to change the file
1974  * version #, and modify ReadInt to read the new format AS WELL AS the old
1975  * formats.
1976  */
1977 
1978  /* SIGN byte */
1979  if (i < 0)
1980  {
1981  AH->WriteBytePtr(AH, 1);
1982  i = -i;
1983  }
1984  else
1985  AH->WriteBytePtr(AH, 0);
1986 
1987  for (b = 0; b < AH->intSize; b++)
1988  {
1989  AH->WriteBytePtr(AH, i & 0xFF);
1990  i >>= 8;
1991  }
1992 
1993  return AH->intSize + 1;
1994 }
1995 
1996 int
1997 ReadInt(ArchiveHandle *AH)
1998 {
1999  int res = 0;
2000  int bv,
2001  b;
2002  int sign = 0; /* Default positive */
2003  int bitShift = 0;
2004 
2005  if (AH->version > K_VERS_1_0)
2006  /* Read a sign byte */
2007  sign = AH->ReadBytePtr(AH);
2008 
2009  for (b = 0; b < AH->intSize; b++)
2010  {
2011  bv = AH->ReadBytePtr(AH) & 0xFF;
2012  if (bv != 0)
2013  res = res + (bv << bitShift);
2014  bitShift += 8;
2015  }
2016 
2017  if (sign)
2018  res = -res;
2019 
2020  return res;
2021 }
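/*
 * Worked example of the sign-byte integer format used by WriteInt()/ReadInt()
 * above, assuming intSize == 4: WriteInt(AH, -5) emits a sign byte of 1
 * followed by the bytes 05 00 00 00 (least-significant first); ReadInt()
 * reassembles 5 by shifting each byte into place and then negates the result
 * because the sign byte was nonzero.
 */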
2022 
2023 size_t
2024 WriteStr(ArchiveHandle *AH, const char *c)
2025 {
2026  size_t res;
2027 
2028  if (c)
2029  {
2030  int len = strlen(c);
2031 
2032  res = WriteInt(AH, len);
2033  AH->WriteBufPtr(AH, c, len);
2034  res += len;
2035  }
2036  else
2037  res = WriteInt(AH, -1);
2038 
2039  return res;
2040 }
2041 
2042 char *
2043 ReadStr(ArchiveHandle *AH)
2044 {
2045  char *buf;
2046  int l;
2047 
2048  l = ReadInt(AH);
2049  if (l < 0)
2050  buf = NULL;
2051  else
2052  {
2053  buf = (char *) pg_malloc(l + 1);
2054  AH->ReadBufPtr(AH, (void *) buf, l);
2055 
2056  buf[l] = '\0';
2057  }
2058 
2059  return buf;
2060 }
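/*
 * The string format used by WriteStr()/ReadStr() above is a WriteInt() length
 * followed by that many raw bytes, with a length of -1 standing for a NULL
 * pointer. For example, WriteStr(AH, "abc") emits the integer 3 and then the
 * bytes 'a' 'b' 'c'; ReadStr() allocates length + 1 bytes and appends the
 * terminating '\0'.
 */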
2061 
2062 static int
2063 _discoverArchiveFormat(ArchiveHandle *AH)
2064 {
2065  FILE *fh;
2066  char sig[6]; /* More than enough */
2067  size_t cnt;
2068  int wantClose = 0;
2069 
2070  pg_log_debug("attempting to ascertain archive format");
2071 
2072  if (AH->lookahead)
2073  free(AH->lookahead);
2074 
2075  AH->readHeader = 0;
2076  AH->lookaheadSize = 512;
2077  AH->lookahead = pg_malloc0(512);
2078  AH->lookaheadLen = 0;
2079  AH->lookaheadPos = 0;
2080 
2081  if (AH->fSpec)
2082  {
2083  struct stat st;
2084 
2085  wantClose = 1;
2086 
2087  /*
2088  * Check if the specified archive is a directory. If so, check if
2089  * there's a "toc.dat" (or "toc.dat.gz") file in it.
2090  */
2091  if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2092  {
2093  char buf[MAXPGPATH];
2094 
2095  if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
2096  fatal("directory name too long: \"%s\"",
2097  AH->fSpec);
2098  if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2099  {
2100  AH->format = archDirectory;
2101  return AH->format;
2102  }
2103 
2104 #ifdef HAVE_LIBZ
2105  if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
2106  fatal("directory name too long: \"%s\"",
2107  AH->fSpec);
2108  if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
2109  {
2110  AH->format = archDirectory;
2111  return AH->format;
2112  }
2113 #endif
2114  fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2115  AH->fSpec);
2116  fh = NULL; /* keep compiler quiet */
2117  }
2118  else
2119  {
2120  fh = fopen(AH->fSpec, PG_BINARY_R);
2121  if (!fh)
2122  fatal("could not open input file \"%s\": %m", AH->fSpec);
2123  }
2124  }
2125  else
2126  {
2127  fh = stdin;
2128  if (!fh)
2129  fatal("could not open input file: %m");
2130  }
2131 
2132  if ((cnt = fread(sig, 1, 5, fh)) != 5)
2133  {
2134  if (ferror(fh))
2135  fatal("could not read input file: %m");
2136  else
2137  fatal("input file is too short (read %lu, expected 5)",
2138  (unsigned long) cnt);
2139  }
2140 
2141  /* Save it, just in case we need it later */
2142  memcpy(&AH->lookahead[0], sig, 5);
2143  AH->lookaheadLen = 5;
2144 
2145  if (strncmp(sig, "PGDMP", 5) == 0)
2146  {
2147  /* It's custom format, stop here */
2148  AH->format = archCustom;
2149  AH->readHeader = 1;
2150  }
2151  else
2152  {
2153  /*
2154  * *Maybe* we have a tar archive format file or a text dump ... So,
2155  * read first 512 byte header...
2156  */
2157  cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2158  /* read failure is checked below */
2159  AH->lookaheadLen += cnt;
2160 
2161  if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2162  (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2163  strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2164  {
2165  /*
2166  * looks like it's probably a text format dump. so suggest they
2167  * try psql
2168  */
2169  fatal("input file appears to be a text format dump. Please use psql.");
2170  }
2171 
2172  if (AH->lookaheadLen != 512)
2173  {
2174  if (feof(fh))
2175  fatal("input file does not appear to be a valid archive (too short?)");
2176  else
2177  READ_ERROR_EXIT(fh);
2178  }
2179 
2180  if (!isValidTarHeader(AH->lookahead))
2181  fatal("input file does not appear to be a valid archive");
2182 
2183  AH->format = archTar;
2184  }
2185 
2186  /* Close the file if we opened it */
2187  if (wantClose)
2188  {
2189  if (fclose(fh) != 0)
2190  fatal("could not close input file: %m");
2191  /* Forget lookahead, since we'll re-read header after re-opening */
2192  AH->readHeader = 0;
2193  AH->lookaheadLen = 0;
2194  }
2195 
2196  return AH->format;
2197 }
2198 
2199 
2200 /*
2201  * Allocate an archive handle
2202  */
2203 static ArchiveHandle *
2204 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2205  const int compression, bool dosync, ArchiveMode mode,
2206  SetupWorkerPtrType setupWorkerPtr)
2207 {
2208  ArchiveHandle *AH;
2209 
2210  pg_log_debug("allocating AH for %s, format %d",
2211  FileSpec ? FileSpec : "(stdio)", fmt);
2212 
2213  AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2214 
2215  AH->version = K_VERS_SELF;
2216 
2217  /* initialize for backwards compatible string processing */
2218  AH->public.encoding = 0; /* PG_SQL_ASCII */
2219  AH->public.std_strings = false;
2220 
2221  /* sql error handling */
2222  AH->public.exit_on_error = true;
2223  AH->public.n_errors = 0;
2224 
2225  AH->archiveDumpVersion = PG_VERSION;
2226 
2227  AH->createDate = time(NULL);
2228 
2229  AH->intSize = sizeof(int);
2230  AH->offSize = sizeof(pgoff_t);
2231  if (FileSpec)
2232  {
2233  AH->fSpec = pg_strdup(FileSpec);
2234 
2235  /*
2236  * Not used; maybe later....
2237  *
2238  * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2239  * i--) if (AH->workDir[i-1] == '/')
2240  */
2241  }
2242  else
2243  AH->fSpec = NULL;
2244 
2245  AH->currUser = NULL; /* unknown */
2246  AH->currSchema = NULL; /* ditto */
2247  AH->currTablespace = NULL; /* ditto */
2248  AH->currTableAm = NULL; /* ditto */
2249 
2250  AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2251 
2252  AH->toc->next = AH->toc;
2253  AH->toc->prev = AH->toc;
2254 
2255  AH->mode = mode;
2256  AH->compression = compression;
2257  AH->dosync = dosync;
2258 
2259  memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2260 
2261  /* Open stdout with no compression for AH output handle */
2262  AH->gzOut = 0;
2263  AH->OF = stdout;
2264 
2265  /*
2266  * On Windows, we need to use binary mode to read/write non-text files,
2267  * which include all archive formats as well as compressed plain text.
2268  * Force stdin/stdout into binary mode if that is what we are using.
2269  */
2270 #ifdef WIN32
2271  if ((fmt != archNull || compression != 0) &&
2272  (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2273  {
2274  if (mode == archModeWrite)
2275  _setmode(fileno(stdout), O_BINARY);
2276  else
2277  _setmode(fileno(stdin), O_BINARY);
2278  }
2279 #endif
2280 
2281  AH->SetupWorkerPtr = setupWorkerPtr;
2282 
2283  if (fmt == archUnknown)
2284  AH->format = _discoverArchiveFormat(AH);
2285  else
2286  AH->format = fmt;
2287 
2288  switch (AH->format)
2289  {
2290  case archCustom:
2291  InitArchiveFmt_Custom(AH);
2292  break;
2293 
2294  case archNull:
2295  InitArchiveFmt_Null(AH);
2296  break;
2297 
2298  case archDirectory:
2299  InitArchiveFmt_Directory(AH);
2300  break;
2301 
2302  case archTar:
2303  InitArchiveFmt_Tar(AH);
2304  break;
2305 
2306  default:
2307  fatal("unrecognized file format \"%d\"", fmt);
2308  }
2309 
2310  return AH;
2311 }
2312 
2313 /*
2314  * Write out all data (tables & blobs)
2315  */
2316 void
2317 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2318 {
2319  TocEntry *te;
2320 
2321  if (pstate && pstate->numWorkers > 1)
2322  {
2323  /*
2324  * In parallel mode, this code runs in the leader process. We
2325  * construct an array of candidate TEs, then sort it into decreasing
2326  * size order, then dispatch each TE to a data-transfer worker. By
2327  * dumping larger tables first, we avoid getting into a situation
2328  * where we're down to one job and it's big, losing parallelism.
2329  */
2330  TocEntry **tes;
2331  int ntes;
2332 
2333  tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2334  ntes = 0;
2335  for (te = AH->toc->next; te != AH->toc; te = te->next)
2336  {
2337  /* Consider only TEs with dataDumper functions ... */
2338  if (!te->dataDumper)
2339  continue;
2340  /* ... and ignore ones not enabled for dump */
2341  if ((te->reqs & REQ_DATA) == 0)
2342  continue;
2343 
2344  tes[ntes++] = te;
2345  }
2346 
2347  if (ntes > 1)
2348  qsort((void *) tes, ntes, sizeof(TocEntry *),
2349  TocEntrySizeCompare);
2350 
2351  for (int i = 0; i < ntes; i++)
2352  DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2353  mark_dump_job_done, NULL);
2354 
2355  pg_free(tes);
2356 
2357  /* Now wait for workers to finish. */
2358  WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2359  }
2360  else
2361  {
2362  /* Non-parallel mode: just dump all candidate TEs sequentially. */
2363  for (te = AH->toc->next; te != AH->toc; te = te->next)
2364  {
2365  /* Must have same filter conditions as above */
2366  if (!te->dataDumper)
2367  continue;
2368  if ((te->reqs & REQ_DATA) == 0)
2369  continue;
2370 
2371  WriteDataChunksForTocEntry(AH, te);
2372  }
2373  }
2374 }
2375 
2376 
2377 /*
2378  * Callback function that's invoked in the leader process after a step has
2379  * been parallel dumped.
2380  *
2381  * We don't need to do anything except check for worker failure.
2382  */
2383 static void
2384 mark_dump_job_done(ArchiveHandle *AH,
2385  TocEntry *te,
2386  int status,
2387  void *callback_data)
2388 {
2389  pg_log_info("finished item %d %s %s",
2390  te->dumpId, te->desc, te->tag);
2391 
2392  if (status != 0)
2393  fatal("worker process failed: exit code %d",
2394  status);
2395 }
2396 
2397 
2398 void
2399 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2400 {
2401  StartDataPtrType startPtr;
2402  EndDataPtrType endPtr;
2403 
2404  AH->currToc = te;
2405 
2406  if (strcmp(te->desc, "BLOBS") == 0)
2407  {
2408  startPtr = AH->StartBlobsPtr;
2409  endPtr = AH->EndBlobsPtr;
2410  }
2411  else
2412  {
2413  startPtr = AH->StartDataPtr;
2414  endPtr = AH->EndDataPtr;
2415  }
2416 
2417  if (startPtr != NULL)
2418  (*startPtr) (AH, te);
2419 
2420  /*
2421  * The user-provided DataDumper routine needs to call AH->WriteData
2422  */
2423  te->dataDumper((Archive *) AH, te->dataDumperArg);
2424 
2425  if (endPtr != NULL)
2426  (*endPtr) (AH, te);
2427 
2428  AH->currToc = NULL;
2429 }
2430 
2431 void
2432 WriteToc(ArchiveHandle *AH)
2433 {
2434  TocEntry *te;
2435  char workbuf[32];
2436  int tocCount;
2437  int i;
2438 
2439  /* count entries that will actually be dumped */
2440  tocCount = 0;
2441  for (te = AH->toc->next; te != AH->toc; te = te->next)
2442  {
2443  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2444  tocCount++;
2445  }
2446 
2447  /* printf("%d TOC Entries to save\n", tocCount); */
2448 
2449  WriteInt(AH, tocCount);
2450 
2451  for (te = AH->toc->next; te != AH->toc; te = te->next)
2452  {
2453  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2454  continue;
2455 
2456  WriteInt(AH, te->dumpId);
2457  WriteInt(AH, te->dataDumper ? 1 : 0);
2458 
2459  /* OID is recorded as a string for historical reasons */
2460  sprintf(workbuf, "%u", te->catalogId.tableoid);
2461  WriteStr(AH, workbuf);
2462  sprintf(workbuf, "%u", te->catalogId.oid);
2463  WriteStr(AH, workbuf);
2464 
2465  WriteStr(AH, te->tag);
2466  WriteStr(AH, te->desc);
2467  WriteInt(AH, te->section);
2468  WriteStr(AH, te->defn);
2469  WriteStr(AH, te->dropStmt);
2470  WriteStr(AH, te->copyStmt);
2471  WriteStr(AH, te->namespace);
2472  WriteStr(AH, te->tablespace);
2473  WriteStr(AH, te->tableam);
2474  WriteStr(AH, te->owner);
2475  WriteStr(AH, "false");
2476 
2477  /* Dump list of dependencies */
2478  for (i = 0; i < te->nDeps; i++)
2479  {
2480  sprintf(workbuf, "%d", te->dependencies[i]);
2481  WriteStr(AH, workbuf);
2482  }
2483  WriteStr(AH, NULL); /* Terminate List */
2484 
2485  if (AH->WriteExtraTocPtr)
2486  AH->WriteExtraTocPtr(AH, te);
2487  }
2488 }
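/*
 * Illustrative summary of the record layout emitted above (matching what
 * ReadToc below expects): each TOC entry is written as dumpId, a
 * had-dumper flag, the table OID and object OID as strings, then tag,
 * desc, section, defn, dropStmt, copyStmt, namespace, tablespace, tableam
 * and owner, a historical "false" placeholder for WITH OIDS, a list of
 * dependency IDs terminated by a NULL string, and finally any
 * format-specific data added by WriteExtraTocPtr.
 */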
2489 
2490 void
2491 ReadToc(ArchiveHandle *AH)
2492 {
2493  int i;
2494  char *tmp;
2495  DumpId *deps;
2496  int depIdx;
2497  int depSize;
2498  TocEntry *te;
2499 
2500  AH->tocCount = ReadInt(AH);
2501  AH->maxDumpId = 0;
2502 
2503  for (i = 0; i < AH->tocCount; i++)
2504  {
2505  te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2506  te->dumpId = ReadInt(AH);
2507 
2508  if (te->dumpId > AH->maxDumpId)
2509  AH->maxDumpId = te->dumpId;
2510 
2511  /* Sanity check */
2512  if (te->dumpId <= 0)
2513  fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2514  te->dumpId);
2515 
2516  te->hadDumper = ReadInt(AH);
2517 
2518  if (AH->version >= K_VERS_1_8)
2519  {
2520  tmp = ReadStr(AH);
2521  sscanf(tmp, "%u", &te->catalogId.tableoid);
2522  free(tmp);
2523  }
2524  else
2525  te->catalogId.tableoid = InvalidOid;
2526  tmp = ReadStr(AH);
2527  sscanf(tmp, "%u", &te->catalogId.oid);
2528  free(tmp);
2529 
2530  te->tag = ReadStr(AH);
2531  te->desc = ReadStr(AH);
2532 
2533  if (AH->version >= K_VERS_1_11)
2534  {
2535  te->section = ReadInt(AH);
2536  }
2537  else
2538  {
2539  /*
2540  * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2541  * the entries into sections. This list need not cover entry
2542  * types added later than 8.4.
2543  */
2544  if (strcmp(te->desc, "COMMENT") == 0 ||
2545  strcmp(te->desc, "ACL") == 0 ||
2546  strcmp(te->desc, "ACL LANGUAGE") == 0)
2547  te->section = SECTION_NONE;
2548  else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2549  strcmp(te->desc, "BLOBS") == 0 ||
2550  strcmp(te->desc, "BLOB COMMENTS") == 0)
2551  te->section = SECTION_DATA;
2552  else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2553  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2554  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2555  strcmp(te->desc, "INDEX") == 0 ||
2556  strcmp(te->desc, "RULE") == 0 ||
2557  strcmp(te->desc, "TRIGGER") == 0)
2558  te->section = SECTION_POST_DATA;
2559  else
2560  te->section = SECTION_PRE_DATA;
2561  }
2562 
2563  te->defn = ReadStr(AH);
2564  te->dropStmt = ReadStr(AH);
2565 
2566  if (AH->version >= K_VERS_1_3)
2567  te->copyStmt = ReadStr(AH);
2568 
2569  if (AH->version >= K_VERS_1_6)
2570  te->namespace = ReadStr(AH);
2571 
2572  if (AH->version >= K_VERS_1_10)
2573  te->tablespace = ReadStr(AH);
2574 
2575  if (AH->version >= K_VERS_1_14)
2576  te->tableam = ReadStr(AH);
2577 
2578  te->owner = ReadStr(AH);
2579  if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
2580  pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2581 
2582  /* Read TOC entry dependencies */
2583  if (AH->version >= K_VERS_1_5)
2584  {
2585  depSize = 100;
2586  deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2587  depIdx = 0;
2588  for (;;)
2589  {
2590  tmp = ReadStr(AH);
2591  if (!tmp)
2592  break; /* end of list */
2593  if (depIdx >= depSize)
2594  {
2595  depSize *= 2;
2596  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2597  }
2598  sscanf(tmp, "%d", &deps[depIdx]);
2599  free(tmp);
2600  depIdx++;
2601  }
2602 
2603  if (depIdx > 0) /* We have a non-null entry */
2604  {
2605  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2606  te->dependencies = deps;
2607  te->nDeps = depIdx;
2608  }
2609  else
2610  {
2611  free(deps);
2612  te->dependencies = NULL;
2613  te->nDeps = 0;
2614  }
2615  }
2616  else
2617  {
2618  te->dependencies = NULL;
2619  te->nDeps = 0;
2620  }
2621  te->dataLength = 0;
2622 
2623  if (AH->ReadExtraTocPtr)
2624  AH->ReadExtraTocPtr(AH, te);
2625 
2626  pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2627  i, te->dumpId, te->desc, te->tag);
2628 
2629  /* link completed entry into TOC circular list */
2630  te->prev = AH->toc->prev;
2631  AH->toc->prev->next = te;
2632  AH->toc->prev = te;
2633  te->next = AH->toc;
2634 
2635  /* special processing immediately upon read for some items */
2636  if (strcmp(te->desc, "ENCODING") == 0)
2637  processEncodingEntry(AH, te);
2638  else if (strcmp(te->desc, "STDSTRINGS") == 0)
2639  processStdStringsEntry(AH, te);
2640  else if (strcmp(te->desc, "SEARCHPATH") == 0)
2641  processSearchPathEntry(AH, te);
2642  }
2643 }
2644 
2645 static void
2646 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2647 {
2648  /* te->defn should have the form SET client_encoding = 'foo'; */
2649  char *defn = pg_strdup(te->defn);
2650  char *ptr1;
2651  char *ptr2 = NULL;
2652  int encoding;
2653 
2654  ptr1 = strchr(defn, '\'');
2655  if (ptr1)
2656  ptr2 = strchr(++ptr1, '\'');
2657  if (ptr2)
2658  {
2659  *ptr2 = '\0';
2660  encoding = pg_char_to_encoding(ptr1);
2661  if (encoding < 0)
2662  fatal("unrecognized encoding \"%s\"",
2663  ptr1);
2664  AH->public.encoding = encoding;
2665  }
2666  else
2667  fatal("invalid ENCODING item: %s",
2668  te->defn);
2669 
2670  free(defn);
2671 }
2672 
2673 static void
2674 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2675 {
2676  /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2677  char *ptr1;
2678 
2679  ptr1 = strchr(te->defn, '\'');
2680  if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2681  AH->public.std_strings = true;
2682  else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2683  AH->public.std_strings = false;
2684  else
2685  fatal("invalid STDSTRINGS item: %s",
2686  te->defn);
2687 }
2688 
2689 static void
2690 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2691 {
2692  /*
2693  * te->defn should contain a command to set search_path. We just copy it
2694  * verbatim for use later.
2695  */
2696  AH->public.searchpath = pg_strdup(te->defn);
2697 }
2698 
2699 static void
2700 StrictNamesCheck(RestoreOptions *ropt)
2701 {
2702  const char *missing_name;
2703 
2704  Assert(ropt->strict_names);
2705 
2706  if (ropt->schemaNames.head != NULL)
2707  {
2708  missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2709  if (missing_name != NULL)
2710  fatal("schema \"%s\" not found", missing_name);
2711  }
2712 
2713  if (ropt->tableNames.head != NULL)
2714  {
2715  missing_name = simple_string_list_not_touched(&ropt->tableNames);
2716  if (missing_name != NULL)
2717  fatal("table \"%s\" not found", missing_name);
2718  }
2719 
2720  if (ropt->indexNames.head != NULL)
2721  {
2722  missing_name = simple_string_list_not_touched(&ropt->indexNames);
2723  if (missing_name != NULL)
2724  fatal("index \"%s\" not found", missing_name);
2725  }
2726 
2727  if (ropt->functionNames.head != NULL)
2728  {
2729  missing_name = simple_string_list_not_touched(&ropt->functionNames);
2730  if (missing_name != NULL)
2731  fatal("function \"%s\" not found", missing_name);
2732  }
2733 
2734  if (ropt->triggerNames.head != NULL)
2735  {
2736  missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2737  if (missing_name != NULL)
2738  fatal("trigger \"%s\" not found", missing_name);
2739  }
2740 }
2741 
2742 /*
2743  * Determine whether we want to restore this TOC entry.
2744  *
2745  * Returns 0 if entry should be skipped, or some combination of the
2746  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2747  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2748  */
2749 static int
2750 _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2751 {
2752  int res = REQ_SCHEMA | REQ_DATA;
2753  RestoreOptions *ropt = AH->public.ropt;
2754 
2755  /* These items are treated specially */
2756  if (strcmp(te->desc, "ENCODING") == 0 ||
2757  strcmp(te->desc, "STDSTRINGS") == 0 ||
2758  strcmp(te->desc, "SEARCHPATH") == 0)
2759  return REQ_SPECIAL;
2760 
2761  /*
2762  * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2763  * restored in createDB mode, and not restored otherwise, independently of
2764  * all else.
2765  */
2766  if (strcmp(te->desc, "DATABASE") == 0 ||
2767  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2768  {
2769  if (ropt->createDB)
2770  return REQ_SCHEMA;
2771  else
2772  return 0;
2773  }
2774 
2775  /*
2776  * Process exclusions that affect certain classes of TOC entries.
2777  */
2778 
2779  /* If it's an ACL, maybe ignore it */
2780  if (ropt->aclsSkip && _tocEntryIsACL(te))
2781  return 0;
2782 
2783  /* If it's a comment, maybe ignore it */
2784  if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2785  return 0;
2786 
2787  /*
2788  * If it's a publication or a table part of a publication, maybe ignore
2789  * it.
2790  */
2791  if (ropt->no_publications &&
2792  (strcmp(te->desc, "PUBLICATION") == 0 ||
2793  strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2794  strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2795  return 0;
2796 
2797  /* If it's a security label, maybe ignore it */
2798  if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2799  return 0;
2800 
2801  /* If it's a subscription, maybe ignore it */
2802  if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2803  return 0;
2804 
2805  /* Ignore it if section is not to be dumped/restored */
2806  switch (curSection)
2807  {
2808  case SECTION_PRE_DATA:
2809  if (!(ropt->dumpSections & DUMP_PRE_DATA))
2810  return 0;
2811  break;
2812  case SECTION_DATA:
2813  if (!(ropt->dumpSections & DUMP_DATA))
2814  return 0;
2815  break;
2816  case SECTION_POST_DATA:
2817  if (!(ropt->dumpSections & DUMP_POST_DATA))
2818  return 0;
2819  break;
2820  default:
2821  /* shouldn't get here, really, but ignore it */
2822  return 0;
2823  }
2824 
2825  /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2826  if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2827  return 0;
2828 
2829  /*
2830  * Check options for selective dump/restore.
2831  */
2832  if (strcmp(te->desc, "ACL") == 0 ||
2833  strcmp(te->desc, "COMMENT") == 0 ||
2834  strcmp(te->desc, "SECURITY LABEL") == 0)
2835  {
2836  /* Database properties react to createDB, not selectivity options. */
2837  if (strncmp(te->tag, "DATABASE ", 9) == 0)
2838  {
2839  if (!ropt->createDB)
2840  return 0;
2841  }
2842  else if (ropt->schemaNames.head != NULL ||
2843  ropt->schemaExcludeNames.head != NULL ||
2844  ropt->selTypes)
2845  {
2846  /*
2847  * In a selective dump/restore, we want to restore these dependent
2848  * TOC entry types only if their parent object is being restored.
2849  * Without selectivity options, we let through everything in the
2850  * archive. Note there may be such entries with no parent, eg
2851  * non-default ACLs for built-in objects.
2852  *
2853  * This code depends on the parent having been marked already,
2854  * which should be the case; if it isn't, perhaps due to
2855  * SortTocFromFile rearrangement, skipping the dependent entry
2856  * seems prudent anyway.
2857  *
2858  * Ideally we'd handle, eg, table CHECK constraints this way too.
2859  * But it's hard to tell which of their dependencies is the one to
2860  * consult.
2861  */
2862  if (te->nDeps != 1 ||
2863  TocIDRequired(AH, te->dependencies[0]) == 0)
2864  return 0;
2865  }
2866  }
2867  else
2868  {
2869  /* Apply selective-restore rules for standalone TOC entries. */
2870  if (ropt->schemaNames.head != NULL)
2871  {
2872  /* If no namespace is specified, it means all. */
2873  if (!te->namespace)
2874  return 0;
2875  if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2876  return 0;
2877  }
2878 
2879  if (ropt->schemaExcludeNames.head != NULL &&
2880  te->namespace &&
2881  simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2882  return 0;
2883 
2884  if (ropt->selTypes)
2885  {
2886  if (strcmp(te->desc, "TABLE") == 0 ||
2887  strcmp(te->desc, "TABLE DATA") == 0 ||
2888  strcmp(te->desc, "VIEW") == 0 ||
2889  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2890  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2891  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2892  strcmp(te->desc, "SEQUENCE") == 0 ||
2893  strcmp(te->desc, "SEQUENCE SET") == 0)
2894  {
2895  if (!ropt->selTable)
2896  return 0;
2897  if (ropt->tableNames.head != NULL &&
2898  !simple_string_list_member(&ropt->tableNames, te->tag))
2899  return 0;
2900  }
2901  else if (strcmp(te->desc, "INDEX") == 0)
2902  {
2903  if (!ropt->selIndex)
2904  return 0;
2905  if (ropt->indexNames.head != NULL &&
2906  !simple_string_list_member(&ropt->indexNames, te->tag))
2907  return 0;
2908  }
2909  else if (strcmp(te->desc, "FUNCTION") == 0 ||
2910  strcmp(te->desc, "AGGREGATE") == 0 ||
2911  strcmp(te->desc, "PROCEDURE") == 0)
2912  {
2913  if (!ropt->selFunction)
2914  return 0;
2915  if (ropt->functionNames.head != NULL &&
2916  !simple_string_list_member(&ropt->functionNames, te->tag))
2917  return 0;
2918  }
2919  else if (strcmp(te->desc, "TRIGGER") == 0)
2920  {
2921  if (!ropt->selTrigger)
2922  return 0;
2923  if (ropt->triggerNames.head != NULL &&
2924  !simple_string_list_member(&ropt->triggerNames, te->tag))
2925  return 0;
2926  }
2927  else
2928  return 0;
2929  }
2930  }
2931 
2932  /*
2933  * Determine whether the TOC entry contains schema and/or data components,
2934  * and mask off inapplicable REQ bits. If it had a dataDumper, assume
2935  * it's both schema and data. Otherwise it's probably schema-only, but
2936  * there are exceptions.
2937  */
2938  if (!te->hadDumper)
2939  {
2940  /*
2941  * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
2942  * it is considered a data entry. We don't need to check for the
2943  * BLOBS entry or old-style BLOB COMMENTS, because they will have
2944  * hadDumper = true ... but we do need to check new-style BLOB ACLs,
2945  * comments, etc.
2946  */
2947  if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2948  strcmp(te->desc, "BLOB") == 0 ||
2949  (strcmp(te->desc, "ACL") == 0 &&
2950  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2951  (strcmp(te->desc, "COMMENT") == 0 &&
2952  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2953  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2954  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
2955  res = res & REQ_DATA;
2956  else
2957  res = res & ~REQ_DATA;
2958  }
2959 
2960  /* If there's no definition command, there's no schema component */
2961  if (!te->defn || !te->defn[0])
2962  res = res & ~REQ_SCHEMA;
2963 
2964  /*
2965  * Special case: <Init> type with <Max OID> tag; this is obsolete and we
2966  * always ignore it.
2967  */
2968  if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
2969  return 0;
2970 
2971  /* Mask it if we only want schema */
2972  if (ropt->schemaOnly)
2973  {
2974  /*
2975  * The sequence_data option overrides schemaOnly for SEQUENCE SET.
2976  *
2977  * In binary-upgrade mode, even with schemaOnly set, we do not mask
2978  * out large objects. (Only large object definitions, comments and
2979  * other metadata should be generated in binary-upgrade mode, not the
2980  * actual data, but that need not concern us here.)
2981  */
2982  if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
2983  !(ropt->binary_upgrade &&
2984  (strcmp(te->desc, "BLOB") == 0 ||
2985  (strcmp(te->desc, "ACL") == 0 &&
2986  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2987  (strcmp(te->desc, "COMMENT") == 0 &&
2988  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2989  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
2990  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
2991  res = res & REQ_SCHEMA;
2992  }
2993 
2994  /* Mask it if we only want data */
2995  if (ropt->dataOnly)
2996  res = res & REQ_DATA;
2997 
2998  return res;
2999 }
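/*
 * Worked example (hypothetical object name, added for illustration): for
 * "pg_restore --data-only --table=foo", the TABLE DATA entry tagged "foo"
 * passes the selTypes/tableNames checks, keeps REQ_DATA through the
 * hadDumper and empty-defn masking, and survives the dataOnly mask, so
 * only that table's rows are restored; the matching TABLE entry is reduced
 * to 0 once dataOnly masks off REQ_SCHEMA, and TABLE DATA entries for
 * other tables are rejected by the tableNames filter.
 */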
3000 
3001 /*
3002  * Identify which pass we should restore this TOC entry in.
3003  *
3004  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3005  */
3006 static RestorePass
3007 _tocEntryRestorePass(TocEntry *te)
3008 {
3009  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3010  if (strcmp(te->desc, "ACL") == 0 ||
3011  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3012  strcmp(te->desc, "DEFAULT ACL") == 0)
3013  return RESTORE_PASS_ACL;
3014  if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3015  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3016  return RESTORE_PASS_POST_ACL;
3017 
3018  /*
3019  * Comments need to be emitted in the same pass as their parent objects.
3020  * ACLs haven't got comments, and neither do matview data objects, but
3021  * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3022  * we'd need yet another weird special case.)
3023  */
3024  if (strcmp(te->desc, "COMMENT") == 0 &&
3025  strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3026  return RESTORE_PASS_POST_ACL;
3027 
3028  /* All else can be handled in the main pass. */
3029  return RESTORE_PASS_MAIN;
3030 }
3031 
3032 /*
3033  * Identify TOC entries that are ACLs.
3034  *
3035  * Note: it seems worth duplicating some code here to avoid a hard-wired
3036  * assumption that these are exactly the same entries that we restore during
3037  * the RESTORE_PASS_ACL phase.
3038  */
3039 static bool
3040 _tocEntryIsACL(TocEntry *te)
3041 {
3042  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3043  if (strcmp(te->desc, "ACL") == 0 ||
3044  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3045  strcmp(te->desc, "DEFAULT ACL") == 0)
3046  return true;
3047  return false;
3048 }
3049 
3050 /*
3051  * Issue SET commands for parameters that we want to have set the same way
3052  * at all times during execution of a restore script.
3053  */
3054 static void
3055 _doSetFixedOutputState(ArchiveHandle *AH)
3056 {
3057  RestoreOptions *ropt = AH->public.ropt;
3058 
3059  /*
3060  * Disable timeouts to allow for slow commands, idle parallel workers, etc
3061  */
3062  ahprintf(AH, "SET statement_timeout = 0;\n");
3063  ahprintf(AH, "SET lock_timeout = 0;\n");
3064  ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3065 
3066  /* Select the correct character set encoding */
3067  ahprintf(AH, "SET client_encoding = '%s';\n",
3069 
3070  /* Select the correct string literal syntax */
3071  ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3072  AH->public.std_strings ? "on" : "off");
3073 
3074  /* Select the role to be used during restore */
3075  if (ropt && ropt->use_role)
3076  ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3077 
3078  /* Select the dump-time search_path */
3079  if (AH->public.searchpath)
3080  ahprintf(AH, "%s", AH->public.searchpath);
3081 
3082  /* Make sure function checking is disabled */
3083  ahprintf(AH, "SET check_function_bodies = false;\n");
3084 
3085  /* Ensure that all valid XML data will be accepted */
3086  ahprintf(AH, "SET xmloption = content;\n");
3087 
3088  /* Avoid annoying notices etc */
3089  ahprintf(AH, "SET client_min_messages = warning;\n");
3090  if (!AH->public.std_strings)
3091  ahprintf(AH, "SET escape_string_warning = off;\n");
3092 
3093  /* Adjust row-security state */
3094  if (ropt && ropt->enable_row_security)
3095  ahprintf(AH, "SET row_security = on;\n");
3096  else
3097  ahprintf(AH, "SET row_security = off;\n");
3098 
3099  ahprintf(AH, "\n");
3100 }
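/*
 * For illustration, the prologue emitted above for a typical modern archive
 * looks roughly like this (the encoding, string-syntax and search_path
 * lines come from the archive's ENCODING, STDSTRINGS and SEARCHPATH
 * entries, and the optional SET ROLE / escape_string_warning lines depend
 * on the options and archive):
 *
 *     SET statement_timeout = 0;
 *     SET lock_timeout = 0;
 *     SET idle_in_transaction_session_timeout = 0;
 *     SET client_encoding = 'UTF8';
 *     SET standard_conforming_strings = on;
 *     SELECT pg_catalog.set_config('search_path', '', false);
 *     SET check_function_bodies = false;
 *     SET xmloption = content;
 *     SET client_min_messages = warning;
 *     SET row_security = off;
 */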
3101 
3102 /*
3103  * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3104  * for updating state if appropriate. If user is NULL or an empty string,
3105  * the specification DEFAULT will be used.
3106  */
3107 static void
3108 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3109 {
3110  PQExpBuffer cmd = createPQExpBuffer();
3111 
3112  appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3113 
3114  /*
3115  * SQL requires a string literal here. Might as well be correct.
3116  */
3117  if (user && *user)
3118  appendStringLiteralAHX(cmd, user, AH);
3119  else
3120  appendPQExpBufferStr(cmd, "DEFAULT");
3121  appendPQExpBufferChar(cmd, ';');
3122 
3123  if (RestoringToDB(AH))
3124  {
3125  PGresult *res;
3126 
3127  res = PQexec(AH->connection, cmd->data);
3128 
3129  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3130  /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3131  fatal("could not set session user to \"%s\": %s",
3133 
3134  PQclear(res);
3135  }
3136  else
3137  ahprintf(AH, "%s\n\n", cmd->data);
3138 
3139  destroyPQExpBuffer(cmd);
3140 }
3141 
3142 
3143 /*
3144  * Issue the commands to connect to the specified database.
3145  *
3146  * If we're currently restoring right into a database, this will
3147  * actually establish a connection. Otherwise it puts a \connect into
3148  * the script output.
3149  */
3150 static void
3151 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3152 {
3153  if (RestoringToDB(AH))
3154  ReconnectToServer(AH, dbname);
3155  else
3156  {
3157  PQExpBufferData connectbuf;
3158 
3159  initPQExpBuffer(&connectbuf);
3160  appendPsqlMetaConnect(&connectbuf, dbname);
3161  ahprintf(AH, "%s\n", connectbuf.data);
3162  termPQExpBuffer(&connectbuf);
3163  }
3164 
3165  /*
3166  * NOTE: currUser keeps track of what the imaginary session user in our
3167  * script is. It's now effectively reset to the original userID.
3168  */
3169  if (AH->currUser)
3170  free(AH->currUser);
3171  AH->currUser = NULL;
3172 
3173  /* don't assume we still know the output schema, tablespace, etc either */
3174  if (AH->currSchema)
3175  free(AH->currSchema);
3176  AH->currSchema = NULL;
3177  if (AH->currTablespace)
3178  free(AH->currTablespace);
3179  AH->currTablespace = NULL;
3180 
3181  /* re-establish fixed state */
3182  _doSetFixedOutputState(AH);
3183 }
3184 
3185 /*
3186  * Become the specified user, and update state to avoid redundant commands
3187  *
3188  * NULL or empty argument is taken to mean restoring the session default
3189  */
3190 static void
3191 _becomeUser(ArchiveHandle *AH, const char *user)
3192 {
3193  if (!user)
3194  user = ""; /* avoid null pointers */
3195 
3196  if (AH->currUser && strcmp(AH->currUser, user) == 0)
3197  return; /* no need to do anything */
3198 
3199  _doSetSessionAuth(AH, user);
3200 
3201  /*
3202  * NOTE: currUser keeps track of what the imaginary session user in our
3203  * script is
3204  */
3205  if (AH->currUser)
3206  free(AH->currUser);
3207  AH->currUser = pg_strdup(user);
3208 }
3209 
3210 /*
3211  * Become the owner of the given TOC entry object. If
3212  * changes in ownership are not allowed, this doesn't do anything.
3213  */
3214 static void
3215 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3216 {
3217  RestoreOptions *ropt = AH->public.ropt;
3218 
3219  if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3220  return;
3221 
3222  _becomeUser(AH, te->owner);
3223 }
3224 
3225 
3226 /*
3227  * Issue the commands to select the specified schema as the current schema
3228  * in the target database.
3229  */
3230 static void
3231 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3232 {
3233  PQExpBuffer qry;
3234 
3235  /*
3236  * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3237  * that search_path rather than switching to entry-specific paths.
3238  * Otherwise, it's an old archive that will not restore correctly unless
3239  * we set the search_path as it's expecting.
3240  */
3241  if (AH->public.searchpath)
3242  return;
3243 
3244  if (!schemaName || *schemaName == '\0' ||
3245  (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3246  return; /* no need to do anything */
3247 
3248  qry = createPQExpBuffer();
3249 
3250  appendPQExpBuffer(qry, "SET search_path = %s",
3251  fmtId(schemaName));
3252  if (strcmp(schemaName, "pg_catalog") != 0)
3253  appendPQExpBufferStr(qry, ", pg_catalog");
3254 
3255  if (RestoringToDB(AH))
3256  {
3257  PGresult *res;
3258 
3259  res = PQexec(AH->connection, qry->data);
3260 
3261  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3262  warn_or_exit_horribly(AH,
3263  "could not set search_path to \"%s\": %s",
3264  schemaName, PQerrorMessage(AH->connection));
3265 
3266  PQclear(res);
3267  }
3268  else
3269  ahprintf(AH, "%s;\n\n", qry->data);
3270 
3271  if (AH->currSchema)
3272  free(AH->currSchema);
3273  AH->currSchema = pg_strdup(schemaName);
3274 
3275  destroyPQExpBuffer(qry);
3276 }
3277 
3278 /*
3279  * Issue the commands to select the specified tablespace as the current one
3280  * in the target database.
3281  */
3282 static void
3283 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3284 {
3285  RestoreOptions *ropt = AH->public.ropt;
3286  PQExpBuffer qry;
3287  const char *want,
3288  *have;
3289 
3290  /* do nothing in --no-tablespaces mode */
3291  if (ropt->noTablespace)
3292  return;
3293 
3294  have = AH->currTablespace;
3295  want = tablespace;
3296 
3297  /* no need to do anything for non-tablespace object */
3298  if (!want)
3299  return;
3300 
3301  if (have && strcmp(want, have) == 0)
3302  return; /* no need to do anything */
3303 
3304  qry = createPQExpBuffer();
3305 
3306  if (strcmp(want, "") == 0)
3307  {
3308  /* We want the tablespace to be the database's default */
3309  appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3310  }
3311  else
3312  {
3313  /* We want an explicit tablespace */
3314  appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3315  }
3316 
3317  if (RestoringToDB(AH))
3318  {
3319  PGresult *res;
3320 
3321  res = PQexec(AH->connection, qry->data);
3322 
3323  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3324  warn_or_exit_horribly(AH,
3325  "could not set default_tablespace to %s: %s",
3326  fmtId(want), PQerrorMessage(AH->connection));
3327 
3328  PQclear(res);
3329  }
3330  else
3331  ahprintf(AH, "%s;\n\n", qry->data);
3332 
3333  if (AH->currTablespace)
3334  free(AH->currTablespace);
3335  AH->currTablespace = pg_strdup(want);
3336 
3337  destroyPQExpBuffer(qry);
3338 }
3339 
3340 /*
3341  * Set the proper default_table_access_method value for the table.
3342  */
3343 static void
3344 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3345 {
3346  PQExpBuffer cmd;
3347  const char *want,
3348  *have;
3349 
3350  have = AH->currTableAm;
3351  want = tableam;
3352 
3353  if (!want)
3354  return;
3355 
3356  if (have && strcmp(want, have) == 0)
3357  return;
3358 
3359  cmd = createPQExpBuffer();
3360  appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3361 
3362  if (RestoringToDB(AH))
3363  {
3364  PGresult *res;
3365 
3366  res = PQexec(AH->connection, cmd->data);
3367 
3368  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3369  warn_or_exit_horribly(AH,
3370  "could not set default_table_access_method: %s",
3371  PQerrorMessage(AH->connection));
3372 
3373  PQclear(res);
3374  }
3375  else
3376  ahprintf(AH, "%s\n\n", cmd->data);
3377 
3378  destroyPQExpBuffer(cmd);
3379 
3380  if (AH->currTableAm)
3381  free(AH->currTableAm);
3382  AH->currTableAm = pg_strdup(want);
3383 }
3384 
3385 /*
3386  * Extract an object description for a TOC entry, and append it to buf.
3387  *
3388  * This is used for ALTER ... OWNER TO.
3389  */
3390 static void
3391 _getObjectDescription(PQExpBuffer buf, TocEntry *te)
3392 {
3393  const char *type = te->desc;
3394 
3395  /* Use ALTER TABLE for views and sequences */
3396  if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
3397  strcmp(type, "MATERIALIZED VIEW") == 0)
3398  type = "TABLE";
3399 
3400  /* objects that don't require special decoration */
3401  if (strcmp(type, "COLLATION") == 0 ||
3402  strcmp(type, "CONVERSION") == 0 ||
3403  strcmp(type, "DOMAIN") == 0 ||
3404  strcmp(type, "TABLE") == 0 ||
3405  strcmp(type, "TYPE") == 0 ||
3406  strcmp(type, "FOREIGN TABLE") == 0 ||
3407  strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3408  strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3409  strcmp(type, "STATISTICS") == 0 ||
3410  /* non-schema-specified objects */
3411  strcmp(type, "DATABASE") == 0 ||
3412  strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3413  strcmp(type, "SCHEMA") == 0 ||
3414  strcmp(type, "EVENT TRIGGER") == 0 ||
3415  strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3416  strcmp(type, "SERVER") == 0 ||
3417  strcmp(type, "PUBLICATION") == 0 ||
3418  strcmp(type, "SUBSCRIPTION") == 0 ||
3419  strcmp(type, "USER MAPPING") == 0)
3420  {
3421  appendPQExpBuffer(buf, "%s ", type);
3422  if (te->namespace && *te->namespace)
3423  appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3424  appendPQExpBufferStr(buf, fmtId(te->tag));
3425  return;
3426  }
3427 
3428  /* BLOBs just have a name, but it's numeric so must not use fmtId */
3429  if (strcmp(type, "BLOB") == 0)
3430  {
3431  appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3432  return;
3433  }
3434 
3435  /*
3436  * These object types require additional decoration. Fortunately, the
3437  * information needed is exactly what's in the DROP command.
3438  */
3439  if (strcmp(type, "AGGREGATE") == 0 ||
3440  strcmp(type, "FUNCTION") == 0 ||
3441  strcmp(type, "OPERATOR") == 0 ||
3442  strcmp(type, "OPERATOR CLASS") == 0 ||
3443  strcmp(type, "OPERATOR FAMILY") == 0 ||
3444  strcmp(type, "PROCEDURE") == 0)
3445  {
3446  /* Chop "DROP " off the front and make a modifiable copy */
3447  char *first = pg_strdup(te->dropStmt + 5);
3448  char *last;
3449 
3450  /* point to last character in string */
3451  last = first + strlen(first) - 1;
3452 
3453  /* Strip off any ';' or '\n' at the end */
3454  while (last >= first && (*last == '\n' || *last == ';'))
3455  last--;
3456  *(last + 1) = '\0';
3457 
3458  appendPQExpBufferStr(buf, first);
3459 
3460  free(first);
3461  return;
3462  }
3463 
3464  pg_log_warning("don't know how to set owner for object type \"%s\"",
3465  type);
3466 }
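/*
 * Worked example (hypothetical function, added for illustration): a TOC
 * entry with dropStmt "DROP FUNCTION public.plus(integer, integer);\n" has
 * its leading "DROP " and trailing ";\n" stripped by the code above,
 * leaving "FUNCTION public.plus(integer, integer)", which _printTocEntry
 * wraps as "ALTER FUNCTION public.plus(integer, integer) OWNER TO ...;".
 */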
3467 
3468 /*
3469  * Emit the SQL commands to create the object represented by a TOC entry
3470  *
3471  * This now also includes issuing an ALTER OWNER command to restore the
3472  * object's ownership, if wanted. But note that the object's permissions
3473  * will remain at default, until the matching ACL TOC entry is restored.
3474  */
3475 static void
3476 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3477 {
3478  RestoreOptions *ropt = AH->public.ropt;
3479 
3480  /* Select owner, schema, tablespace and default AM as necessary */
3481  _becomeOwner(AH, te);
3482  _selectOutputSchema(AH, te->namespace);
3483  _selectTablespace(AH, te->tablespace);
3484  _selectTableAccessMethod(AH, te->tableam);
3485 
3486  /* Emit header comment for item */
3487  if (!AH->noTocComments)
3488  {
3489  const char *pfx;
3490  char *sanitized_name;
3491  char *sanitized_schema;
3492  char *sanitized_owner;
3493 
3494  if (isData)
3495  pfx = "Data for ";
3496  else
3497  pfx = "";
3498 
3499  ahprintf(AH, "--\n");
3500  if (AH->public.verbose)
3501  {
3502  ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3503  te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3504  if (te->nDeps > 0)
3505  {
3506  int i;
3507 
3508  ahprintf(AH, "-- Dependencies:");
3509  for (i = 0; i < te->nDeps; i++)
3510  ahprintf(AH, " %d", te->dependencies[i]);
3511  ahprintf(AH, "\n");
3512  }
3513  }
3514 
3515  sanitized_name = sanitize_line(te->tag, false);
3516  sanitized_schema = sanitize_line(te->namespace, true);
3517  sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3518 
3519  ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3520  pfx, sanitized_name, te->desc, sanitized_schema,
3521  sanitized_owner);
3522 
3523  free(sanitized_name);
3524  free(sanitized_schema);
3525  free(sanitized_owner);
3526 
3527  if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3528  {
3529  char *sanitized_tablespace;
3530 
3531  sanitized_tablespace = sanitize_line(te->tablespace, false);
3532  ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3533  free(sanitized_tablespace);
3534  }
3535  ahprintf(AH, "\n");
3536 
3537  if (AH->PrintExtraTocPtr != NULL)
3538  AH->PrintExtraTocPtr(AH, te);
3539  ahprintf(AH, "--\n\n");
3540  }
3541 
3542  /*
3543  * Actually print the definition.
3544  *
3545  * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3546  * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3547  * "public" that is a comment. We have to do this when --no-owner mode is
3548  * selected. This is ugly, but I see no other good way ...
3549  */
3550  if (ropt->noOwner &&
3551  strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3552  {
3553  ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3554  }
3555  else
3556  {
3557  if (te->defn && strlen(te->defn) > 0)
3558  ahprintf(AH, "%s\n\n", te->defn);
3559  }
3560 
3561  /*
3562  * If we aren't using SET SESSION AUTH to determine ownership, we must
3563  * instead issue an ALTER OWNER command. Schema "public" is special; when
3564  * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3565  * when using SET SESSION for all other objects. We assume that anything
3566  * without a DROP command is not a separately ownable object. All the
3567  * categories with DROP commands must appear in one list or the other.
3568  */
3569  if (!ropt->noOwner &&
3570  (!ropt->use_setsessauth ||
3571  (strcmp(te->desc, "SCHEMA") == 0 &&
3572  strncmp(te->defn, "--", 2) == 0)) &&
3573  te->owner && strlen(te->owner) > 0 &&
3574  te->dropStmt && strlen(te->dropStmt) > 0)
3575  {
3576  if (strcmp(te->desc, "AGGREGATE") == 0 ||
3577  strcmp(te->desc, "BLOB") == 0 ||
3578  strcmp(te->desc, "COLLATION") == 0 ||
3579  strcmp(te->desc, "CONVERSION") == 0 ||
3580  strcmp(te->desc, "DATABASE") == 0 ||
3581  strcmp(te->desc, "DOMAIN") == 0 ||
3582  strcmp(te->desc, "FUNCTION") == 0 ||
3583  strcmp(te->desc, "OPERATOR") == 0 ||
3584  strcmp(te->desc, "OPERATOR CLASS") == 0 ||
3585  strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
3586  strcmp(te->desc, "PROCEDURE") == 0 ||
3587  strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
3588  strcmp(te->desc, "SCHEMA") == 0 ||
3589  strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3590  strcmp(te->desc, "TABLE") == 0 ||
3591  strcmp(te->desc, "TYPE") == 0 ||
3592  strcmp(te->desc, "VIEW") == 0 ||
3593  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3594  strcmp(te->desc, "SEQUENCE") == 0 ||
3595  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3596  strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
3597  strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
3598  strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
3599  strcmp(te->desc, "SERVER") == 0 ||
3600  strcmp(te->desc, "STATISTICS") == 0 ||
3601  strcmp(te->desc, "PUBLICATION") == 0 ||
3602  strcmp(te->desc, "SUBSCRIPTION") == 0)
3603  {
3604  PQExpBuffer temp = createPQExpBuffer();
3605 
3606  appendPQExpBufferStr(temp, "ALTER ");
3607  _getObjectDescription(temp, te);
3608  appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
3609  ahprintf(AH, "%s\n\n", temp->data);
3610  destroyPQExpBuffer(temp);
3611  }
3612  else if (strcmp(te->desc, "CAST") == 0 ||
3613  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
3614  strcmp(te->desc, "CONSTRAINT") == 0 ||
3615  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
3616  strcmp(te->desc, "DEFAULT") == 0 ||
3617  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
3618  strcmp(te->desc, "INDEX") == 0 ||
3619  strcmp(te->desc, "RULE") == 0 ||
3620  strcmp(te->desc, "TRIGGER") == 0 ||
3621  strcmp(te->desc, "ROW SECURITY") == 0 ||
3622  strcmp(te->desc, "POLICY") == 0 ||
3623  strcmp(te->desc, "USER MAPPING") == 0)
3624  {
3625  /* these object types don't have separate owners */
3626  }
3627  else
3628  {
3629  pg_log_warning("don't know how to set owner for object type \"%s\"",
3630  te->desc);
3631  }
3632  }
3633 
3634  /*
3635  * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3636  * commands, so we can no longer assume we know the current auth setting.
3637  */
3638  if (_tocEntryIsACL(te))
3639  {
3640  if (AH->currUser)
3641  free(AH->currUser);
3642  AH->currUser = NULL;
3643  }
3644 }
3645 
3646 /*
3647  * Sanitize a string to be included in an SQL comment or TOC listing, by
3648  * replacing any newlines with spaces. This ensures each logical output line
3649  * is in fact one physical output line, to prevent corruption of the dump
3650  * (which could, in the worst case, present an SQL injection vulnerability
3651  * if someone were to incautiously load a dump containing objects with
3652  * maliciously crafted names).
3653  *
3654  * The result is a freshly malloc'd string. If the input string is NULL,
3655  * return a malloc'ed empty string, unless want_hyphen, in which case return a
3656  * malloc'ed hyphen.
3657  *
3658  * Note that we currently don't bother to quote names, meaning that the name
3659  * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3660  * it only examines the dumpId field, but someday we might want to try harder.
3661  */
3662 static char *
3663 sanitize_line(const char *str, bool want_hyphen)
3664 {
3665  char *result;
3666  char *s;
3667 
3668  if (!str)
3669  return pg_strdup(want_hyphen ? "-" : "");
3670 
3671  result = pg_strdup(str);
3672 
3673  for (s = result; *s != '\0'; s++)
3674  {
3675  if (*s == '\n' || *s == '\r')
3676  *s = ' ';
3677  }
3678 
3679  return result;
3680 }
3681 
3682 /*
3683  * Write the file header for a custom-format archive
3684  */
3685 void
3686 WriteHead(ArchiveHandle *AH)
3687 {
3688  struct tm crtm;
3689 
3690  AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3691  AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3692  AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3693  AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3694  AH->WriteBytePtr(AH, AH->intSize);
3695  AH->WriteBytePtr(AH, AH->offSize);
3696  AH->WriteBytePtr(AH, AH->format);
3697  WriteInt(AH, AH->compression);
3698  crtm = *localtime(&AH->createDate);
3699  WriteInt(AH, crtm.tm_sec);
3700  WriteInt(AH, crtm.tm_min);
3701  WriteInt(AH, crtm.tm_hour);
3702  WriteInt(AH, crtm.tm_mday);
3703  WriteInt(AH, crtm.tm_mon);
3704  WriteInt(AH, crtm.tm_year);
3705  WriteInt(AH, crtm.tm_isdst);
3706  WriteStr(AH, PQdb(AH->connection));
3707  WriteStr(AH, AH->public.remoteVersionStr);
3708  WriteStr(AH, PG_VERSION);
3709 }
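/*
 * Illustrative summary of the custom-format header written above (ReadHead
 * below parses the same layout): the magic bytes "PGDMP", three version
 * bytes (major, minor, revision), one byte each for intSize, offSize and
 * format, a WriteInt compression level, seven WriteInt fields holding the
 * creation time as broken-down localtime() components, and three WriteStr
 * fields: the database name, the server version, and the pg_dump version.
 */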
3710 
3711 void
3712 ReadHead(ArchiveHandle *AH)
3713 {
3714  char vmaj,
3715  vmin,
3716  vrev;
3717  int fmt;
3718 
3719  /*
3720  * If we haven't already read the header, do so.
3721  *
3722  * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3723  * way to unify the cases?
3724  */
3725  if (!AH->readHeader)
3726  {
3727  char tmpMag[7];
3728 
3729  AH->ReadBufPtr(AH, tmpMag, 5);
3730 
3731  if (strncmp(tmpMag, "PGDMP", 5) != 0)
3732  fatal("did not find magic string in file header");
3733  }
3734 
3735  vmaj = AH->ReadBytePtr(AH);
3736  vmin = AH->ReadBytePtr(AH);
3737 
3738  if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3739  vrev = AH->ReadBytePtr(AH);
3740  else
3741  vrev = 0;
3742 
3743  AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3744 
3745  if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3746  fatal("unsupported version (%d.%d) in file header",
3747  vmaj, vmin);
3748 
3749  AH->intSize = AH->ReadBytePtr(AH);
3750  if (AH->intSize > 32)
3751  fatal("sanity check on integer size (%lu) failed",
3752  (unsigned long) AH->intSize);
3753 
3754  if (AH->intSize > sizeof(int))
3755  pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3756 
3757  if (AH->version >= K_VERS_1_7)
3758  AH->offSize = AH->ReadBytePtr(AH);
3759  else
3760  AH->offSize = AH->intSize;
3761 
3762  fmt = AH->ReadBytePtr(AH);
3763 
3764  if (AH->format != fmt)
3765  fatal("expected format (%d) differs from format found in file (%d)",
3766  AH->format, fmt);
3767 
3768  if (AH->version >= K_VERS_1_2)
3769  {
3770  if (AH->version < K_VERS_1_4)
3771  AH->compression = AH->ReadBytePtr(AH);
3772  else
3773  AH->compression = ReadInt(AH);
3774  }
3775  else
3776  AH->compression = Z_DEFAULT_COMPRESSION;
3777 
3778 #ifndef HAVE_LIBZ
3779  if (AH->compression != 0)
3780  pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
3781 #endif
3782 
3783  if (AH->version >= K_VERS_1_4)
3784  {
3785  struct tm crtm;
3786 
3787  crtm.tm_sec = ReadInt(AH);
3788  crtm.tm_min = ReadInt(AH);
3789  crtm.tm_hour = ReadInt(AH);
3790  crtm.tm_mday = ReadInt(AH);
3791  crtm.tm_mon = ReadInt(AH);
3792  crtm.tm_year = ReadInt(AH);
3793  crtm.tm_isdst = ReadInt(AH);
3794 
3795  /*
3796  * Newer versions of glibc have mktime() report failure if tm_isdst is
3797  * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3798  * TZ=UTC. This is problematic when restoring an archive under a
3799  * different timezone setting. If we get a failure, try again with
3800  * tm_isdst set to -1 ("don't know").
3801  *
3802  * XXX with or without this hack, we reconstruct createDate
3803  * incorrectly when the prevailing timezone is different from
3804  * pg_dump's. Next time we bump the archive version, we should flush
3805  * this representation and store a plain seconds-since-the-Epoch
3806  * timestamp instead.
3807  */
3808  AH->createDate = mktime(&crtm);
3809  if (AH->createDate == (time_t) -1)
3810  {
3811  crtm.tm_isdst = -1;
3812  AH->createDate = mktime(&crtm);
3813  if (AH->createDate == (time_t) -1)
3814  pg_log_warning("invalid creation date in header");
3815  }
3816  }
3817 
3818  if (AH->version >= K_VERS_1_4)
3819  {
3820  AH->archdbname = ReadStr(AH);
3821  }
3822 
3823  if (AH->version >= K_VERS_1_10)
3824  {
3825  AH->archiveRemoteVersion = ReadStr(AH);
3826  AH->archiveDumpVersion = ReadStr(AH);
3827  }
3828 }
3829 
3830 
3831 /*
3832  * checkSeek
3833  * check to see if ftell/fseek can be performed.
3834  */
3835 bool
3836 checkSeek(FILE *fp)
3837 {
3838  pgoff_t tpos;
3839 
3840  /* Check that ftello works on this file */
3841  tpos = ftello(fp);
3842  if (tpos < 0)
3843  return false;
3844 
3845  /*
3846  * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3847  * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3848  * successful no-op even on files that are otherwise unseekable.
3849  */
3850  if (fseeko(fp, tpos, SEEK_SET) != 0)
3851  return false;
3852 
3853  return true;
3854 }
3855 
3856 
3857 /*
3858  * dumpTimestamp
3859  */
3860 static void
3861 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3862 {
3863  char buf[64];
3864 
3865  if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3866  ahprintf(AH, "-- %s %s\n\n", msg, buf);
3867 }
3868 
3869 /*
3870  * Main engine for parallel restore.
3871  *
3872  * Parallel restore is done in three phases. In this first phase,
3873  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3874  * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3875  * PRE_DATA items other than ACLs.) Entries we can't process now are
3876  * added to the pending_list for later phases to deal with.
3877  */
3878 static void
3879 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3880 {
3881  bool skipped_some;
3882  TocEntry *next_work_item;
3883 
3884  pg_log_debug("entering restore_toc_entries_prefork");
3885 
3886  /* Adjust dependency information */
3887  fix_dependencies(AH);
3888 
3889  /*
3890  * Do all the early stuff in a single connection in the parent. There's no
3891  * great point in running it in parallel, in fact it will actually run
3892  * faster in a single connection because we avoid all the connection and
3893  * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3894  * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3895  * not risk trying to process them out-of-order.
3896  *
3897  * Stuff that we can't do immediately gets added to the pending_list.
3898  * Note: we don't yet filter out entries that aren't going to be restored.
3899  * They might participate in dependency chains connecting entries that
3900  * should be restored, so we treat them as live until we actually process
3901  * them.
3902  *
3903  * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3904  * before DATA items, and all DATA items before POST_DATA items. That is
3905  * not certain to be true in older archives, though, and in any case use
3906  * of a list file would destroy that ordering (cf. SortTocFromFile). So
3907  * this loop cannot assume that it holds.
3908  */
3909  AH->restorePass = RESTORE_PASS_MAIN;
3910  skipped_some = false;
3911  for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3912  {
3913  bool do_now = true;
3914 
3915  if (next_work_item->section != SECTION_PRE_DATA)
3916  {
3917  /* DATA and POST_DATA items are just ignored for now */
3918  if (next_work_item->section == SECTION_DATA ||
3919  next_work_item->section == SECTION_POST_DATA)
3920  {
3921  do_now = false;
3922  skipped_some = true;
3923  }
3924  else
3925  {
3926  /*
3927  * SECTION_NONE items, such as comments, can be processed now
3928  * if we are still in the PRE_DATA part of the archive. Once
3929  * we've skipped any items, we have to consider whether the
3930  * comment's dependencies are satisfied, so skip it for now.
3931  */
3932  if (skipped_some)
3933  do_now = false;
3934  }
3935  }
3936 
3937  /*
3938  * Also skip items that need to be forced into later passes. We need
3939  * not set skipped_some in this case, since by assumption no main-pass
3940  * items could depend on these.
3941  */
3942  if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3943  do_now = false;
3944 
3945  if (do_now)
3946  {
3947  /* OK, restore the item and update its dependencies */
3948  pg_log_info("processing item %d %s %s",
3949  next_work_item->dumpId,
3950  next_work_item->desc, next_work_item->tag);
3951 
3952  (void) restore_toc_entry(AH, next_work_item, false);
3953 
3954  /* Reduce dependencies, but don't move anything to ready_list */
3955  reduce_dependencies(AH, next_work_item, NULL);
3956  }
3957  else
3958  {
3959  /* Nope, so add it to pending_list */
3960  pending_list_append(pending_list, next_work_item);
3961  }
3962  }
3963 
3964  /*
3965  * Now close parent connection in prep for parallel steps. We do this
3966  * mainly to ensure that we don't exceed the specified number of parallel
3967  * connections.
3968  */
3969  DisconnectDatabase(&AH->public);
3970 
3971  /* blow away any transient state from the old connection */
3972  if (AH->currUser)
3973  free(AH->currUser);
3974  AH->currUser = NULL;
3975  if (AH->currSchema)
3976  free(AH->currSchema);
3977  AH->currSchema = NULL;
3978  if (AH->currTablespace)
3979  free(AH->currTablespace);
3980  AH->currTablespace = NULL;
3981  if (AH->currTableAm)
3982  free(AH->currTableAm);
3983  AH->currTableAm = NULL;
3984 }
3985 
3986 /*
3987  * Main engine for parallel restore.
3988  *
3989  * Parallel restore is done in three phases. In this second phase,
3990  * we process entries by dispatching them to parallel worker children
3991  * (processes on Unix, threads on Windows), each of which connects
3992  * separately to the database. Inter-entry dependencies are respected,
3993  * and so is the RestorePass multi-pass structure. When we can no longer
3994  * make any entries ready to process, we exit. Normally, there will be
3995  * nothing left to do; but if there is, the third phase will mop up.
3996  */
3997 static void
3998 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
3999  TocEntry *pending_list)
4000 {
4001  ParallelReadyList ready_list;
4002  TocEntry *next_work_item;
4003 
4004  pg_log_debug("entering restore_toc_entries_parallel");
4005 
4006  /* Set up ready_list with enough room for all known TocEntrys */
4007  ready_list_init(&ready_list, AH->tocCount);
4008 
4009  /*
4010  * The pending_list contains all items that we need to restore. Move all
4011  * items that are available to process immediately into the ready_list.
4012  * After this setup, the pending list is everything that needs to be done
4013  * but is blocked by one or more dependencies, while the ready list
4014  * contains items that have no remaining dependencies and are OK to
4015  * process in the current restore pass.
4016  */
4017  AH->restorePass = RESTORE_PASS_MAIN;
4018  move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4019 
4020  /*
4021  * main parent loop
4022  *
4023  * Keep going until there is no worker still running AND there is no work
4024  * left to be done. Note invariant: at top of loop, there should always
4025  * be at least one worker available to dispatch a job to.
4026  */
4027  pg_log_info("entering main parallel loop");
4028 
4029  for (;;)
4030  {
4031  /* Look for an item ready to be dispatched to a worker */
4032  next_work_item = pop_next_work_item(&ready_list, pstate);
4033  if (next_work_item != NULL)
4034  {
4035  /* If not to be restored, don't waste time launching a worker */
4036  if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4037  {
4038  pg_log_info("skipping item %d %s %s",
4039  next_work_item->dumpId,
4040  next_work_item->desc, next_work_item->tag);
4041  /* Update its dependencies as though we'd completed it */
4042  reduce_dependencies(AH, next_work_item, &ready_list);
4043  /* Loop around to see if anything else can be dispatched */
4044  continue;
4045  }
4046 
4047  pg_log_info("launching item %d %s %s",
4048  next_work_item->dumpId,
4049  next_work_item->desc, next_work_item->tag);
4050 
4051  /* Dispatch to some worker */
4052  DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4053  mark_restore_job_done, &ready_list);
4054  }
4055  else if (IsEveryWorkerIdle(pstate))
4056  {
4057  /*
4058  * Nothing is ready and no worker is running, so we're done with
4059  * the current pass or maybe with the whole process.
4060  */
4061  if (AH->restorePass == RESTORE_PASS_LAST)
4062  break; /* No more parallel processing is possible */
4063 
4064  /* Advance to next restore pass */
4065  AH->restorePass++;
4066  /* That probably allows some stuff to be made ready */
4067  move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4068  /* Loop around to see if anything's now ready */
4069  continue;
4070  }
4071  else
4072  {
4073  /*
4074  * We have nothing ready, but at least one child is working, so
4075  * wait for some subjob to finish.
4076  */
4077  }
4078 
4079  /*
4080  * Before dispatching another job, check to see if anything has
4081  * finished. We should check every time through the loop so as to
4082  * reduce dependencies as soon as possible. If we were unable to
4083  * dispatch any job this time through, wait until some worker finishes
4084  * (and, hopefully, unblocks some pending item). If we did dispatch
4085  * something, continue as soon as there's at least one idle worker.
4086  * Note that in either case, there's guaranteed to be at least one
4087  * idle worker when we return to the top of the loop. This ensures we
4088  * won't block inside DispatchJobForTocEntry, which would be
4089  * undesirable: we'd rather postpone dispatching until we see what's
4090  * been unblocked by finished jobs.
4091  */
4092  WaitForWorkers(AH, pstate,
4093  next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4094  }
4095 
4096  /* There should now be nothing in ready_list. */
4097  Assert(ready_list.first_te > ready_list.last_te);
4098 
4099  ready_list_free(&ready_list);
4100 
4101  pg_log_info("finished main parallel loop");
4102 }
4103 
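
As a point of reference for the three-phase design described in the comments above, here is a minimal, self-contained toy model; every name in it is invented for illustration and it is not PostgreSQL code. It shows a serial pre-fork step, a dependency-driven middle phase that keeps processing whatever has become ready, and a serial mop-up for anything left over.

#include <stdio.h>
#include <stdbool.h>

#define NITEMS 5

/* Toy TOC entry: depends_on < 0 means "no dependency". */
typedef struct
{
    int  id;
    int  depends_on;
    bool prefork_only;          /* must be handled serially before "forking" */
    bool done;
} ToyEntry;

static bool
is_ready(const ToyEntry *items, const ToyEntry *e)
{
    return !e->done &&
        (e->depends_on < 0 || items[e->depends_on].done);
}

int
main(void)
{
    ToyEntry items[NITEMS] = {
        {0, -1, true,  false},  /* e.g. a PRE_DATA item */
        {1, -1, false, false},  /* table data */
        {2, 1,  false, false},  /* index on that table */
        {3, -1, false, false},  /* independent table data */
        {4, 2,  false, false},  /* constraint needing the index */
    };

    /* Phase 1: serial pre-fork work */
    for (int i = 0; i < NITEMS; i++)
        if (items[i].prefork_only)
        {
            printf("prefork: item %d\n", items[i].id);
            items[i].done = true;
        }

    /* Phase 2: keep dispatching ready items until nothing more becomes ready */
    bool progress = true;

    while (progress)
    {
        progress = false;
        for (int i = 0; i < NITEMS; i++)
            if (!items[i].prefork_only && is_ready(items, &items[i]))
            {
                printf("parallel phase: item %d\n", items[i].id);
                items[i].done = true;
                progress = true;
            }
    }

    /* Phase 3: serial mop-up of anything still pending */
    for (int i = 0; i < NITEMS; i++)
        if (!items[i].done)
            printf("postfork mop-up: item %d\n", items[i].id);

    return 0;
}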
4104 /*
4105  * Main engine for parallel restore.
4106  *
4107  * Parallel restore is done in three phases. In this third phase,
4108  * we mop up any remaining TOC entries by processing them serially.
4109  * This phase normally should have nothing to do, but if we've somehow
4110  * gotten stuck due to circular dependencies or some such, this provides
4111  * at least some chance of completing the restore successfully.
4112  */
4113 static void
4114 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4115 {
4116  RestoreOptions *ropt = AH->public.ropt;
4117  TocEntry *te;
4118 
4119  pg_log_debug("entering restore_toc_entries_postfork");
4120 
4121  /*
4122  * Now reconnect the single parent connection.
4123  */
4124  ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4125 
4126  /* re-establish fixed state */
4127  _doSetFixedOutputState(AH);
4128 
4129  /*
4130  * Make sure there is no work left due to, say, circular dependencies, or
4131  * some other pathological condition. If so, do it in the single parent
4132  * connection. We don't sweat about RestorePass ordering; it's likely we
4133  * already violated that.
4134  */
4135  for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4136  {
4137  pg_log_info("processing missed item %d %s %s",
4138  te->dumpId, te->desc, te->tag);
4139  (void) restore_toc_entry(AH, te, false);
4140  }
4141 }
4142 
4143 /*
4144  * Check if te1 has an exclusive lock requirement for an item that te2 also
4145  * requires, whether or not te2's requirement is for an exclusive lock.
4146  */
4147 static bool
4148 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4149 {
4150  int j,
4151  k;
4152 
4153  for (j = 0; j < te1->nLockDeps; j++)
4154  {
4155  for (k = 0; k < te2->nDeps; k++)
4156  {
4157  if (te1->lockDeps[j] == te2->dependencies[k])
4158  return true;
4159  }
4160  }
4161  return false;
4162 }
4163 
4164 
4165 /*
4166  * Initialize the header of the pending-items list.
4167  *
4168  * This is a circular list with a dummy TocEntry as header, just like the
4169  * main TOC list; but we use separate list links so that an entry can be in
4170  * the main TOC list as well as in the pending list.
4171  */
4172 static void
4173 pending_list_header_init(TocEntry *l)
4174 {
4175  l->pending_prev = l->pending_next = l;
4176 }
4177 
4178 /* Append te to the end of the pending-list headed by l */
4179 static void
4180 pending_list_append(TocEntry *l, TocEntry *te)
4181 {
4182  te->pending_prev = l->pending_prev;
4183  l->pending_prev->pending_next = te;
4184  l->pending_prev = te;
4185  te->pending_next = l;
4186 }
4187 
4188 /* Remove te from the pending-list */
4189 static void
4190 pending_list_remove(TocEntry *te)
4191 {
4192  te->pending_prev->pending_next = te->pending_next;
4193  te->pending_next->pending_prev = te->pending_prev;
4194  te->pending_prev = NULL;
4195  te->pending_next = NULL;
4196 }
4197 
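
The pending-list helpers above implement the classic dummy-header circular doubly-linked list, with the removed node's links cleared so that a null pending_prev can later serve as an "is this entry still pending?" test. A minimal standalone sketch of the same pattern, using an invented node type rather than TocEntry, looks like this:

#include <stdio.h>

typedef struct Node
{
    struct Node *prev;
    struct Node *next;
    int          id;
} Node;

/* An empty list is just the header pointing at itself. */
static void
list_header_init(Node *l)
{
    l->prev = l->next = l;
}

/* Append before the header, i.e. at the tail of the list. */
static void
list_append(Node *l, Node *n)
{
    n->prev = l->prev;
    l->prev->next = n;
    l->prev = n;
    n->next = l;
}

/* Unlink a node and clear its links so membership can be tested later. */
static void
list_remove(Node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->prev = n->next = NULL;
}

int
main(void)
{
    Node header, a = {NULL, NULL, 1}, b = {NULL, NULL, 2};

    list_header_init(&header);
    list_append(&header, &a);
    list_append(&header, &b);
    list_remove(&a);

    /* Iterate: stop when we come back around to the header. */
    for (Node *n = header.next; n != &header; n = n->next)
        printf("still pending: %d\n", n->id);
    return 0;
}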
4198 
4199 /*
4200  * Initialize the ready_list with enough room for up to tocCount entries.
4201  */
4202 static void
4203 ready_list_init(ParallelReadyList *ready_list, int tocCount)
4204 {
4205  ready_list->tes = (TocEntry **)
4206  pg_malloc(tocCount * sizeof(TocEntry *));
4207  ready_list->first_te = 0;
4208  ready_list->last_te = -1;
4209  ready_list->sorted = false;
4210 }
4211 
4212 /*
4213  * Free storage for a ready_list.
4214  */
4215 static void
4216 ready_list_free(ParallelReadyList *ready_list)
4217 {
4218  pg_free(ready_list->tes);
4219 }
4220 
4221 /* Add te to the ready_list */
4222 static void
4223 ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4224 {
4225  ready_list->tes[++ready_list->last_te] = te;
4226  /* List is (probably) not sorted anymore. */
4227  ready_list->sorted = false;
4228 }
4229 
4230 /* Remove the i'th entry in the ready_list */
4231 static void
4232 ready_list_remove(ParallelReadyList *ready_list, int i)
4233 {
4234  int f = ready_list->first_te;
4235 
4236  Assert(i >= f && i <= ready_list->last_te);
4237 
4238  /*
4239  * In the typical case where the item to be removed is the first ready
4240  * entry, we need only increment first_te to remove it. Otherwise, move
4241  * the entries before it to compact the list. (This preserves sortedness,
4242  * if any.) We could alternatively move the entries after i, but there
4243  * are typically many more of those.
4244  */
4245  if (i > f)
4246  {
4247  TocEntry **first_te_ptr = &ready_list->tes[f];
4248 
4249  memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4250  }
4251  ready_list->first_te++;
4252 }
4253 
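
ready_list_remove keeps the valid window tes[first_te .. last_te] contiguous by shifting the entries before position i one slot to the right and then advancing first_te, which preserves any existing ordering. A standalone sketch of the same index arithmetic with plain ints (invented names, not the real ready list) makes the memmove concrete:

#include <stdio.h>
#include <string.h>

int
main(void)
{
    int  tes[] = {90, 70, 50, 30, 10};  /* already sorted, descending */
    int  first_te = 0;
    int  last_te = 4;
    int  i = 2;                         /* remove the entry holding 50 */

    if (i > first_te)
    {
        /* shift tes[first_te .. i-1] right by one slot */
        memmove(&tes[first_te + 1], &tes[first_te],
                (i - first_te) * sizeof(int));
    }
    first_te++;

    /* Valid window is now tes[1..4] = 90 70 30 10: order preserved. */
    for (int k = first_te; k <= last_te; k++)
        printf("%d ", tes[k]);
    printf("\n");
    return 0;
}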
4254 /* Sort the ready_list into the desired order */
4255 static void
4256 ready_list_sort(ParallelReadyList *ready_list)
4257 {
4258  if (!ready_list->sorted)
4259  {
4260  int n = ready_list->last_te - ready_list->first_te + 1;
4261 
4262  if (n > 1)
4263  qsort(ready_list->tes + ready_list->first_te, n,
4264  sizeof(TocEntry *),
4265  TocEntrySizeCompare);
4266  ready_list->sorted = true;
4267  }
4268 }
4269 
4270 /* qsort comparator for sorting TocEntries by dataLength */
4271 static int
4272 TocEntrySizeCompare(const void *p1, const void *p2)
4273 {
4274  const TocEntry *te1 = *(const TocEntry *const *) p1;
4275  const TocEntry *te2 = *(const TocEntry *const *) p2;
4276 
4277  /* Sort by decreasing dataLength */
4278  if (te1->dataLength > te2->dataLength)
4279  return -1;
4280  if (te1->dataLength < te2->dataLength)
4281  return 1;
4282 
4283  /* For equal dataLengths, sort by dumpId, just to be stable */
4284  if (te1->dumpId < te2->dumpId)
4285  return -1;
4286  if (te1->dumpId > te2->dumpId)
4287  return 1;
4288 
4289  return 0;
4290 }
4291 
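
TocEntrySizeCompare orders an array of TocEntry pointers by decreasing dataLength, breaking ties by ascending dumpId so the result is deterministic. A standalone sketch using a mock entry type (invented names, same comparator shape) shows how qsort sees pointers to the array elements, which are themselves pointers:

#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    int  id;
    long dataLength;
} MockEntry;

static int
size_compare(const void *p1, const void *p2)
{
    const MockEntry *a = *(const MockEntry *const *) p1;
    const MockEntry *b = *(const MockEntry *const *) p2;

    /* Larger data first */
    if (a->dataLength > b->dataLength)
        return -1;
    if (a->dataLength < b->dataLength)
        return 1;

    /* Tie-break on id to keep the ordering stable */
    if (a->id < b->id)
        return -1;
    if (a->id > b->id)
        return 1;
    return 0;
}

int
main(void)
{
    MockEntry   e1 = {1, 100}, e2 = {2, 500}, e3 = {3, 100};
    MockEntry  *tes[] = {&e1, &e2, &e3};

    qsort(tes, 3, sizeof(MockEntry *), size_compare);

    /* Prints id 2 (500) first, then ids 1 and 3 (both 100, tie broken by id) */
    for (int i = 0; i < 3; i++)
        printf("id %d len %ld\n", tes[i]->id, tes[i]->dataLength);
    return 0;
}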
4292 
4293 /*
4294  * Move all immediately-ready items from pending_list to ready_list.
4295  *
4296  * Items are considered ready if they have no remaining dependencies and
4297  * they belong in the current restore pass. (See also reduce_dependencies,
4298  * which applies the same logic one-at-a-time.)
4299  */
4300 static void
4301 move_to_ready_list(TocEntry *pending_list,
4302  ParallelReadyList *ready_list,
4303  RestorePass pass)
4304 {
4305  TocEntry *te;
4306  TocEntry *next_te;
4307 
4308  for (te = pending_list->pending_next; te != pending_list; te = next_te)
4309  {
4310  /* must save list link before possibly removing te from list */
4311  next_te = te->pending_next;
4312 
4313  if (te->depCount == 0 &&
4314  _tocEntryRestorePass(te) == pass)
4315  {
4316  /* Remove it from pending_list ... */
4317  pending_list_remove(te);
4318  /* ... and add to ready_list */
4319  ready_list_insert(ready_list, te);
4320  }
4321  }
4322 }
4323 
4324 /*
4325  * Find the next work item (if any) that is capable of being run now,
4326  * and remove it from the ready_list.
4327  *
4328  * Returns the item, or NULL if nothing is runnable.
4329  *
4330  * To qualify, the item must have no remaining dependencies
4331  * and no requirements for locks that are incompatible with
4332  * items currently running. Items in the ready_list are known to have
4333  * no remaining dependencies, but we have to check for lock conflicts.
4334  */
4335 static TocEntry *
4336 pop_next_work_item(ParallelReadyList *ready_list,
4337  ParallelState *pstate)
4338 {
4339  /*
4340  * Sort the ready_list so that we'll tackle larger jobs first.
4341  */
4342  ready_list_sort(ready_list);
4343 
4344  /*
4345  * Search the ready_list until we find a suitable item.
4346  */
4347  for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4348  {
4349  TocEntry *te = ready_list->tes[i];
4350  bool conflicts = false;
4351 
4352  /*
4353  * Check to see if the item would need exclusive lock on something
4354  * that a currently running item also needs lock on, or vice versa. If
4355  * so, we don't want to schedule them together.
4356  */
4357  for (int k = 0; k < pstate->numWorkers; k++)
4358  {
4359  TocEntry *running_te = pstate->te[k];
4360 
4361  if (running_te == NULL)
4362  continue;
4363  if (has_lock_conflicts(te, running_te) ||
4364  has_lock_conflicts(running_te, te))
4365  {
4366  conflicts = true;
4367  break;
4368  }
4369  }
4370 
4371  if (conflicts)
4372  continue;
4373 
4374  /* passed all tests, so this item can run */
4375  ready_list_remove(ready_list, i);
4376  return te;
4377  }
4378 
4379  pg_log_debug("no item ready");
4380  return NULL;
4381 }
4382 
4383 
4384 /*
4385  * Restore a single TOC item in parallel with others
4386  *
4387  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4388  * (everything else). A worker process executes several such work items during
4389  * a parallel backup or restore. Once we terminate here and report back that
4390  * our work is finished, the leader process will assign us a new work item.
4391  */
4392 int
4393 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4394 {
4395  int status;
4396 
4397  Assert(AH->connection != NULL);
4398 
4399  /* Count only errors associated with this TOC entry */
4400  AH->public.n_errors = 0;
4401 
4402  /* Restore the TOC item */
4403  status = restore_toc_entry(AH, te, true);
4404 
4405  return status;
4406 }
4407 
4408 
4409 /*
4410  * Callback function that's invoked in the leader process after a step has
4411  * been parallel restored.
4412  *
4413  * Update status and reduce the dependency count of any dependent items.
4414  */
4415 static void
4416 mark_restore_job_done(ArchiveHandle *AH,
4417  TocEntry *te,
4418  int status,
4419  void *callback_data)
4420 {
4421  ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4422 
4423  pg_log_info("finished item %d %s %s",
4424  te->dumpId, te->desc, te->tag);
4425 
4426  if (status == WORKER_CREATE_DONE)
4427  mark_create_done(AH, te);
4428  else if (status == WORKER_INHIBIT_DATA)
4429  {
4430  inhibit_data_for_failed_table(AH, te);
4431  AH->public.n_errors++;
4432  }
4433  else if (status == WORKER_IGNORED_ERRORS)
4434  AH->public.n_errors++;
4435  else if (status != 0)
4436  fatal("worker process failed: exit code %d",
4437  status);
4438 
4439  reduce_dependencies(AH, te, ready_list);
4440 }
4441 
4442 
4443 /*
4444  * Process the dependency information into a form useful for parallel restore.
4445  *
4446  * This function takes care of fixing up some missing or badly designed
4447  * dependencies, and then prepares subsidiary data structures that will be
4448  * used in the main parallel-restore logic, including:
4449  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4450  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4451  * dependencies for each TOC entry.
4452  *
4453  * We also identify locking dependencies so that we can avoid trying to
4454  * schedule conflicting items at the same time.
4455  */
4456 static void
4457 fix_dependencies(ArchiveHandle *AH)
4458 {
4459  TocEntry *te;
4460  int i;
4461 
4462  /*
4463  * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4464  * items are marked as not being in any parallel-processing list.
4465  */
4466  for (te = AH->toc->next; te != AH->toc; te = te->next)
4467  {
4468  te->depCount = te->nDeps;
4469  te->revDeps = NULL;
4470  te->nRevDeps = 0;
4471  te->pending_prev = NULL;
4472  te->pending_next = NULL;
4473  }
4474 
4475  /*
4476  * POST_DATA items that are shown as depending on a table need to be
4477  * re-pointed to depend on that table's data, instead. This ensures they
4478  * won't get scheduled until the data has been loaded.
4479  */
4480  repoint_table_dependencies(AH);
4481 
4482  /*
4483  * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4484  * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4485  * one BLOB COMMENTS in such files.)
4486  */
4487  if (AH->version < K_VERS_1_11)
4488  {
4489  for (te = AH->toc->next; te != AH->toc; te = te->next)
4490  {
4491  if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4492  {
4493  TocEntry *te2;
4494 
4495  for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4496  {
4497  if (strcmp(te2->desc, "BLOBS") == 0)
4498  {
4499  te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4500  te->dependencies[0] = te2->dumpId;
4501  te->nDeps++;
4502  te->depCount++;
4503  break;
4504  }
4505  }
4506  break;
4507  }
4508  }
4509  }
4510 
4511  /*
4512  * At this point we start to build the revDeps reverse-dependency arrays,
4513  * so all changes of dependencies must be complete.
4514  */
4515 
4516  /*
4517  * Count the incoming dependencies for each item. Also, it is possible
4518  * that the dependencies list items that are not in the archive at all
4519  * (that should not happen in 9.2 and later, but is highly likely in older
4520  * archives). Subtract such items from the depCounts.
4521  */
4522  for (te = AH->toc->next; te != AH->toc; te = te->next)
4523  {
4524  for (i = 0; i < te->nDeps; i++)
4525  {
4526  DumpId depid = te->dependencies[i];
4527 
4528  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4529  AH->tocsByDumpId[depid]->nRevDeps++;
4530  else
4531  te->depCount--;
4532  }
4533  }
4534 
4535  /*
4536  * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4537  * it as a counter below.
4538  */
4539  for (te = AH->toc->next; te != AH->toc; te = te->next)
4540  {
4541  if (te->nRevDeps > 0)
4542  te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4543  te->nRevDeps = 0;
4544  }
4545 
4546  /*
4547  * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4548  * better agree with the loops above.
4549  */
4550  for (te = AH->toc->next; te != AH->toc; te = te->next)
4551  {
4552  for (i = 0; i < te->nDeps; i++)
4553  {
4554  DumpId depid = te->dependencies[i];
4555 
4556  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4557  {
4558  TocEntry *otherte = AH->tocsByDumpId[depid];
4559 
4560  otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4561  }
4562  }
4563  }
4564 
4565  /*
4566  * Lastly, work out the locking dependencies.
4567  */
4568  for (te = AH->toc->next; te != AH->toc; te = te->next)
4569  {
4570  te->lockDeps = NULL;
4571  te->nLockDeps = 0;
4572  identify_locking_dependencies(AH, te);
4573  }
4574 }
4575 
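
fix_dependencies effectively inverts the per-entry "outgoing" dependency lists into per-entry counters (depCount) plus "incoming" lists (revDeps), so that finishing one item can cheaply decrement exactly the counters of the items that were waiting on it. A standalone sketch of that inversion for a toy graph (invented names and fixed-size arrays, not the real TOC structures) looks like this:

#include <stdio.h>

#define N 4
#define MAXDEPS 4

/* deps[i] lists the items that item i depends on; ndeps[i] is its length. */
static int deps[N][MAXDEPS] = {{0}, {0}, {0}, {2}};
static int ndeps[N] = {0, 0, 1, 1};

int
main(void)
{
    int depCount[N];
    int revDeps[N][MAXDEPS];
    int nRevDeps[N] = {0};

    /* Pass 1: each item starts out blocked by all of its dependencies. */
    for (int i = 0; i < N; i++)
        depCount[i] = ndeps[i];

    /* Pass 2: invert the edges into incoming-dependency lists. */
    for (int i = 0; i < N; i++)
        for (int j = 0; j < ndeps[i]; j++)
        {
            int dep = deps[i][j];

            revDeps[dep][nRevDeps[dep]++] = i;
        }

    /* "Finish" item 0 and release whoever was waiting on it. */
    for (int j = 0; j < nRevDeps[0]; j++)
    {
        int waiting = revDeps[0][j];

        if (--depCount[waiting] == 0)
            printf("item %d is now ready\n", waiting);
    }
    return 0;
}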
4576 /*
4577  * Change dependencies on table items to depend on table data items instead,
4578  * but only in POST_DATA items.
4579  *
4580  * Also, for any item having such dependency(s), set its dataLength to the
4581  * largest dataLength of the table data items it depends on. This ensures
4582  * that parallel restore will prioritize larger jobs (index builds, FK
4583  * constraint checks, etc) over smaller ones, avoiding situations where we
4584  * end a restore with only one active job working on a large table.
4585  */
4586 static void
4587 repoint_table_dependencies(ArchiveHandle *AH)
4588 {
4589  TocEntry *te;
4590  int i;
4591  DumpId olddep;
4592 
4593  for (te = AH->toc->next; te != AH->toc; te = te->next)
4594  {
4595  if (te->section != SECTION_POST_DATA)
4596  continue;
4597  for (i = 0; i < te->nDeps; i++)
4598  {
4599  olddep = te->dependencies[i];
4600  if (olddep <= AH->maxDumpId &&
4601  AH->tableDataId[olddep] != 0)
4602  {
4603  DumpId tabledataid = AH->tableDataId[olddep];
4604  TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4605 
4606  te->dependencies[i] = tabledataid;
4607  te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4608  pg_log_debug("transferring dependency %d -> %d to %d",
4609  te->dumpId, olddep, tabledataid);
4610  }
4611  }
4612  }
4613 }
4614 
4615 /*
4616  * Identify which objects we'll need exclusive lock on in order to restore
4617  * the given TOC entry (*other* than the one identified by the TOC entry
4618  * itself). Record their dump IDs in the entry's lockDeps[] array.
4619  */
4620 static void
4621 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4622 {
4623  DumpId *lockids;
4624  int nlockids;
4625  int i;
4626 
4627  /*
4628  * We only care about this for POST_DATA items. PRE_DATA items are not
4629  * run in parallel, and DATA items are all independent by assumption.
4630  */
4631  if (te->section != SECTION_POST_DATA)
4632  return;
4633 
4634  /* Quick exit if no dependencies at all */
4635  if (te->nDeps == 0)
4636  return;
4637 
4638  /*
4639  * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4640  * and hence require exclusive lock. However, we know that CREATE INDEX
4641  * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4642  * category too ... but today is not that day.)
4643  */
4644  if (strcmp(te->desc, "INDEX") == 0)
4645  return;
4646 
4647  /*
4648  * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4649  * item listed among its dependencies. Originally all of these would have
4650  * been TABLE items, but repoint_table_dependencies would have repointed
4651  * them to the TABLE DATA items if those are present (which they might not
4652  * be, eg in a schema-only dump). Note that all of the entries we are
4653  * processing here are POST_DATA; otherwise there might be a significant
4654  * difference between a dependency on a table and a dependency on its
4655  * data, so that closer analysis would be needed here.
4656  */
4657  lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4658  nlockids = 0;
4659  for (i = 0; i < te->nDeps; i++)
4660  {
4661  DumpId depid = te->dependencies[i];
4662 
4663  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4664  ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4665  strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4666  lockids[nlockids++] = depid;
4667  }
4668 
4669  if (nlockids == 0)
4670  {
4671  free(lockids);
4672  return;
4673  }
4674 
4675  te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4676  te->nLockDeps = nlockids;
4677 }
4678 
4679 /*
4680  * Remove the specified TOC entry from the depCounts of items that depend on
4681  * it, thereby possibly making them ready-to-run. Any pending item that
4682  * becomes ready should be moved to the ready_list, if that's provided.
4683  */
4684 static void
4685 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4686  ParallelReadyList *ready_list)
4687 {
4688  int i;
4689 
4690  pg_log_debug("reducing dependencies for %d", te->dumpId);
4691 
4692  for (i = 0; i < te->nRevDeps; i++)
4693  {
4694  TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4695 
4696  Assert(otherte->depCount > 0);
4697  otherte->depCount--;
4698 
4699  /*
4700  * It's ready if it has no remaining dependencies, and it belongs in
4701  * the current restore pass, and it is currently a member of the
4702  * pending list (that check is needed to prevent double restore in
4703  * some cases where a list-file forces out-of-order restoring).
4704  * However, if ready_list == NULL then caller doesn't want any list
4705  * memberships changed.
4706  */
4707  if (otherte->depCount == 0 &&
4708  _tocEntryRestorePass(otherte) == AH->restorePass &&
4709  otherte->pending_prev != NULL &&
4710  ready_list != NULL)
4711  {
4712  /* Remove it from pending list ... */
4713  pending_list_remove(otherte);
4714  /* ... and add to ready_list */
4715  ready_list_insert(ready_list, otherte);
4716  }
4717  }
4718 }
4719 
4720 /*
4721  * Set the created flag on the DATA member corresponding to the given
4722  * TABLE member
4723  */
4724 static void
4725 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4726 {
4727  if (AH->tableDataId[te->dumpId] != 0)
4728  {
4729  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4730 
4731  ted->created = true;
4732  }
4733 }
4734 
4735 /*
4736  * Mark the DATA member corresponding to the given TABLE member
4737  * as not wanted
4738  */
4739 static void
4740 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4741 {
4742  pg_log_info("table \"%s\" could not be created, will not restore its data",
4743  te->tag);
4744 
4745  if (AH->tableDataId[te->dumpId] != 0)
4746  {
4747  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4748 
4749  ted->reqs = 0;
4750  }
4751 }
4752 
4753 /*
4754  * Clone and de-clone routines used in parallel restoration.
4755  *
4756  * Enough of the structure is cloned to ensure that there is no
4757  * conflict between different threads each with their own clone.
4758  */
4759 ArchiveHandle *
4760 CloneArchive(ArchiveHandle *AH)
4761 {
4762  ArchiveHandle *clone;
4763 
4764  /* Make a "flat" copy */
4765  clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4766  memcpy(clone, AH, sizeof(ArchiveHandle));
4767 
4768  /* Handle format-independent fields */
4769  memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4770 
4771  /* The clone will have its own connection, so disregard connection state */
4772  clone->connection = NULL;
4773  clone->connCancel = NULL;
4774  clone->currUser = NULL;
4775  clone->currSchema = NULL;
4776  clone->currTablespace = NULL;
4777 
4778  /* savedPassword must be local in case we change it while connecting */
4779  if (clone->savedPassword)
4780  clone->savedPassword = pg_strdup(clone->savedPassword);
4781 
4782  /* clone has its own error count, too */
4783  clone->public.n_errors = 0;
4784 
4785  /*
4786  * Connect our new clone object to the database, using the same connection
4787  * parameters used for the original connection.
4788  */
4789  ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4790 
4791  /* re-establish fixed state */
4792  if (AH->mode == archModeRead)
4793  _doSetFixedOutputState(clone);
4794  /* in write case, setupDumpWorker will fix up connection state */
4795 
4796  /* Let the format-specific code have a chance too */
4797  clone->ClonePtr(clone);
4798 
4799  Assert(clone->connection != NULL);
4800  return clone;
4801 }
4802 
4803 /*
4804  * Release clone-local storage.
4805  *
4806  * Note: we assume any clone-local connection was already closed.
4807  */
4808 void
4809 DeCloneArchive(ArchiveHandle *AH)
4810 {
4811  /* Should not have an open database connection */
4812  Assert(AH->connection == NULL);
4813 
4814  /* Clear format-specific state */
4815  AH->DeClonePtr(AH);
4816 
4817  /* Clear state allocated by CloneArchive */
4818  if (AH->sqlparse.curCmd)
4819  destroyPQExpBuffer(AH->sqlparse.curCmd);
4820 
4821  /* Clear any connection-local state */
4822  if (AH->currUser)
4823  free(AH->currUser);
4824  if (AH->currSchema)
4825  free(AH->currSchema);
4826  if (AH->currTablespace)
4827  free(AH->currTablespace);
4828  if (AH->currTableAm)
4829  free(AH->currTableAm);
4830  if (AH->savedPassword)
4831  free(AH->savedPassword);
4832 
4833  free(AH);
4834 }