1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  * Private implementation of the archiver routines.
6  *
7  * See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  * Rights are granted to use this software in any way so long
11  * as this notice is not removed.
12  *
13  * The author is not responsible for loss or damages that may
14  * result from its use.
15  *
16  *
17  * IDENTIFICATION
18  * src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "common/string.h"
34 #include "compress_io.h"
35 #include "dumputils.h"
36 #include "fe_utils/string_utils.h"
37 #include "lib/binaryheap.h"
38 #include "lib/stringinfo.h"
39 #include "libpq/libpq-fs.h"
40 #include "parallel.h"
41 #include "pg_backup_archiver.h"
42 #include "pg_backup_db.h"
43 #include "pg_backup_utils.h"
44 
45 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
46 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
47 
48 
49 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
50  const pg_compress_specification compression_spec,
51  bool dosync, ArchiveMode mode,
52  SetupWorkerPtrType setupWorkerPtr,
54 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
55 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
56 static char *sanitize_line(const char *str, bool want_hyphen);
57 static void _doSetFixedOutputState(ArchiveHandle *AH);
58 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
59 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
60 static void _becomeUser(ArchiveHandle *AH, const char *user);
61 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
62 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
63 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
64 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
65 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
66 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
67 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
68 static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
69 static RestorePass _tocEntryRestorePass(TocEntry *te);
70 static bool _tocEntryIsACL(TocEntry *te);
71 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
72 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
73 static bool is_load_via_partition_root(TocEntry *te);
74 static void buildTocEntryArrays(ArchiveHandle *AH);
75 static void _moveBefore(TocEntry *pos, TocEntry *te);
76 static int _discoverArchiveFormat(ArchiveHandle *AH);
77 
78 static int RestoringToDB(ArchiveHandle *AH);
79 static void dump_lo_buf(ArchiveHandle *AH);
80 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
81 static void SetOutput(ArchiveHandle *AH, const char *filename,
82  const pg_compress_specification compression_spec);
84 static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
85 
86 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
87 static void restore_toc_entries_prefork(ArchiveHandle *AH,
88  TocEntry *pending_list);
89 static void restore_toc_entries_parallel(ArchiveHandle *AH,
90  ParallelState *pstate,
91  TocEntry *pending_list);
92 static void restore_toc_entries_postfork(ArchiveHandle *AH,
93  TocEntry *pending_list);
94 static void pending_list_header_init(TocEntry *l);
95 static void pending_list_append(TocEntry *l, TocEntry *te);
96 static void pending_list_remove(TocEntry *te);
97 static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
98 static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
99 static void move_to_ready_heap(TocEntry *pending_list,
100  binaryheap *ready_heap,
101  RestorePass pass);
102 static TocEntry *pop_next_work_item(binaryheap *ready_heap,
103  ParallelState *pstate);
104 static void mark_dump_job_done(ArchiveHandle *AH,
105  TocEntry *te,
106  int status,
107  void *callback_data);
108 static void mark_restore_job_done(ArchiveHandle *AH,
109  TocEntry *te,
110  int status,
111  void *callback_data);
112 static void fix_dependencies(ArchiveHandle *AH);
113 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
116 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
117  binaryheap *ready_heap);
118 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
119 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
120 
121 static void StrictNamesCheck(RestoreOptions *ropt);
122 
123 
124 /*
125  * Allocate a new DumpOptions block containing all default values.
126  */
127 DumpOptions *
128 NewDumpOptions(void)
129 {
130  DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
131 
132  InitDumpOptions(opts);
133  return opts;
134 }
135 
136 /*
137  * Initialize a DumpOptions struct to all default values
138  */
139 void
140 InitDumpOptions(DumpOptions *opts)
141 {
142  memset(opts, 0, sizeof(DumpOptions));
143  /* set any fields that shouldn't default to zeroes */
144  opts->include_everything = true;
145  opts->cparams.promptPassword = TRI_DEFAULT;
146  opts->dumpSections = DUMP_UNSECTIONED;
147 }
148 
149 /*
150  * Create a freshly allocated DumpOptions with options equivalent to those
151  * found in the given RestoreOptions.
152  */
153 DumpOptions *
154 dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
155 {
156  DumpOptions *dopt = NewDumpOptions();
157 
158  /* this is the inverse of what's at the end of pg_dump.c's main() */
159  dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
160  dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
161  dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
162  dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
164  dopt->outputClean = ropt->dropSchema;
165  dopt->dataOnly = ropt->dataOnly;
166  dopt->schemaOnly = ropt->schemaOnly;
167  dopt->if_exists = ropt->if_exists;
168  dopt->column_inserts = ropt->column_inserts;
169  dopt->dumpSections = ropt->dumpSections;
170  dopt->aclsSkip = ropt->aclsSkip;
171  dopt->outputSuperuser = ropt->superuser;
172  dopt->outputCreateDB = ropt->createDB;
173  dopt->outputNoOwner = ropt->noOwner;
174  dopt->outputNoTableAm = ropt->noTableAm;
175  dopt->outputNoTablespaces = ropt->noTablespace;
176  dopt->disable_triggers = ropt->disable_triggers;
177  dopt->use_setsessauth = ropt->use_setsessauth;
179  dopt->dump_inserts = ropt->dump_inserts;
180  dopt->no_comments = ropt->no_comments;
181  dopt->no_publications = ropt->no_publications;
183  dopt->no_subscriptions = ropt->no_subscriptions;
184  dopt->lockWaitTimeout = ropt->lockWaitTimeout;
187  dopt->sequence_data = ropt->sequence_data;
188 
189  return dopt;
190 }
191 
192 
193 /*
194  * Wrapper functions.
195  *
196  * The objective is to make writing new formats and dumpers as simple
197  * as possible, if necessary at the expense of extra function calls etc.
198  *
199  */
200 
201 /*
202  * The dump worker setup needs lots of knowledge of the internals of pg_dump,
203  * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
204  * setup doesn't need to know anything much, so it's defined here.
205  */
206 static void
207 setupRestoreWorker(Archive *AHX)
208 {
209  ArchiveHandle *AH = (ArchiveHandle *) AHX;
210 
211  AH->ReopenPtr(AH);
212 }
213 
214 
215 /* Create a new archive */
216 /* Public */
217 Archive *
218 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
219  const pg_compress_specification compression_spec,
220  bool dosync, ArchiveMode mode,
223 
224 {
225  ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
227 
228  return (Archive *) AH;
229 }
230 
231 /* Open an existing archive */
232 /* Public */
233 Archive *
234 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
235 {
236  ArchiveHandle *AH;
237  pg_compress_specification compression_spec = {0};
238 
239  compression_spec.algorithm = PG_COMPRESSION_NONE;
240  AH = _allocAH(FileSpec, fmt, compression_spec, true,
243 
244  return (Archive *) AH;
245 }
246 
247 /* Public */
248 void
249 CloseArchive(Archive *AHX)
250 {
251  ArchiveHandle *AH = (ArchiveHandle *) AHX;
252 
253  AH->ClosePtr(AH);
254 
255  /* Close the output */
256  errno = 0;
257  if (!EndCompressFileHandle(AH->OF))
258  pg_fatal("could not close output file: %m");
259 }
260 
261 /* Public */
262 void
263 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
264 {
265  /* Caller can omit dump options, in which case we synthesize them */
266  if (dopt == NULL && ropt != NULL)
267  dopt = dumpOptionsFromRestoreOptions(ropt);
268 
269  /* Save options for later access */
270  AH->dopt = dopt;
271  AH->ropt = ropt;
272 }
273 
274 /* Public */
275 void
276 ProcessArchiveRestoreOptions(Archive *AHX)
277 {
278  ArchiveHandle *AH = (ArchiveHandle *) AHX;
279  RestoreOptions *ropt = AH->public.ropt;
280  TocEntry *te;
281  teSection curSection;
282 
283  /* Decide which TOC entries will be dumped/restored, and mark them */
284  curSection = SECTION_PRE_DATA;
285  for (te = AH->toc->next; te != AH->toc; te = te->next)
286  {
287  /*
288  * When writing an archive, we also take this opportunity to check
289  * that we have generated the entries in a sane order that respects
290  * the section divisions. When reading, don't complain, since buggy
291  * old versions of pg_dump might generate out-of-order archives.
292  */
293  if (AH->mode != archModeRead)
294  {
295  switch (te->section)
296  {
297  case SECTION_NONE:
298  /* ok to be anywhere */
299  break;
300  case SECTION_PRE_DATA:
301  if (curSection != SECTION_PRE_DATA)
302  pg_log_warning("archive items not in correct section order");
303  break;
304  case SECTION_DATA:
305  if (curSection == SECTION_POST_DATA)
306  pg_log_warning("archive items not in correct section order");
307  break;
308  case SECTION_POST_DATA:
309  /* ok no matter which section we were in */
310  break;
311  default:
312  pg_fatal("unexpected section code %d",
313  (int) te->section);
314  break;
315  }
316  }
317 
318  if (te->section != SECTION_NONE)
319  curSection = te->section;
320 
321  te->reqs = _tocEntryRequired(te, curSection, AH);
322  }
323 
324  /* Enforce strict names checking */
325  if (ropt->strict_names)
326  StrictNamesCheck(ropt);
327 }
328 
329 /* Public */
330 void
331 RestoreArchive(Archive *AHX)
332 {
333  ArchiveHandle *AH = (ArchiveHandle *) AHX;
334  RestoreOptions *ropt = AH->public.ropt;
335  bool parallel_mode;
336  TocEntry *te;
337  CompressFileHandle *sav;
338 
339  AH->stage = STAGE_INITIALIZING;
340 
341  /*
342  * If we're going to do parallel restore, there are some restrictions.
343  */
344  parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
345  if (parallel_mode)
346  {
347  /* We haven't got round to making this work for all archive formats */
348  if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
349  pg_fatal("parallel restore is not supported with this archive file format");
350 
351  /* Doesn't work if the archive represents dependencies as OIDs */
352  if (AH->version < K_VERS_1_8)
353  pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
354 
355  /*
356  * It's also not gonna work if we can't reopen the input file, so
357  * let's try that immediately.
358  */
359  AH->ReopenPtr(AH);
360  }
361 
362  /*
363  * Make sure we won't need (de)compression we haven't got
364  */
365  if (AH->PrintTocDataPtr != NULL)
366  {
367  for (te = AH->toc->next; te != AH->toc; te = te->next)
368  {
369  if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
370  {
372 
373  if (errmsg)
374  pg_fatal("cannot restore from compressed archive (%s)",
375  errmsg);
376  else
377  break;
378  }
379  }
380  }
381 
382  /*
383  * Prepare index arrays, so we can assume we have them throughout restore.
384  * It's possible we already did this, though.
385  */
386  if (AH->tocsByDumpId == NULL)
387  buildTocEntryArrays(AH);
388 
389  /*
390  * If we're using a DB connection, then connect it.
391  */
392  if (ropt->useDB)
393  {
394  pg_log_info("connecting to database for restore");
395  if (AH->version < K_VERS_1_3)
396  pg_fatal("direct database connections are not supported in pre-1.3 archives");
397 
398  /*
399  * We don't want to guess at whether the dump will successfully
400  * restore; allow the attempt regardless of the version of the restore
401  * target.
402  */
403  AHX->minRemoteVersion = 0;
404  AHX->maxRemoteVersion = 9999999;
405 
406  ConnectDatabase(AHX, &ropt->cparams, false);
407 
408  /*
409  * If we're talking to the DB directly, don't send comments since they
410  * obscure SQL when displaying errors
411  */
412  AH->noTocComments = 1;
413  }
414 
415  /*
416  * Work out if we have an implied data-only restore. This can happen if
417  * the dump was data only or if the user has used a toc list to exclude
418  * all of the schema data. All we do is look for schema entries - if none
419  * are found then we set the dataOnly flag.
420  *
421  * We could scan for wanted TABLE entries, but that is not the same as
422  * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
423  */
424  if (!ropt->dataOnly)
425  {
426  int impliedDataOnly = 1;
427 
428  for (te = AH->toc->next; te != AH->toc; te = te->next)
429  {
430  if ((te->reqs & REQ_SCHEMA) != 0)
431  { /* It's schema, and it's wanted */
432  impliedDataOnly = 0;
433  break;
434  }
435  }
436  if (impliedDataOnly)
437  {
438  ropt->dataOnly = impliedDataOnly;
439  pg_log_info("implied data-only restore");
440  }
441  }
442 
443  /*
444  * Setup the output file if necessary.
445  */
446  sav = SaveOutput(AH);
447  if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
448  SetOutput(AH, ropt->filename, ropt->compression_spec);
449 
450  ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
451 
452  if (AH->archiveRemoteVersion)
453  ahprintf(AH, "-- Dumped from database version %s\n",
454  AH->archiveRemoteVersion);
455  if (AH->archiveDumpVersion)
456  ahprintf(AH, "-- Dumped by pg_dump version %s\n",
457  AH->archiveDumpVersion);
458 
459  ahprintf(AH, "\n");
460 
461  if (AH->public.verbose)
462  dumpTimestamp(AH, "Started on", AH->createDate);
463 
464  if (ropt->single_txn)
465  {
466  if (AH->connection)
467  StartTransaction(AHX);
468  else
469  ahprintf(AH, "BEGIN;\n\n");
470  }
471 
472  /*
473  * Establish important parameter values right away.
474  */
475  _doSetFixedOutputState(AH);
476 
477  AH->stage = STAGE_PROCESSING;
478 
479  /*
480  * Drop the items at the start, in reverse order
481  */
482  if (ropt->dropSchema)
483  {
484  for (te = AH->toc->prev; te != AH->toc; te = te->prev)
485  {
486  AH->currentTE = te;
487 
488  /*
489  * In createDB mode, issue a DROP *only* for the database as a
490  * whole. Issuing drops against anything else would be wrong,
491  * because at this point we're connected to the wrong database.
492  * (The DATABASE PROPERTIES entry, if any, should be treated like
493  * the DATABASE entry.)
494  */
495  if (ropt->createDB)
496  {
497  if (strcmp(te->desc, "DATABASE") != 0 &&
498  strcmp(te->desc, "DATABASE PROPERTIES") != 0)
499  continue;
500  }
501 
502  /* Otherwise, drop anything that's selected and has a dropStmt */
503  if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
504  {
505  pg_log_info("dropping %s %s", te->desc, te->tag);
506  /* Select owner and schema as necessary */
507  _becomeOwner(AH, te);
508  _selectOutputSchema(AH, te->namespace);
509 
510  /*
511  * Now emit the DROP command, if the object has one. Note we
512  * don't necessarily emit it verbatim; at this point we add an
513  * appropriate IF EXISTS clause, if the user requested it.
514  */
515  if (*te->dropStmt != '\0')
516  {
517  if (!ropt->if_exists ||
518  strncmp(te->dropStmt, "--", 2) == 0)
519  {
520  /*
521  * Without --if-exists, or if it's just a comment (as
522  * happens for the public schema), print the dropStmt
523  * as-is.
524  */
525  ahprintf(AH, "%s", te->dropStmt);
526  }
527  else
528  {
529  /*
530  * Inject an appropriate spelling of "if exists". For
531  * large objects, we have a separate routine that
532  * knows how to do it, without depending on
533  * te->dropStmt; use that. For other objects we need
534  * to parse the command.
535  */
536  if (strncmp(te->desc, "BLOB", 4) == 0)
537  {
538  DropLOIfExists(AH, te->catalogId.oid);
539  }
540  else
541  {
542  char *dropStmt = pg_strdup(te->dropStmt);
543  char *dropStmtOrig = dropStmt;
544  PQExpBuffer ftStmt = createPQExpBuffer();
545 
546  /*
547  * Need to inject IF EXISTS clause after ALTER
548  * TABLE part in ALTER TABLE .. DROP statement
549  */
550  if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
551  {
552  appendPQExpBufferStr(ftStmt,
553  "ALTER TABLE IF EXISTS");
554  dropStmt = dropStmt + 11;
555  }
556 
557  /*
558  * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
559  * not support the IF EXISTS clause, and therefore
560  * we simply emit the original command for DEFAULT
561  * objects (modulo the adjustment made above).
562  *
563  * Likewise, don't mess with DATABASE PROPERTIES.
564  *
565  * If we used CREATE OR REPLACE VIEW as a means of
566  * quasi-dropping an ON SELECT rule, that should
567  * be emitted unchanged as well.
568  *
569  * For other object types, we need to extract the
570  * first part of the DROP which includes the
571  * object type. Most of the time this matches
572  * te->desc, so search for that; however for the
573  * different kinds of CONSTRAINTs, we know to
574  * search for hardcoded "DROP CONSTRAINT" instead.
575  */
576  if (strcmp(te->desc, "DEFAULT") == 0 ||
577  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
578  strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
579  appendPQExpBufferStr(ftStmt, dropStmt);
580  else
581  {
582  char buffer[40];
583  char *mark;
584 
585  if (strcmp(te->desc, "CONSTRAINT") == 0 ||
586  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
587  strcmp(te->desc, "FK CONSTRAINT") == 0)
588  strcpy(buffer, "DROP CONSTRAINT");
589  else
590  snprintf(buffer, sizeof(buffer), "DROP %s",
591  te->desc);
592 
593  mark = strstr(dropStmt, buffer);
594 
595  if (mark)
596  {
597  *mark = '\0';
598  appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
599  dropStmt, buffer,
600  mark + strlen(buffer));
601  }
602  else
603  {
604  /* complain and emit unmodified command */
605  pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
606  dropStmtOrig);
607  appendPQExpBufferStr(ftStmt, dropStmt);
608  }
609  }
610 
611  ahprintf(AH, "%s", ftStmt->data);
612 
613  destroyPQExpBuffer(ftStmt);
614  pg_free(dropStmtOrig);
615  }
616  }
617  }
618  }
619  }
620 
621  /*
622  * _selectOutputSchema may have set currSchema to reflect the effect
623  * of a "SET search_path" command it emitted. However, by now we may
624  * have dropped that schema; or it might not have existed in the first
625  * place. In either case the effective value of search_path will not
626  * be what we think. Forcibly reset currSchema so that we will
627  * re-establish the search_path setting when needed (after creating
628  * the schema).
629  *
630  * If we treated users as pg_dump'able objects then we'd need to reset
631  * currUser here too.
632  */
633  free(AH->currSchema);
634  AH->currSchema = NULL;
635  }
636 
637  if (parallel_mode)
638  {
639  /*
640  * In parallel mode, turn control over to the parallel-restore logic.
641  */
642  ParallelState *pstate;
643  TocEntry pending_list;
644 
645  /* The archive format module may need some setup for this */
646  if (AH->PrepParallelRestorePtr)
647  AH->PrepParallelRestorePtr(AH);
648 
649  pending_list_header_init(&pending_list);
650 
651  /* This runs PRE_DATA items and then disconnects from the database */
652  restore_toc_entries_prefork(AH, &pending_list);
653  Assert(AH->connection == NULL);
654 
655  /* ParallelBackupStart() will actually fork the processes */
656  pstate = ParallelBackupStart(AH);
657  restore_toc_entries_parallel(AH, pstate, &pending_list);
658  ParallelBackupEnd(AH, pstate);
659 
660  /* reconnect the leader and see if we missed something */
661  restore_toc_entries_postfork(AH, &pending_list);
662  Assert(AH->connection != NULL);
663  }
664  else
665  {
666  /*
667  * In serial mode, process everything in three phases: normal items,
668  * then ACLs, then post-ACL items. We might be able to skip one or
669  * both extra phases in some cases, eg data-only restores.
670  */
671  bool haveACL = false;
672  bool havePostACL = false;
673 
674  for (te = AH->toc->next; te != AH->toc; te = te->next)
675  {
676  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
677  continue; /* ignore if not to be dumped at all */
678 
679  switch (_tocEntryRestorePass(te))
680  {
681  case RESTORE_PASS_MAIN:
682  (void) restore_toc_entry(AH, te, false);
683  break;
684  case RESTORE_PASS_ACL:
685  haveACL = true;
686  break;
687  case RESTORE_PASS_POST_ACL:
688  havePostACL = true;
689  break;
690  }
691  }
692 
693  if (haveACL)
694  {
695  for (te = AH->toc->next; te != AH->toc; te = te->next)
696  {
697  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
698  _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
699  (void) restore_toc_entry(AH, te, false);
700  }
701  }
702 
703  if (havePostACL)
704  {
705  for (te = AH->toc->next; te != AH->toc; te = te->next)
706  {
707  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
708  _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
709  (void) restore_toc_entry(AH, te, false);
710  }
711  }
712  }
713 
714  if (ropt->single_txn)
715  {
716  if (AH->connection)
717  CommitTransaction(AHX);
718  else
719  ahprintf(AH, "COMMIT;\n\n");
720  }
721 
722  if (AH->public.verbose)
723  dumpTimestamp(AH, "Completed on", time(NULL));
724 
725  ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
726 
727  /*
728  * Clean up & we're done.
729  */
730  AH->stage = STAGE_FINALIZING;
731 
732  if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
733  RestoreOutput(AH, sav);
734 
735  if (ropt->useDB)
736  DisconnectDatabase(&AH->public);
737 }
738 
739 /*
740  * Restore a single TOC item. Used in both parallel and non-parallel restore;
741  * is_parallel is true if we are in a worker child process.
742  *
743  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
744  * the parallel parent has to make the corresponding status update.
745  */
746 static int
747 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
748 {
749  RestoreOptions *ropt = AH->public.ropt;
750  int status = WORKER_OK;
751  int reqs;
752  bool defnDumped;
753 
754  AH->currentTE = te;
755 
756  /* Dump any relevant dump warnings to stderr */
757  if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
758  {
759  if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
760  pg_log_warning("warning from original dump file: %s", te->defn);
761  else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
762  pg_log_warning("warning from original dump file: %s", te->copyStmt);
763  }
764 
765  /* Work out what, if anything, we want from this entry */
766  reqs = te->reqs;
767 
768  defnDumped = false;
769 
770  /*
771  * If it has a schema component that we want, then process that
772  */
773  if ((reqs & REQ_SCHEMA) != 0)
774  {
775  /* Show namespace in log message if available */
776  if (te->namespace)
777  pg_log_info("creating %s \"%s.%s\"",
778  te->desc, te->namespace, te->tag);
779  else
780  pg_log_info("creating %s \"%s\"",
781  te->desc, te->tag);
782 
783  _printTocEntry(AH, te, false);
784  defnDumped = true;
785 
786  if (strcmp(te->desc, "TABLE") == 0)
787  {
788  if (AH->lastErrorTE == te)
789  {
790  /*
791  * We failed to create the table. If
792  * --no-data-for-failed-tables was given, mark the
793  * corresponding TABLE DATA to be ignored.
794  *
795  * In the parallel case this must be done in the parent, so we
796  * just set the return value.
797  */
798  if (ropt->noDataForFailedTables)
799  {
800  if (is_parallel)
801  status = WORKER_INHIBIT_DATA;
802  else
803  inhibit_data_for_failed_table(AH, te);
804  }
805  }
806  else
807  {
808  /*
809  * We created the table successfully. Mark the corresponding
810  * TABLE DATA for possible truncation.
811  *
812  * In the parallel case this must be done in the parent, so we
813  * just set the return value.
814  */
815  if (is_parallel)
816  status = WORKER_CREATE_DONE;
817  else
818  mark_create_done(AH, te);
819  }
820  }
821 
822  /*
823  * If we created a DB, connect to it. Also, if we changed DB
824  * properties, reconnect to ensure that relevant GUC settings are
825  * applied to our session.
826  */
827  if (strcmp(te->desc, "DATABASE") == 0 ||
828  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
829  {
830  pg_log_info("connecting to new database \"%s\"", te->tag);
831  _reconnectToDB(AH, te->tag);
832  }
833  }
834 
835  /*
836  * If it has a data component that we want, then process that
837  */
838  if ((reqs & REQ_DATA) != 0)
839  {
840  /*
841  * hadDumper will be set if there is genuine data component for this
842  * node. Otherwise, we need to check the defn field for statements
843  * that need to be executed in data-only restores.
844  */
845  if (te->hadDumper)
846  {
847  /*
848  * If we can output the data, then restore it.
849  */
850  if (AH->PrintTocDataPtr != NULL)
851  {
852  _printTocEntry(AH, te, true);
853 
854  if (strcmp(te->desc, "BLOBS") == 0 ||
855  strcmp(te->desc, "BLOB COMMENTS") == 0)
856  {
857  pg_log_info("processing %s", te->desc);
858 
859  _selectOutputSchema(AH, "pg_catalog");
860 
861  /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
862  if (strcmp(te->desc, "BLOB COMMENTS") == 0)
863  AH->outputKind = OUTPUT_OTHERDATA;
864 
865  AH->PrintTocDataPtr(AH, te);
866 
867  AH->outputKind = OUTPUT_SQLCMDS;
868  }
869  else
870  {
871  bool use_truncate;
872 
873  _disableTriggersIfNecessary(AH, te);
874 
874 
875  /* Select owner and schema as necessary */
876  _becomeOwner(AH, te);
877  _selectOutputSchema(AH, te->namespace);
878 
879  pg_log_info("processing data for table \"%s.%s\"",
880  te->namespace, te->tag);
881 
882  /*
883  * In parallel restore, if we created the table earlier in
884  * this run (so that we know it is empty) and we are not
885  * restoring a load-via-partition-root data item then we
886  * wrap the COPY in a transaction and precede it with a
887  * TRUNCATE. If wal_level is set to minimal this prevents
888  * WAL-logging the COPY. This obtains a speedup similar
889  * to that from using single_txn mode in non-parallel
890  * restores.
891  *
892  * We mustn't do this for load-via-partition-root cases
893  * because some data might get moved across partition
894  * boundaries, risking deadlock and/or loss of previously
895  * loaded data. (We assume that all partitions of a
896  * partitioned table will be treated the same way.)
897  */
898  use_truncate = is_parallel && te->created &&
900 
901  if (use_truncate)
902  {
903  /*
904  * Parallel restore is always talking directly to a
905  * server, so no need to see if we should issue BEGIN.
906  */
907  StartTransaction(&AH->public);
908 
909  /*
910  * Issue TRUNCATE with ONLY so that child tables are
911  * not wiped.
912  */
913  ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
914  fmtQualifiedId(te->namespace, te->tag));
915  }
916 
917  /*
918  * If we have a copy statement, use it.
919  */
920  if (te->copyStmt && strlen(te->copyStmt) > 0)
921  {
922  ahprintf(AH, "%s", te->copyStmt);
923  AH->outputKind = OUTPUT_COPYDATA;
924  }
925  else
926  AH->outputKind = OUTPUT_OTHERDATA;
927 
928  AH->PrintTocDataPtr(AH, te);
929 
930  /*
931  * Terminate COPY if needed.
932  */
933  if (AH->outputKind == OUTPUT_COPYDATA &&
934  RestoringToDB(AH))
935  EndDBCopyMode(&AH->public, te->tag);
936  AH->outputKind = OUTPUT_SQLCMDS;
937 
938  /* close out the transaction started above */
939  if (use_truncate)
940  CommitTransaction(&AH->public);
941 
942  _enableTriggersIfNecessary(AH, te);
943  }
944  }
945  }
946  else if (!defnDumped)
947  {
948  /* If we haven't already dumped the defn part, do so now */
949  pg_log_info("executing %s %s", te->desc, te->tag);
950  _printTocEntry(AH, te, false);
951  }
952  }
953 
954  if (AH->public.n_errors > 0 && status == WORKER_OK)
955  status = WORKER_IGNORED_ERRORS;
956 
957  return status;
958 }
959 
960 /*
961  * Allocate a new RestoreOptions block.
962  * This is mainly so we can initialize it, but also for future expansion,
963  */
966 {
968 
970 
971  /* set any fields that shouldn't default to zeroes */
972  opts->format = archUnknown;
973  opts->cparams.promptPassword = TRI_DEFAULT;
974  opts->dumpSections = DUMP_UNSECTIONED;
975  opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
976  opts->compression_spec.level = 0;
977 
978  return opts;
979 }
980 
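/*
 * Emit ALTER TABLE ... DISABLE TRIGGER ALL for the target table when doing
 * a data-only restore with --disable-triggers; otherwise do nothing.
 */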
981 static void
982 _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
983 {
984  RestoreOptions *ropt = AH->public.ropt;
985 
986  /* This hack is only needed in a data-only restore */
987  if (!ropt->dataOnly || !ropt->disable_triggers)
988  return;
989 
990  pg_log_info("disabling triggers for %s", te->tag);
991 
992  /*
993  * Become superuser if possible, since they are the only ones who can
994  * disable constraint triggers. If -S was not given, assume the initial
995  * user identity is a superuser. (XXX would it be better to become the
996  * table owner?)
997  */
998  _becomeUser(AH, ropt->superuser);
999 
1000  /*
1001  * Disable them.
1002  */
1003  ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1004  fmtQualifiedId(te->namespace, te->tag));
1005 }
1006 
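/*
 * Counterpart of the above: re-enable all triggers on the target table once
 * its data has been restored.
 */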
1007 static void
1008 _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1009 {
1010  RestoreOptions *ropt = AH->public.ropt;
1011 
1012  /* This hack is only needed in a data-only restore */
1013  if (!ropt->dataOnly || !ropt->disable_triggers)
1014  return;
1015 
1016  pg_log_info("enabling triggers for %s", te->tag);
1017 
1018  /*
1019  * Become superuser if possible, since they are the only ones who can
1020  * disable constraint triggers. If -S was not given, assume the initial
1021  * user identity is a superuser. (XXX would it be better to become the
1022  * table owner?)
1023  */
1024  _becomeUser(AH, ropt->superuser);
1025 
1026  /*
1027  * Enable them.
1028  */
1029  ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1030  fmtQualifiedId(te->namespace, te->tag));
1031 }
1032 
1033 /*
1034  * Detect whether a TABLE DATA TOC item is performing "load via partition
1035  * root", that is the target table is an ancestor partition rather than the
1036  * table the TOC item is nominally for.
1037  *
1038  * In newer archive files this can be detected by checking for a special
1039  * comment placed in te->defn. In older files we have to fall back to seeing
1040  * if the COPY statement targets the named table or some other one. This
1041  * will not work for data dumped as INSERT commands, so we could give a false
1042  * negative in that case; fortunately, that's a rarely-used option.
1043  */
1044 static bool
1045 is_load_via_partition_root(TocEntry *te)
1046 {
1047  if (te->defn &&
1048  strncmp(te->defn, "-- load via partition root ", 27) == 0)
1049  return true;
1050  if (te->copyStmt && *te->copyStmt)
1051  {
1052  PQExpBuffer copyStmt = createPQExpBuffer();
1053  bool result;
1054 
1055  /*
1056  * Build the initial part of the COPY as it would appear if the
1057  * nominal target table is the actual target. If we see anything
1058  * else, it must be a load-via-partition-root case.
1059  */
1060  appendPQExpBuffer(copyStmt, "COPY %s ",
1061  fmtQualifiedId(te->namespace, te->tag));
1062  result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1063  destroyPQExpBuffer(copyStmt);
1064  return result;
1065  }
1066  /* Assume it's not load-via-partition-root */
1067  return false;
1068 }
1069 
1070 /*
1071  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1072  */
1073 
1074 /* Public */
1075 void
1076 WriteData(Archive *AHX, const void *data, size_t dLen)
1077 {
1078  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1079 
1080  if (!AH->currToc)
1081  pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1082 
1083  AH->WriteDataPtr(AH, data, dLen);
1084 }
1085 
1086 /*
1087  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1088  * repository for all metadata. But the name has stuck.
1089  *
1090  * The new entry is added to the Archive's TOC list. Most callers can ignore
1091  * the result value because nothing else need be done, but a few want to
1092  * manipulate the TOC entry further.
1093  */
1094 
1095 /* Public */
1096 TocEntry *
1097 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1098  ArchiveOpts *opts)
1099 {
1100  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1101  TocEntry *newToc;
1102 
1103  newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1104 
1105  AH->tocCount++;
1106  if (dumpId > AH->maxDumpId)
1107  AH->maxDumpId = dumpId;
1108 
1109  newToc->prev = AH->toc->prev;
1110  newToc->next = AH->toc;
1111  AH->toc->prev->next = newToc;
1112  AH->toc->prev = newToc;
1113 
1114  newToc->catalogId = catalogId;
1115  newToc->dumpId = dumpId;
1116  newToc->section = opts->section;
1117 
1118  newToc->tag = pg_strdup(opts->tag);
1119  newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1120  newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1121  newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1122  newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1123  newToc->desc = pg_strdup(opts->description);
1124  newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1125  newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1126  newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1127 
1128  if (opts->nDeps > 0)
1129  {
1130  newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1131  memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1132  newToc->nDeps = opts->nDeps;
1133  }
1134  else
1135  {
1136  newToc->dependencies = NULL;
1137  newToc->nDeps = 0;
1138  }
1139 
1140  newToc->dataDumper = opts->dumpFn;
1141  newToc->dataDumperArg = opts->dumpArg;
1142  newToc->hadDumper = opts->dumpFn ? true : false;
1143 
1144  newToc->formatData = NULL;
1145  newToc->dataLength = 0;
1146 
1147  if (AH->ArchiveEntryPtr != NULL)
1148  AH->ArchiveEntryPtr(AH, newToc);
1149 
1150  return newToc;
1151 }
1152 
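/*
 * Print a human-readable summary of the archive's contents (header details
 * plus one line per selected TOC entry) to the output file.
 */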
1153 /* Public */
1154 void
1155 PrintTOCSummary(Archive *AHX)
1156 {
1157  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1158  RestoreOptions *ropt = AH->public.ropt;
1159  TocEntry *te;
1160  pg_compress_specification out_compression_spec = {0};
1161  teSection curSection;
1162  CompressFileHandle *sav;
1163  const char *fmtName;
1164  char stamp_str[64];
1165 
1166  /* TOC is always uncompressed */
1167  out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1168 
1169  sav = SaveOutput(AH);
1170  if (ropt->filename)
1171  SetOutput(AH, ropt->filename, out_compression_spec);
1172 
1173  if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1174  localtime(&AH->createDate)) == 0)
1175  strcpy(stamp_str, "[unknown]");
1176 
1177  ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1178  ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1179  sanitize_line(AH->archdbname, false),
1180  AH->tocCount,
1182 
1183  switch (AH->format)
1184  {
1185  case archCustom:
1186  fmtName = "CUSTOM";
1187  break;
1188  case archDirectory:
1189  fmtName = "DIRECTORY";
1190  break;
1191  case archTar:
1192  fmtName = "TAR";
1193  break;
1194  default:
1195  fmtName = "UNKNOWN";
1196  }
1197 
1198  ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1200  ahprintf(AH, "; Format: %s\n", fmtName);
1201  ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1202  ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1203  if (AH->archiveRemoteVersion)
1204  ahprintf(AH, "; Dumped from database version: %s\n",
1205  AH->archiveRemoteVersion);
1206  if (AH->archiveDumpVersion)
1207  ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1208  AH->archiveDumpVersion);
1209 
1210  ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1211 
1212  curSection = SECTION_PRE_DATA;
1213  for (te = AH->toc->next; te != AH->toc; te = te->next)
1214  {
1215  if (te->section != SECTION_NONE)
1216  curSection = te->section;
1217  if (ropt->verbose ||
1218  (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1219  {
1220  char *sanitized_name;
1221  char *sanitized_schema;
1222  char *sanitized_owner;
1223 
1224  /*
1225  */
1226  sanitized_name = sanitize_line(te->tag, false);
1227  sanitized_schema = sanitize_line(te->namespace, true);
1228  sanitized_owner = sanitize_line(te->owner, false);
1229 
1230  ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1231  te->catalogId.tableoid, te->catalogId.oid,
1232  te->desc, sanitized_schema, sanitized_name,
1233  sanitized_owner);
1234 
1235  free(sanitized_name);
1236  free(sanitized_schema);
1237  free(sanitized_owner);
1238  }
1239  if (ropt->verbose && te->nDeps > 0)
1240  {
1241  int i;
1242 
1243  ahprintf(AH, ";\tdepends on:");
1244  for (i = 0; i < te->nDeps; i++)
1245  ahprintf(AH, " %d", te->dependencies[i]);
1246  ahprintf(AH, "\n");
1247  }
1248  }
1249 
1250  /* Enforce strict names checking */
1251  if (ropt->strict_names)
1252  StrictNamesCheck(ropt);
1253 
1254  if (ropt->filename)
1255  RestoreOutput(AH, sav);
1256 }
1257 
1258 /***********
1259  * Large Object Archival
1260  ***********/
1261 
1262 /* Called by a dumper to signal start of a LO */
1263 int
1264 StartLO(Archive *AHX, Oid oid)
1265 {
1266  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1267 
1268  if (!AH->StartLOPtr)
1269  pg_fatal("large-object output not supported in chosen format");
1270 
1271  AH->StartLOPtr(AH, AH->currToc, oid);
1272 
1273  return 1;
1274 }
1275 
1276 /* Called by a dumper to signal end of a LO */
1277 int
1278 EndLO(Archive *AHX, Oid oid)
1279 {
1280  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1281 
1282  if (AH->EndLOPtr)
1283  AH->EndLOPtr(AH, AH->currToc, oid);
1284 
1285  return 1;
1286 }
1287 
1288 /**********
1289  * Large Object Restoration
1290  **********/
1291 
1292 /*
1293  * Called by a format handler before any LOs are restored
1294  */
1295 void
1296 StartRestoreLOs(ArchiveHandle *AH)
1297 {
1298  RestoreOptions *ropt = AH->public.ropt;
1299 
1300  if (!ropt->single_txn)
1301  {
1302  if (AH->connection)
1303  StartTransaction(&AH->public);
1304  else
1305  ahprintf(AH, "BEGIN;\n\n");
1306  }
1307 
1308  AH->loCount = 0;
1309 }
1310 
1311 /*
1312  * Called by a format handler after all LOs are restored
1313  */
1314 void
1315 EndRestoreLOs(ArchiveHandle *AH)
1316 {
1317  RestoreOptions *ropt = AH->public.ropt;
1318 
1319  if (!ropt->single_txn)
1320  {
1321  if (AH->connection)
1322  CommitTransaction(&AH->public);
1323  else
1324  ahprintf(AH, "COMMIT;\n\n");
1325  }
1326 
1327  pg_log_info(ngettext("restored %d large object",
1328  "restored %d large objects",
1329  AH->loCount),
1330  AH->loCount);
1331 }
1332 
1333 
1334 /*
1335  * Called by a format handler to initiate restoration of a LO
1336  */
1337 void
1338 StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1339 {
1340  bool old_lo_style = (AH->version < K_VERS_1_12);
1341  Oid loOid;
1342 
1343  AH->loCount++;
1344 
1345  /* Initialize the LO Buffer */
1346  AH->lo_buf_used = 0;
1347 
1348  pg_log_info("restoring large object with OID %u", oid);
1349 
1350  /* With an old archive we must do drop and create logic here */
1351  if (old_lo_style && drop)
1352  DropLOIfExists(AH, oid);
1353 
1354  if (AH->connection)
1355  {
1356  if (old_lo_style)
1357  {
1358  loOid = lo_create(AH->connection, oid);
1359  if (loOid == 0 || loOid != oid)
1360  pg_fatal("could not create large object %u: %s",
1361  oid, PQerrorMessage(AH->connection));
1362  }
1363  AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1364  if (AH->loFd == -1)
1365  pg_fatal("could not open large object %u: %s",
1366  oid, PQerrorMessage(AH->connection));
1367  }
1368  else
1369  {
1370  if (old_lo_style)
1371  ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1372  oid, INV_WRITE);
1373  else
1374  ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1375  oid, INV_WRITE);
1376  }
1377 
1378  AH->writingLO = true;
1379 }
1380 
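/*
 * Called by a format handler when one LO's data is complete: flush any
 * buffered data and close the large object (or emit the equivalent
 * lo_close() call when writing a script).
 */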
1381 void
1382 EndRestoreLO(ArchiveHandle *AH, Oid oid)
1383 {
1384  if (AH->lo_buf_used > 0)
1385  {
1386  /* Write remaining bytes from the LO buffer */
1387  dump_lo_buf(AH);
1388  }
1389 
1390  AH->writingLO = false;
1391 
1392  if (AH->connection)
1393  {
1394  lo_close(AH->connection, AH->loFd);
1395  AH->loFd = -1;
1396  }
1397  else
1398  {
1399  ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1400  }
1401 }
1402 
1403 /***********
1404  * Sorting and Reordering
1405  ***********/
1406 
1407 void
1408 SortTocFromFile(Archive *AHX)
1409 {
1410  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1411  RestoreOptions *ropt = AH->public.ropt;
1412  FILE *fh;
1413  StringInfoData linebuf;
1414 
1415  /* Allocate space for the 'wanted' array, and init it */
1416  ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1417 
1418  /* Setup the file */
1419  fh = fopen(ropt->tocFile, PG_BINARY_R);
1420  if (!fh)
1421  pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1422 
1423  initStringInfo(&linebuf);
1424 
1425  while (pg_get_line_buf(fh, &linebuf))
1426  {
1427  char *cmnt;
1428  char *endptr;
1429  DumpId id;
1430  TocEntry *te;
1431 
1432  /* Truncate line at comment, if any */
1433  cmnt = strchr(linebuf.data, ';');
1434  if (cmnt != NULL)
1435  {
1436  cmnt[0] = '\0';
1437  linebuf.len = cmnt - linebuf.data;
1438  }
1439 
1440  /* Ignore if all blank */
1441  if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1442  continue;
1443 
1444  /* Get an ID, check it's valid and not already seen */
1445  id = strtol(linebuf.data, &endptr, 10);
1446  if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1447  ropt->idWanted[id - 1])
1448  {
1449  pg_log_warning("line ignored: %s", linebuf.data);
1450  continue;
1451  }
1452 
1453  /* Find TOC entry */
1454  te = getTocEntryByDumpId(AH, id);
1455  if (!te)
1456  pg_fatal("could not find entry for ID %d",
1457  id);
1458 
1459  /* Mark it wanted */
1460  ropt->idWanted[id - 1] = true;
1461 
1462  /*
1463  * Move each item to the end of the list as it is selected, so that
1464  * they are placed in the desired order. Any unwanted items will end
1465  * up at the front of the list, which may seem unintuitive but it's
1466  * what we need. In an ordinary serial restore that makes no
1467  * difference, but in a parallel restore we need to mark unrestored
1468  * items' dependencies as satisfied before we start examining
1469  * restorable items. Otherwise they could have surprising
1470  * side-effects on the order in which restorable items actually get
1471  * restored.
1472  */
1473  _moveBefore(AH->toc, te);
1474  }
1475 
1476  pg_free(linebuf.data);
1477 
1478  if (fclose(fh) != 0)
1479  pg_fatal("could not close TOC file: %m");
1480 }
1481 
1482 /**********************
1483  * Convenience functions that look like standard IO functions
1484  * for writing data when in dump mode.
1485  **********************/
1486 
1487 /* Public */
1488 void
1489 archputs(const char *s, Archive *AH)
1490 {
1491  WriteData(AH, s, strlen(s));
1492 }
1493 
1494 /* Public */
1495 int
1496 archprintf(Archive *AH, const char *fmt,...)
1497 {
1498  int save_errno = errno;
1499  char *p;
1500  size_t len = 128; /* initial assumption about buffer size */
1501  size_t cnt;
1502 
1503  for (;;)
1504  {
1505  va_list args;
1506 
1507  /* Allocate work buffer. */
1508  p = (char *) pg_malloc(len);
1509 
1510  /* Try to format the data. */
1511  errno = save_errno;
1512  va_start(args, fmt);
1513  cnt = pvsnprintf(p, len, fmt, args);
1514  va_end(args);
1515 
1516  if (cnt < len)
1517  break; /* success */
1518 
1519  /* Release buffer and loop around to try again with larger len. */
1520  free(p);
1521  len = cnt;
1522  }
1523 
1524  WriteData(AH, p, cnt);
1525  free(p);
1526  return (int) cnt;
1527 }
1528 
1529 
1530 /*******************************
1531  * Stuff below here should be 'private' to the archiver routines
1532  *******************************/
1533 
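/*
 * Open the requested output file (or stdout) with the given compression
 * specification and install it as the archiver's current output handle.
 */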
1534 static void
1535 SetOutput(ArchiveHandle *AH, const char *filename,
1536  const pg_compress_specification compression_spec)
1537 {
1538  CompressFileHandle *CFH;
1539  const char *mode;
1540  int fn = -1;
1541 
1542  if (filename)
1543  {
1544  if (strcmp(filename, "-") == 0)
1545  fn = fileno(stdout);
1546  }
1547  else if (AH->FH)
1548  fn = fileno(AH->FH);
1549  else if (AH->fSpec)
1550  {
1551  filename = AH->fSpec;
1552  }
1553  else
1554  fn = fileno(stdout);
1555 
1556  if (AH->mode == archModeAppend)
1557  mode = PG_BINARY_A;
1558  else
1559  mode = PG_BINARY_W;
1560 
1561  CFH = InitCompressFileHandle(compression_spec);
1562 
1563  if (!CFH->open_func(filename, fn, mode, CFH))
1564  {
1565  if (filename)
1566  pg_fatal("could not open output file \"%s\": %m", filename);
1567  else
1568  pg_fatal("could not open output file: %m");
1569  }
1570 
1571  AH->OF = CFH;
1572 }
1573 
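/*
 * Return the currently active output handle, so the caller can temporarily
 * redirect output and later put it back with RestoreOutput().
 */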
1574 static CompressFileHandle *
1575 SaveOutput(ArchiveHandle *AH)
1576 {
1577  return (CompressFileHandle *) AH->OF;
1578 }
1579 
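/*
 * Close the currently active output handle and reinstate the one previously
 * obtained from SaveOutput().
 */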
1580 static void
1581 RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1582 {
1583  errno = 0;
1584  if (!EndCompressFileHandle(AH->OF))
1585  pg_fatal("could not close output file: %m");
1586 
1587  AH->OF = savedOutput;
1588 }
1589 
1590 
1591 
1592 /*
1593  * Print formatted text to the output file (usually stdout).
1594  */
1595 int
1596 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1597 {
1598  int save_errno = errno;
1599  char *p;
1600  size_t len = 128; /* initial assumption about buffer size */
1601  size_t cnt;
1602 
1603  for (;;)
1604  {
1605  va_list args;
1606 
1607  /* Allocate work buffer. */
1608  p = (char *) pg_malloc(len);
1609 
1610  /* Try to format the data. */
1611  errno = save_errno;
1612  va_start(args, fmt);
1613  cnt = pvsnprintf(p, len, fmt, args);
1614  va_end(args);
1615 
1616  if (cnt < len)
1617  break; /* success */
1618 
1619  /* Release buffer and loop around to try again with larger len. */
1620  free(p);
1621  len = cnt;
1622  }
1623 
1624  ahwrite(p, 1, cnt, AH);
1625  free(p);
1626  return (int) cnt;
1627 }
1628 
1629 /*
1630  * Single place for logic which says 'We are restoring to a direct DB connection'.
1631  */
1632 static int
1633 RestoringToDB(ArchiveHandle *AH)
1634 {
1635  RestoreOptions *ropt = AH->public.ropt;
1636 
1637  return (ropt && ropt->useDB && AH->connection);
1638 }
1639 
1640 /*
1641  * Dump the current contents of the LO data buffer while writing a LO
1642  */
1643 static void
1644 dump_lo_buf(ArchiveHandle *AH)
1645 {
1646  if (AH->connection)
1647  {
1648  int res;
1649 
1650  res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1651  pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1652  "wrote %zu bytes of large object data (result = %d)",
1653  AH->lo_buf_used),
1654  AH->lo_buf_used, res);
1655  /* We assume there are no short writes, only errors */
1656  if (res != AH->lo_buf_used)
1657  warn_or_exit_horribly(AH, "could not write to large object: %s",
1658  PQerrorMessage(AH->connection));
1659  }
1660  else
1661  {
1662  PQExpBuffer buf = createPQExpBuffer();
1663 
1664  appendByteaLiteralAHX(buf,
1665  (const unsigned char *) AH->lo_buf,
1666  AH->lo_buf_used,
1667  AH);
1668 
1669  /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1670  AH->writingLO = false;
1671  ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1672  AH->writingLO = true;
1673 
1674  destroyPQExpBuffer(buf);
1675  }
1676  AH->lo_buf_used = 0;
1677 }
1678 
1679 
1680 /*
1681  * Write buffer to the output file (usually stdout). This is used for
1682  * outputting 'restore' scripts etc. It is even possible for an archive
1683  * format to create a custom output routine to 'fake' a restore if it
1684  * wants to generate a script (see TAR output).
1685  */
1686 void
1687 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1688 {
1689  int bytes_written = 0;
1690 
1691  if (AH->writingLO)
1692  {
1693  size_t remaining = size * nmemb;
1694 
1695  while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1696  {
1697  size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1698 
1699  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1700  ptr = (const void *) ((const char *) ptr + avail);
1701  remaining -= avail;
1702  AH->lo_buf_used += avail;
1703  dump_lo_buf(AH);
1704  }
1705 
1706  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1707  AH->lo_buf_used += remaining;
1708 
1709  bytes_written = size * nmemb;
1710  }
1711  else if (AH->CustomOutPtr)
1712  bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1713 
1714  /*
1715  * If we're doing a restore, and it's direct to DB, and we're connected
1716  * then send it to the DB.
1717  */
1718  else if (RestoringToDB(AH))
1719  bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1720  else
1721  {
1722  CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1723 
1724  if (CFH->write_func(ptr, size * nmemb, CFH))
1725  bytes_written = size * nmemb;
1726  }
1727 
1728  if (bytes_written != size * nmemb)
1729  WRITE_ERROR_EXIT;
1730 }
1731 
1732 /* on some error, we may decide to go on... */
1733 void
1734 warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1735 {
1736  va_list ap;
1737 
1738  switch (AH->stage)
1739  {
1740 
1741  case STAGE_NONE:
1742  /* Do nothing special */
1743  break;
1744 
1745  case STAGE_INITIALIZING:
1746  if (AH->stage != AH->lastErrorStage)
1747  pg_log_info("while INITIALIZING:");
1748  break;
1749 
1750  case STAGE_PROCESSING:
1751  if (AH->stage != AH->lastErrorStage)
1752  pg_log_info("while PROCESSING TOC:");
1753  break;
1754 
1755  case STAGE_FINALIZING:
1756  if (AH->stage != AH->lastErrorStage)
1757  pg_log_info("while FINALIZING:");
1758  break;
1759  }
1760  if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1761  {
1762  pg_log_info("from TOC entry %d; %u %u %s %s %s",
1763  AH->currentTE->dumpId,
1764  AH->currentTE->catalogId.tableoid,
1765  AH->currentTE->catalogId.oid,
1766  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1767  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1768  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1769  }
1770  AH->lastErrorStage = AH->stage;
1771  AH->lastErrorTE = AH->currentTE;
1772 
1773  va_start(ap, fmt);
1775  va_end(ap);
1776 
1777  if (AH->public.exit_on_error)
1778  exit_nicely(1);
1779  else
1780  AH->public.n_errors++;
1781 }
1782 
1783 #ifdef NOT_USED
1784 
1785 static void
1786 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1787 {
1788  /* Unlink te from list */
1789  te->prev->next = te->next;
1790  te->next->prev = te->prev;
1791 
1792  /* and insert it after "pos" */
1793  te->prev = pos;
1794  te->next = pos->next;
1795  pos->next->prev = te;
1796  pos->next = te;
1797 }
1798 #endif
1799 
1800 static void
1801 _moveBefore(TocEntry *pos, TocEntry *te)
1802 {
1803  /* Unlink te from list */
1804  te->prev->next = te->next;
1805  te->next->prev = te->prev;
1806 
1807  /* and insert it before "pos" */
1808  te->prev = pos->prev;
1809  te->next = pos;
1810  pos->prev->next = te;
1811  pos->prev = te;
1812 }
1813 
1814 /*
1815  * Build index arrays for the TOC list
1816  *
1817  * This should be invoked only after we have created or read in all the TOC
1818  * items.
1819  *
1820  * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1821  * array entries run only up to maxDumpId. We might see dependency dump IDs
1822  * beyond that (if the dump was partial); so always check the array bound
1823  * before trying to touch an array entry.
1824  */
1825 static void
1826 buildTocEntryArrays(ArchiveHandle *AH)
1827 {
1828  DumpId maxDumpId = AH->maxDumpId;
1829  TocEntry *te;
1830 
1831  AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1832  AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1833 
1834  for (te = AH->toc->next; te != AH->toc; te = te->next)
1835  {
1836  /* this check is purely paranoia, maxDumpId should be correct */
1837  if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1838  pg_fatal("bad dumpId");
1839 
1840  /* tocsByDumpId indexes all TOCs by their dump ID */
1841  AH->tocsByDumpId[te->dumpId] = te;
1842 
1843  /*
1844  * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1845  * TOC entry that has a DATA item. We compute this by reversing the
1846  * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1847  * just one dependency and it is the TABLE item.
1848  */
1849  if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1850  {
1851  DumpId tableId = te->dependencies[0];
1852 
1853  /*
1854  * The TABLE item might not have been in the archive, if this was
1855  * a data-only dump; but its dump ID should be less than its data
1856  * item's dump ID, so there should be a place for it in the array.
1857  */
1858  if (tableId <= 0 || tableId > maxDumpId)
1859  pg_fatal("bad table dumpId for TABLE DATA item");
1860 
1861  AH->tableDataId[tableId] = te->dumpId;
1862  }
1863  }
1864 }
1865 
1866 TocEntry *
1867 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1868 {
1869  /* build index arrays if we didn't already */
1870  if (AH->tocsByDumpId == NULL)
1871  buildTocEntryArrays(AH);
1872 
1873  if (id > 0 && id <= AH->maxDumpId)
1874  return AH->tocsByDumpId[id];
1875 
1876  return NULL;
1877 }
1878 
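/*
 * Return the requirements mask (REQ_* flags) previously computed for the TOC
 * entry with the given dump ID, or 0 if there is no such entry.
 */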
1879 int
1880 TocIDRequired(ArchiveHandle *AH, DumpId id)
1881 {
1882  TocEntry *te = getTocEntryByDumpId(AH, id);
1883 
1884  if (!te)
1885  return 0;
1886 
1887  return te->reqs;
1888 }
1889 
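/*
 * Write a data offset to the archive: one status byte ("wasSet") followed by
 * sizeof(pgoff_t) bytes emitted least-significant first, so the on-disk
 * format does not depend on host endianness. ReadOffset() is the inverse.
 */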
1890 size_t
1891 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1892 {
1893  int off;
1894 
1895  /* Save the flag */
1896  AH->WriteBytePtr(AH, wasSet);
1897 
1898  /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1899  for (off = 0; off < sizeof(pgoff_t); off++)
1900  {
1901  AH->WriteBytePtr(AH, o & 0xFF);
1902  o >>= 8;
1903  }
1904  return sizeof(pgoff_t) + 1;
1905 }
1906 
1907 int
1908 ReadOffset(ArchiveHandle *AH, pgoff_t *o)
1909 {
1910  int i;
1911  int off;
1912  int offsetFlg;
1913 
1914  /* Initialize to zero */
1915  *o = 0;
1916 
1917  /* Check for old version */
1918  if (AH->version < K_VERS_1_7)
1919  {
1920  /* Prior versions wrote offsets using WriteInt */
1921  i = ReadInt(AH);
1922  /* -1 means not set */
1923  if (i < 0)
1924  return K_OFFSET_POS_NOT_SET;
1925  else if (i == 0)
1926  return K_OFFSET_NO_DATA;
1927 
1928  /* Cast to pgoff_t because it was written as an int. */
1929  *o = (pgoff_t) i;
1930  return K_OFFSET_POS_SET;
1931  }
1932 
1933  /*
1934  * Read the flag indicating the state of the data pointer. Check if valid
1935  * and die if not.
1936  *
1937  * This used to be handled by a negative or zero pointer, now we use an
1938  * extra byte specifically for the state.
1939  */
1940  offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1941 
1942  switch (offsetFlg)
1943  {
1944  case K_OFFSET_POS_NOT_SET:
1945  case K_OFFSET_NO_DATA:
1946  case K_OFFSET_POS_SET:
1947 
1948  break;
1949 
1950  default:
1951  pg_fatal("unexpected data offset flag %d", offsetFlg);
1952  }
1953 
1954  /*
1955  * Read the bytes
1956  */
1957  for (off = 0; off < AH->offSize; off++)
1958  {
1959  if (off < sizeof(pgoff_t))
1960  *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1961  else
1962  {
1963  if (AH->ReadBytePtr(AH) != 0)
1964  pg_fatal("file offset in dump file is too large");
1965  }
1966  }
1967 
1968  return offsetFlg;
1969 }
1970 
1971 size_t
1972 WriteInt(ArchiveHandle *AH, int i)
1973 {
1974  int b;
1975 
1976  /*
1977  * This is a bit yucky, but I don't want to make the binary format very
1978  * dependent on representation, and not knowing much about it, I write out
1979  * a sign byte. If you change this, don't forget to change the file
1980  * version #, and modify ReadInt to read the new format AS WELL AS the old
1981  * formats.
1982  */
1983 
1984  /* SIGN byte */
1985  if (i < 0)
1986  {
1987  AH->WriteBytePtr(AH, 1);
1988  i = -i;
1989  }
1990  else
1991  AH->WriteBytePtr(AH, 0);
1992 
1993  for (b = 0; b < AH->intSize; b++)
1994  {
1995  AH->WriteBytePtr(AH, i & 0xFF);
1996  i >>= 8;
1997  }
1998 
1999  return AH->intSize + 1;
2000 }
2001 
2002 int
2003 ReadInt(ArchiveHandle *AH)
2004 {
2005  int res = 0;
2006  int bv,
2007  b;
2008  int sign = 0; /* Default positive */
2009  int bitShift = 0;
2010 
2011  if (AH->version > K_VERS_1_0)
2012  /* Read a sign byte */
2013  sign = AH->ReadBytePtr(AH);
2014 
2015  for (b = 0; b < AH->intSize; b++)
2016  {
2017  bv = AH->ReadBytePtr(AH) & 0xFF;
2018  if (bv != 0)
2019  res = res + (bv << bitShift);
2020  bitShift += 8;
2021  }
2022 
2023  if (sign)
2024  res = -res;
2025 
2026  return res;
2027 }
2028 
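/*
 * Strings are stored as a WriteInt length (-1 for a NULL pointer) followed by
 * the raw bytes with no terminator; ReadStr() below is the inverse.
 */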
2029 size_t
2030 WriteStr(ArchiveHandle *AH, const char *c)
2031 {
2032  size_t res;
2033 
2034  if (c)
2035  {
2036  int len = strlen(c);
2037 
2038  res = WriteInt(AH, len);
2039  AH->WriteBufPtr(AH, c, len);
2040  res += len;
2041  }
2042  else
2043  res = WriteInt(AH, -1);
2044 
2045  return res;
2046 }
2047 
2048 char *
2049 ReadStr(ArchiveHandle *AH)
2050 {
2051  char *buf;
2052  int l;
2053 
2054  l = ReadInt(AH);
2055  if (l < 0)
2056  buf = NULL;
2057  else
2058  {
2059  buf = (char *) pg_malloc(l + 1);
2060  AH->ReadBufPtr(AH, (void *) buf, l);
2061 
2062  buf[l] = '\0';
2063  }
2064 
2065  return buf;
2066 }
2067 
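/*
 * Check whether a regular file with the given name exists inside directory
 * "dir".
 */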
2068 static bool
2069 _fileExistsInDirectory(const char *dir, const char *filename)
2070 {
2071  struct stat st;
2072  char buf[MAXPGPATH];
2073 
2074  if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2075  pg_fatal("directory name too long: \"%s\"", dir);
2076 
2077  return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2078 }
2079 
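/*
 * Sniff the input to determine the archive format: a directory containing
 * toc.dat (possibly compressed), a custom-format file starting with "PGDMP",
 * or a tar archive. Plain-text dumps are rejected with a hint to use psql.
 */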
2080 static int
2081 _discoverArchiveFormat(ArchiveHandle *AH)
2082 {
2083  FILE *fh;
2084  char sig[6]; /* More than enough */
2085  size_t cnt;
2086  int wantClose = 0;
2087 
2088  pg_log_debug("attempting to ascertain archive format");
2089 
2090  free(AH->lookahead);
2091 
2092  AH->readHeader = 0;
2093  AH->lookaheadSize = 512;
2094  AH->lookahead = pg_malloc0(512);
2095  AH->lookaheadLen = 0;
2096  AH->lookaheadPos = 0;
2097 
2098  if (AH->fSpec)
2099  {
2100  struct stat st;
2101 
2102  wantClose = 1;
2103 
2104  /*
2105  * Check if the specified archive is a directory. If so, check if
2106  * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2107  */
2108  if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2109  {
2110  AH->format = archDirectory;
2111  if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2112  return AH->format;
2113 #ifdef HAVE_LIBZ
2114  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2115  return AH->format;
2116 #endif
2117 #ifdef USE_LZ4
2118  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2119  return AH->format;
2120 #endif
2121 #ifdef USE_ZSTD
2122  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2123  return AH->format;
2124 #endif
2125  pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2126  AH->fSpec);
2127  fh = NULL; /* keep compiler quiet */
2128  }
2129  else
2130  {
2131  fh = fopen(AH->fSpec, PG_BINARY_R);
2132  if (!fh)
2133  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2134  }
2135  }
2136  else
2137  {
2138  fh = stdin;
2139  if (!fh)
2140  pg_fatal("could not open input file: %m");
2141  }
2142 
2143  if ((cnt = fread(sig, 1, 5, fh)) != 5)
2144  {
2145  if (ferror(fh))
2146  pg_fatal("could not read input file: %m");
2147  else
2148  pg_fatal("input file is too short (read %lu, expected 5)",
2149  (unsigned long) cnt);
2150  }
2151 
2152  /* Save it, just in case we need it later */
2153  memcpy(&AH->lookahead[0], sig, 5);
2154  AH->lookaheadLen = 5;
2155 
2156  if (strncmp(sig, "PGDMP", 5) == 0)
2157  {
2158  /* It's custom format, stop here */
2159  AH->format = archCustom;
2160  AH->readHeader = 1;
2161  }
2162  else
2163  {
2164  /*
2165  * *Maybe* we have a tar archive format file or a text dump ... So,
2166  * read first 512 byte header...
2167  */
2168  cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2169  /* read failure is checked below */
2170  AH->lookaheadLen += cnt;
2171 
2172  if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2173  (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2174  strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2175  {
2176  /*
2177  * Looks like it's probably a text-format dump, so suggest they
2178  * try psql.
2179  */
2180  pg_fatal("input file appears to be a text format dump. Please use psql.");
2181  }
2182 
2183  if (AH->lookaheadLen != 512)
2184  {
2185  if (feof(fh))
2186  pg_fatal("input file does not appear to be a valid archive (too short?)");
2187  else
2188  READ_ERROR_EXIT(fh);
2189  }
2190 
2191  if (!isValidTarHeader(AH->lookahead))
2192  pg_fatal("input file does not appear to be a valid archive");
2193 
2194  AH->format = archTar;
2195  }
2196 
2197  /* Close the file if we opened it */
2198  if (wantClose)
2199  {
2200  if (fclose(fh) != 0)
2201  pg_fatal("could not close input file: %m");
2202  /* Forget lookahead, since we'll re-read header after re-opening */
2203  AH->readHeader = 0;
2204  AH->lookaheadLen = 0;
2205  }
2206 
2207  return AH->format;
2208 }
2209 
2210 
2211 /*
2212  * Allocate an archive handle
2213  */
2214 static ArchiveHandle *
2215 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2216  const pg_compress_specification compression_spec,
2217  bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr,
2218  DataDirSyncMethod sync_method)
2219 {
2220  ArchiveHandle *AH;
2221  CompressFileHandle *CFH;
2222  pg_compress_specification out_compress_spec = {0};
2223 
2224  pg_log_debug("allocating AH for %s, format %d",
2225  FileSpec ? FileSpec : "(stdio)", fmt);
2226 
2227  AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2228 
2229  AH->version = K_VERS_SELF;
2230 
2231  /* initialize for backwards compatible string processing */
2232  AH->public.encoding = 0; /* PG_SQL_ASCII */
2233  AH->public.std_strings = false;
2234 
2235  /* sql error handling */
2236  AH->public.exit_on_error = true;
2237  AH->public.n_errors = 0;
2238 
2239  AH->archiveDumpVersion = PG_VERSION;
2240 
2241  AH->createDate = time(NULL);
2242 
2243  AH->intSize = sizeof(int);
2244  AH->offSize = sizeof(pgoff_t);
2245  if (FileSpec)
2246  {
2247  AH->fSpec = pg_strdup(FileSpec);
2248 
2249  /*
2250  * Not used; maybe later....
2251  *
2252  * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2253  * i--) if (AH->workDir[i-1] == '/')
2254  */
2255  }
2256  else
2257  AH->fSpec = NULL;
2258 
2259  AH->currUser = NULL; /* unknown */
2260  AH->currSchema = NULL; /* ditto */
2261  AH->currTablespace = NULL; /* ditto */
2262  AH->currTableAm = NULL; /* ditto */
2263 
2264  AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2265 
2266  AH->toc->next = AH->toc;
2267  AH->toc->prev = AH->toc;
2268 
2269  AH->mode = mode;
2270  AH->compression_spec = compression_spec;
2271  AH->dosync = dosync;
2272  AH->sync_method = sync_method;
2273 
2274  memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2275 
2276  /* Open stdout with no compression for AH output handle */
2277  out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2278  CFH = InitCompressFileHandle(out_compress_spec);
2279  if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2280  pg_fatal("could not open stdout for appending: %m");
2281  AH->OF = CFH;
2282 
2283  /*
2284  * On Windows, we need to use binary mode to read/write non-text files,
2285  * which include all archive formats as well as compressed plain text.
2286  * Force stdin/stdout into binary mode if that is what we are using.
2287  */
2288 #ifdef WIN32
2289  if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2290  (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2291  {
2292  if (mode == archModeWrite)
2293  _setmode(fileno(stdout), O_BINARY);
2294  else
2295  _setmode(fileno(stdin), O_BINARY);
2296  }
2297 #endif
2298 
2299  AH->SetupWorkerPtr = setupWorkerPtr;
2300 
2301  if (fmt == archUnknown)
2302  AH->format = _discoverArchiveFormat(AH);
2303  else
2304  AH->format = fmt;
2305 
2306  switch (AH->format)
2307  {
2308  case archCustom:
2309  InitArchiveFmt_Custom(AH);
2310  break;
2311 
2312  case archNull:
2313  InitArchiveFmt_Null(AH);
2314  break;
2315 
2316  case archDirectory:
2317  InitArchiveFmt_Directory(AH);
2318  break;
2319 
2320  case archTar:
2321  InitArchiveFmt_Tar(AH);
2322  break;
2323 
2324  default:
2325  pg_fatal("unrecognized file format \"%d\"", fmt);
2326  }
2327 
2328  return AH;
2329 }
2330 
2331 /*
2332  * Write out all data (tables & LOs)
2333  */
2334 void
2335 WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2336 {
2337  TocEntry *te;
2338 
2339  if (pstate && pstate->numWorkers > 1)
2340  {
2341  /*
2342  * In parallel mode, this code runs in the leader process. We
2343  * construct an array of candidate TEs, then sort it into decreasing
2344  * size order, then dispatch each TE to a data-transfer worker. By
2345  * dumping larger tables first, we avoid getting into a situation
2346  * where we're down to one job and it's big, losing parallelism.
2347  */
2348  TocEntry **tes;
2349  int ntes;
2350 
2351  tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2352  ntes = 0;
2353  for (te = AH->toc->next; te != AH->toc; te = te->next)
2354  {
2355  /* Consider only TEs with dataDumper functions ... */
2356  if (!te->dataDumper)
2357  continue;
2358  /* ... and ignore ones not enabled for dump */
2359  if ((te->reqs & REQ_DATA) == 0)
2360  continue;
2361 
2362  tes[ntes++] = te;
2363  }
2364 
2365  if (ntes > 1)
2366  qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2367 
2368  for (int i = 0; i < ntes; i++)
2369  DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2370  mark_dump_job_done, NULL);
2371 
2372  pg_free(tes);
2373 
2374  /* Now wait for workers to finish. */
2375  WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2376  }
2377  else
2378  {
2379  /* Non-parallel mode: just dump all candidate TEs sequentially. */
2380  for (te = AH->toc->next; te != AH->toc; te = te->next)
2381  {
2382  /* Must have same filter conditions as above */
2383  if (!te->dataDumper)
2384  continue;
2385  if ((te->reqs & REQ_DATA) == 0)
2386  continue;
2387 
2388  WriteDataChunksForTocEntry(AH, te);
2389  }
2390  }
2391 }
2392 
2393 
2394 /*
2395  * Callback function that's invoked in the leader process after a step has
2396  * been parallel dumped.
2397  *
2398  * We don't need to do anything except check for worker failure.
2399  */
2400 static void
2401 mark_dump_job_done(ArchiveHandle *AH,
2402  TocEntry *te,
2403  int status,
2404  void *callback_data)
2405 {
2406  pg_log_info("finished item %d %s %s",
2407  te->dumpId, te->desc, te->tag);
2408 
2409  if (status != 0)
2410  pg_fatal("worker process failed: exit code %d",
2411  status);
2412 }
2413 
2414 
2415 void
2416 WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2417 {
2418  StartDataPtrType startPtr;
2419  EndDataPtrType endPtr;
2420 
2421  AH->currToc = te;
2422 
2423  if (strcmp(te->desc, "BLOBS") == 0)
2424  {
2425  startPtr = AH->StartLOsPtr;
2426  endPtr = AH->EndLOsPtr;
2427  }
2428  else
2429  {
2430  startPtr = AH->StartDataPtr;
2431  endPtr = AH->EndDataPtr;
2432  }
2433 
2434  if (startPtr != NULL)
2435  (*startPtr) (AH, te);
2436 
2437  /*
2438  * The user-provided DataDumper routine needs to call AH->WriteData
2439  */
2440  te->dataDumper((Archive *) AH, te->dataDumperArg);
2441 
2442  if (endPtr != NULL)
2443  (*endPtr) (AH, te);
2444 
2445  AH->currToc = NULL;
2446 }
2447 
2448 void
2449 WriteToc(ArchiveHandle *AH)
2450 {
2451  TocEntry *te;
2452  char workbuf[32];
2453  int tocCount;
2454  int i;
2455 
2456  /* count entries that will actually be dumped */
2457  tocCount = 0;
2458  for (te = AH->toc->next; te != AH->toc; te = te->next)
2459  {
2460  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2461  tocCount++;
2462  }
2463 
2464  /* printf("%d TOC Entries to save\n", tocCount); */
2465 
2466  WriteInt(AH, tocCount);
2467 
2468  for (te = AH->toc->next; te != AH->toc; te = te->next)
2469  {
2470  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2471  continue;
2472 
2473  WriteInt(AH, te->dumpId);
2474  WriteInt(AH, te->dataDumper ? 1 : 0);
2475 
2476  /* OID is recorded as a string for historical reasons */
2477  sprintf(workbuf, "%u", te->catalogId.tableoid);
2478  WriteStr(AH, workbuf);
2479  sprintf(workbuf, "%u", te->catalogId.oid);
2480  WriteStr(AH, workbuf);
2481 
2482  WriteStr(AH, te->tag);
2483  WriteStr(AH, te->desc);
2484  WriteInt(AH, te->section);
2485  WriteStr(AH, te->defn);
2486  WriteStr(AH, te->dropStmt);
2487  WriteStr(AH, te->copyStmt);
2488  WriteStr(AH, te->namespace);
2489  WriteStr(AH, te->tablespace);
2490  WriteStr(AH, te->tableam);
2491  WriteStr(AH, te->owner);
2492  WriteStr(AH, "false");
2493 
2494  /* Dump list of dependencies */
2495  for (i = 0; i < te->nDeps; i++)
2496  {
2497  sprintf(workbuf, "%d", te->dependencies[i]);
2498  WriteStr(AH, workbuf);
2499  }
2500  WriteStr(AH, NULL); /* Terminate List */
2501 
2502  if (AH->WriteExtraTocPtr)
2503  AH->WriteExtraTocPtr(AH, te);
2504  }
2505 }
2506 
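/*
 * For reference, each TOC entry serialized by WriteToc() above consists of:
 * dumpId and a had-data-dumper flag (ints); table OID and object OID
 * (strings, for historical reasons); tag and desc; section (int); defn,
 * dropStmt, copyStmt, namespace, tablespace, tableam and owner; the literal
 * string "false" (a legacy WITH OIDS field); the dependency list as strings
 * terminated by a NULL string; and finally any format-specific extra TOC
 * data.  ReadToc() below reads the same layout, with version checks for
 * fields that were added over time.
 */
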
2507 void
2508 ReadToc(ArchiveHandle *AH)
2509 {
2510  int i;
2511  char *tmp;
2512  DumpId *deps;
2513  int depIdx;
2514  int depSize;
2515  TocEntry *te;
2516  bool is_supported;
2517 
2518  AH->tocCount = ReadInt(AH);
2519  AH->maxDumpId = 0;
2520 
2521  for (i = 0; i < AH->tocCount; i++)
2522  {
2523  te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2524  te->dumpId = ReadInt(AH);
2525 
2526  if (te->dumpId > AH->maxDumpId)
2527  AH->maxDumpId = te->dumpId;
2528 
2529  /* Sanity check */
2530  if (te->dumpId <= 0)
2531  pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2532  te->dumpId);
2533 
2534  te->hadDumper = ReadInt(AH);
2535 
2536  if (AH->version >= K_VERS_1_8)
2537  {
2538  tmp = ReadStr(AH);
2539  sscanf(tmp, "%u", &te->catalogId.tableoid);
2540  free(tmp);
2541  }
2542  else
2543  te->catalogId.tableoid = InvalidOid;
2544  tmp = ReadStr(AH);
2545  sscanf(tmp, "%u", &te->catalogId.oid);
2546  free(tmp);
2547 
2548  te->tag = ReadStr(AH);
2549  te->desc = ReadStr(AH);
2550 
2551  if (AH->version >= K_VERS_1_11)
2552  {
2553  te->section = ReadInt(AH);
2554  }
2555  else
2556  {
2557  /*
2558  * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2559  * the entries into sections. This list need not cover entry
2560  * types added later than 8.4.
2561  */
2562  if (strcmp(te->desc, "COMMENT") == 0 ||
2563  strcmp(te->desc, "ACL") == 0 ||
2564  strcmp(te->desc, "ACL LANGUAGE") == 0)
2565  te->section = SECTION_NONE;
2566  else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2567  strcmp(te->desc, "BLOBS") == 0 ||
2568  strcmp(te->desc, "BLOB COMMENTS") == 0)
2569  te->section = SECTION_DATA;
2570  else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2571  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2572  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2573  strcmp(te->desc, "INDEX") == 0 ||
2574  strcmp(te->desc, "RULE") == 0 ||
2575  strcmp(te->desc, "TRIGGER") == 0)
2576  te->section = SECTION_POST_DATA;
2577  else
2578  te->section = SECTION_PRE_DATA;
2579  }
2580 
2581  te->defn = ReadStr(AH);
2582  te->dropStmt = ReadStr(AH);
2583 
2584  if (AH->version >= K_VERS_1_3)
2585  te->copyStmt = ReadStr(AH);
2586 
2587  if (AH->version >= K_VERS_1_6)
2588  te->namespace = ReadStr(AH);
2589 
2590  if (AH->version >= K_VERS_1_10)
2591  te->tablespace = ReadStr(AH);
2592 
2593  if (AH->version >= K_VERS_1_14)
2594  te->tableam = ReadStr(AH);
2595 
2596  te->owner = ReadStr(AH);
2597  is_supported = true;
2598  if (AH->version < K_VERS_1_9)
2599  is_supported = false;
2600  else
2601  {
2602  tmp = ReadStr(AH);
2603 
2604  if (strcmp(tmp, "true") == 0)
2605  is_supported = false;
2606 
2607  free(tmp);
2608  }
2609 
2610  if (!is_supported)
2611  pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2612 
2613  /* Read TOC entry dependencies */
2614  if (AH->version >= K_VERS_1_5)
2615  {
2616  depSize = 100;
2617  deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2618  depIdx = 0;
2619  for (;;)
2620  {
2621  tmp = ReadStr(AH);
2622  if (!tmp)
2623  break; /* end of list */
2624  if (depIdx >= depSize)
2625  {
2626  depSize *= 2;
2627  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2628  }
2629  sscanf(tmp, "%d", &deps[depIdx]);
2630  free(tmp);
2631  depIdx++;
2632  }
2633 
2634  if (depIdx > 0) /* We have a non-null entry */
2635  {
2636  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2637  te->dependencies = deps;
2638  te->nDeps = depIdx;
2639  }
2640  else
2641  {
2642  free(deps);
2643  te->dependencies = NULL;
2644  te->nDeps = 0;
2645  }
2646  }
2647  else
2648  {
2649  te->dependencies = NULL;
2650  te->nDeps = 0;
2651  }
2652  te->dataLength = 0;
2653 
2654  if (AH->ReadExtraTocPtr)
2655  AH->ReadExtraTocPtr(AH, te);
2656 
2657  pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2658  i, te->dumpId, te->desc, te->tag);
2659 
2660  /* link completed entry into TOC circular list */
2661  te->prev = AH->toc->prev;
2662  AH->toc->prev->next = te;
2663  AH->toc->prev = te;
2664  te->next = AH->toc;
2665 
2666  /* special processing immediately upon read for some items */
2667  if (strcmp(te->desc, "ENCODING") == 0)
2668  processEncodingEntry(AH, te);
2669  else if (strcmp(te->desc, "STDSTRINGS") == 0)
2670  processStdStringsEntry(AH, te);
2671  else if (strcmp(te->desc, "SEARCHPATH") == 0)
2672  processSearchPathEntry(AH, te);
2673  }
2674 }
2675 
2676 static void
2677 processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2678 {
2679  /* te->defn should have the form SET client_encoding = 'foo'; */
2680  char *defn = pg_strdup(te->defn);
2681  char *ptr1;
2682  char *ptr2 = NULL;
2683  int encoding;
2684 
2685  ptr1 = strchr(defn, '\'');
2686  if (ptr1)
2687  ptr2 = strchr(++ptr1, '\'');
2688  if (ptr2)
2689  {
2690  *ptr2 = '\0';
2691  encoding = pg_char_to_encoding(ptr1);
2692  if (encoding < 0)
2693  pg_fatal("unrecognized encoding \"%s\"",
2694  ptr1);
2695  AH->public.encoding = encoding;
2696  }
2697  else
2698  pg_fatal("invalid ENCODING item: %s",
2699  te->defn);
2700 
2701  free(defn);
2702 }
2703 
2704 static void
2705 processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2706 {
2707  /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2708  char *ptr1;
2709 
2710  ptr1 = strchr(te->defn, '\'');
2711  if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2712  AH->public.std_strings = true;
2713  else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2714  AH->public.std_strings = false;
2715  else
2716  pg_fatal("invalid STDSTRINGS item: %s",
2717  te->defn);
2718 }
2719 
2720 static void
2721 processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2722 {
2723  /*
2724  * te->defn should contain a command to set search_path. We just copy it
2725  * verbatim for use later.
2726  */
2727  AH->public.searchpath = pg_strdup(te->defn);
2728 }
2729 
2730 static void
2731 StrictNamesCheck(RestoreOptions *ropt)
2732 {
2733  const char *missing_name;
2734 
2735  Assert(ropt->strict_names);
2736 
2737  if (ropt->schemaNames.head != NULL)
2738  {
2739  missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2740  if (missing_name != NULL)
2741  pg_fatal("schema \"%s\" not found", missing_name);
2742  }
2743 
2744  if (ropt->tableNames.head != NULL)
2745  {
2746  missing_name = simple_string_list_not_touched(&ropt->tableNames);
2747  if (missing_name != NULL)
2748  pg_fatal("table \"%s\" not found", missing_name);
2749  }
2750 
2751  if (ropt->indexNames.head != NULL)
2752  {
2753  missing_name = simple_string_list_not_touched(&ropt->indexNames);
2754  if (missing_name != NULL)
2755  pg_fatal("index \"%s\" not found", missing_name);
2756  }
2757 
2758  if (ropt->functionNames.head != NULL)
2759  {
2760  missing_name = simple_string_list_not_touched(&ropt->functionNames);
2761  if (missing_name != NULL)
2762  pg_fatal("function \"%s\" not found", missing_name);
2763  }
2764 
2765  if (ropt->triggerNames.head != NULL)
2766  {
2767  missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2768  if (missing_name != NULL)
2769  pg_fatal("trigger \"%s\" not found", missing_name);
2770  }
2771 }
2772 
2773 /*
2774  * Determine whether we want to restore this TOC entry.
2775  *
2776  * Returns 0 if entry should be skipped, or some combination of the
2777  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2778  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2779  */
2780 static int
2781 _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2782 {
2783  int res = REQ_SCHEMA | REQ_DATA;
2784  RestoreOptions *ropt = AH->public.ropt;
2785 
2786  /* These items are treated specially */
2787  if (strcmp(te->desc, "ENCODING") == 0 ||
2788  strcmp(te->desc, "STDSTRINGS") == 0 ||
2789  strcmp(te->desc, "SEARCHPATH") == 0)
2790  return REQ_SPECIAL;
2791 
2792  /*
2793  * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2794  * restored in createDB mode, and not restored otherwise, independently of
2795  * all else.
2796  */
2797  if (strcmp(te->desc, "DATABASE") == 0 ||
2798  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2799  {
2800  if (ropt->createDB)
2801  return REQ_SCHEMA;
2802  else
2803  return 0;
2804  }
2805 
2806  /*
2807  * Process exclusions that affect certain classes of TOC entries.
2808  */
2809 
2810  /* If it's an ACL, maybe ignore it */
2811  if (ropt->aclsSkip && _tocEntryIsACL(te))
2812  return 0;
2813 
2814  /* If it's a comment, maybe ignore it */
2815  if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2816  return 0;
2817 
2818  /*
2819  * If it's a publication or a table part of a publication, maybe ignore
2820  * it.
2821  */
2822  if (ropt->no_publications &&
2823  (strcmp(te->desc, "PUBLICATION") == 0 ||
2824  strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2825  strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2826  return 0;
2827 
2828  /* If it's a security label, maybe ignore it */
2829  if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2830  return 0;
2831 
2832  /* If it's a subscription, maybe ignore it */
2833  if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2834  return 0;
2835 
2836  /* Ignore it if section is not to be dumped/restored */
2837  switch (curSection)
2838  {
2839  case SECTION_PRE_DATA:
2840  if (!(ropt->dumpSections & DUMP_PRE_DATA))
2841  return 0;
2842  break;
2843  case SECTION_DATA:
2844  if (!(ropt->dumpSections & DUMP_DATA))
2845  return 0;
2846  break;
2847  case SECTION_POST_DATA:
2848  if (!(ropt->dumpSections & DUMP_POST_DATA))
2849  return 0;
2850  break;
2851  default:
2852  /* shouldn't get here, really, but ignore it */
2853  return 0;
2854  }
2855 
2856  /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2857  if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2858  return 0;
2859 
2860  /*
2861  * Check options for selective dump/restore.
2862  */
2863  if (strcmp(te->desc, "ACL") == 0 ||
2864  strcmp(te->desc, "COMMENT") == 0 ||
2865  strcmp(te->desc, "SECURITY LABEL") == 0)
2866  {
2867  /* Database properties react to createDB, not selectivity options. */
2868  if (strncmp(te->tag, "DATABASE ", 9) == 0)
2869  {
2870  if (!ropt->createDB)
2871  return 0;
2872  }
2873  else if (ropt->schemaNames.head != NULL ||
2874  ropt->schemaExcludeNames.head != NULL ||
2875  ropt->selTypes)
2876  {
2877  /*
2878  * In a selective dump/restore, we want to restore these dependent
2879  * TOC entry types only if their parent object is being restored.
2880  * Without selectivity options, we let through everything in the
2881  * archive. Note there may be such entries with no parent, eg
2882  * non-default ACLs for built-in objects. Also, we make
2883  * per-column ACLs additionally depend on the table's ACL if any
2884  * to ensure correct restore order, so those dependencies should
2885  * be ignored in this check.
2886  *
2887  * This code depends on the parent having been marked already,
2888  * which should be the case; if it isn't, perhaps due to
2889  * SortTocFromFile rearrangement, skipping the dependent entry
2890  * seems prudent anyway.
2891  *
2892  * Ideally we'd handle, eg, table CHECK constraints this way too.
2893  * But it's hard to tell which of their dependencies is the one to
2894  * consult.
2895  */
2896  bool dumpthis = false;
2897 
2898  for (int i = 0; i < te->nDeps; i++)
2899  {
2900  TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
2901 
2902  if (!pte)
2903  continue; /* probably shouldn't happen */
2904  if (strcmp(pte->desc, "ACL") == 0)
2905  continue; /* ignore dependency on another ACL */
2906  if (pte->reqs == 0)
2907  continue; /* this object isn't marked, so ignore it */
2908  /* Found a parent to be dumped, so we want to dump this too */
2909  dumpthis = true;
2910  break;
2911  }
2912  if (!dumpthis)
2913  return 0;
2914  }
2915  }
2916  else
2917  {
2918  /* Apply selective-restore rules for standalone TOC entries. */
2919  if (ropt->schemaNames.head != NULL)
2920  {
2921  /* If no namespace is specified, it means all. */
2922  if (!te->namespace)
2923  return 0;
2924  if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2925  return 0;
2926  }
2927 
2928  if (ropt->schemaExcludeNames.head != NULL &&
2929  te->namespace &&
2930  simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2931  return 0;
2932 
2933  if (ropt->selTypes)
2934  {
2935  if (strcmp(te->desc, "TABLE") == 0 ||
2936  strcmp(te->desc, "TABLE DATA") == 0 ||
2937  strcmp(te->desc, "VIEW") == 0 ||
2938  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2939  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2940  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2941  strcmp(te->desc, "SEQUENCE") == 0 ||
2942  strcmp(te->desc, "SEQUENCE SET") == 0)
2943  {
2944  if (!ropt->selTable)
2945  return 0;
2946  if (ropt->tableNames.head != NULL &&
2947  !simple_string_list_member(&ropt->tableNames, te->tag))
2948  return 0;
2949  }
2950  else if (strcmp(te->desc, "INDEX") == 0)
2951  {
2952  if (!ropt->selIndex)
2953  return 0;
2954  if (ropt->indexNames.head != NULL &&
2955  !simple_string_list_member(&ropt->indexNames, te->tag))
2956  return 0;
2957  }
2958  else if (strcmp(te->desc, "FUNCTION") == 0 ||
2959  strcmp(te->desc, "AGGREGATE") == 0 ||
2960  strcmp(te->desc, "PROCEDURE") == 0)
2961  {
2962  if (!ropt->selFunction)
2963  return 0;
2964  if (ropt->functionNames.head != NULL &&
2965  !simple_string_list_member(&ropt->functionNames, te->tag))
2966  return 0;
2967  }
2968  else if (strcmp(te->desc, "TRIGGER") == 0)
2969  {
2970  if (!ropt->selTrigger)
2971  return 0;
2972  if (ropt->triggerNames.head != NULL &&
2973  !simple_string_list_member(&ropt->triggerNames, te->tag))
2974  return 0;
2975  }
2976  else
2977  return 0;
2978  }
2979  }
2980 
2981  /*
2982  * Determine whether the TOC entry contains schema and/or data components,
2983  * and mask off inapplicable REQ bits. If it had a dataDumper, assume
2984  * it's both schema and data. Otherwise it's probably schema-only, but
2985  * there are exceptions.
2986  */
2987  if (!te->hadDumper)
2988  {
2989  /*
2990  * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
2991  * is considered a data entry. We don't need to check for the BLOBS
2992  * entry or old-style BLOB COMMENTS, because they will have hadDumper
2993  * = true ... but we do need to check new-style BLOB ACLs, comments,
2994  * etc.
2995  */
2996  if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2997  strcmp(te->desc, "BLOB") == 0 ||
2998  (strcmp(te->desc, "ACL") == 0 &&
2999  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3000  (strcmp(te->desc, "COMMENT") == 0 &&
3001  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3002  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3003  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
3004  res = res & REQ_DATA;
3005  else
3006  res = res & ~REQ_DATA;
3007  }
3008 
3009  /*
3010  * If there's no definition command, there's no schema component. Treat
3011  * "load via partition root" comments as not schema.
3012  */
3013  if (!te->defn || !te->defn[0] ||
3014  strncmp(te->defn, "-- load via partition root ", 27) == 0)
3015  res = res & ~REQ_SCHEMA;
3016 
3017  /*
3018  * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3019  * always ignore it.
3020  */
3021  if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3022  return 0;
3023 
3024  /* Mask it if we only want schema */
3025  if (ropt->schemaOnly)
3026  {
3027  /*
3028  * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3029  *
3030  * In binary-upgrade mode, even with schemaOnly set, we do not mask
3031  * out large objects. (Only large object definitions, comments and
3032  * other metadata should be generated in binary-upgrade mode, not the
3033  * actual data, but that need not concern us here.)
3034  */
3035  if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3036  !(ropt->binary_upgrade &&
3037  (strcmp(te->desc, "BLOB") == 0 ||
3038  (strcmp(te->desc, "ACL") == 0 &&
3039  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3040  (strcmp(te->desc, "COMMENT") == 0 &&
3041  strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3042  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3043  strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
3044  res = res & REQ_SCHEMA;
3045  }
3046 
3047  /* Mask it if we only want data */
3048  if (ropt->dataOnly)
3049  res = res & REQ_DATA;
3050 
3051  return res;
3052 }
3053 
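/*
 * Illustration of how the REQ bits combine above: a typical TABLE DATA
 * entry had a data dumper but has no definition command, so it starts as
 * REQ_SCHEMA | REQ_DATA and is reduced to REQ_DATA; with --schema-only it
 * then drops to zero and the entry is skipped.  Conversely, a plain TABLE
 * entry had no data dumper, is reduced to REQ_SCHEMA, and is skipped under
 * --data-only.
 */
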
3054 /*
3055  * Identify which pass we should restore this TOC entry in.
3056  *
3057  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3058  */
3059 static RestorePass
3060 _tocEntryRestorePass(TocEntry *te)
3061 {
3062  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3063  if (strcmp(te->desc, "ACL") == 0 ||
3064  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3065  strcmp(te->desc, "DEFAULT ACL") == 0)
3066  return RESTORE_PASS_ACL;
3067  if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3068  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3069  return RESTORE_PASS_POST_ACL;
3070 
3071  /*
3072  * Comments need to be emitted in the same pass as their parent objects.
3073  * ACLs haven't got comments, and neither do matview data objects, but
3074  * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3075  * we'd need yet another weird special case.)
3076  */
3077  if (strcmp(te->desc, "COMMENT") == 0 &&
3078  strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3079  return RESTORE_PASS_POST_ACL;
3080 
3081  /* All else can be handled in the main pass. */
3082  return RESTORE_PASS_MAIN;
3083 }
3084 
3085 /*
3086  * Identify TOC entries that are ACLs.
3087  *
3088  * Note: it seems worth duplicating some code here to avoid a hard-wired
3089  * assumption that these are exactly the same entries that we restore during
3090  * the RESTORE_PASS_ACL phase.
3091  */
3092 static bool
3093 _tocEntryIsACL(TocEntry *te)
3094 {
3095  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3096  if (strcmp(te->desc, "ACL") == 0 ||
3097  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3098  strcmp(te->desc, "DEFAULT ACL") == 0)
3099  return true;
3100  return false;
3101 }
3102 
3103 /*
3104  * Issue SET commands for parameters that we want to have set the same way
3105  * at all times during execution of a restore script.
3106  */
3107 static void
3108 _doSetFixedOutputState(ArchiveHandle *AH)
3109 {
3110  RestoreOptions *ropt = AH->public.ropt;
3111 
3112  /*
3113  * Disable timeouts to allow for slow commands, idle parallel workers, etc
3114  */
3115  ahprintf(AH, "SET statement_timeout = 0;\n");
3116  ahprintf(AH, "SET lock_timeout = 0;\n");
3117  ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3118  ahprintf(AH, "SET transaction_timeout = 0;\n");
3119 
3120  /* Select the correct character set encoding */
3121  ahprintf(AH, "SET client_encoding = '%s';\n",
3122  pg_encoding_to_char(AH->public.encoding));
3123 
3124  /* Select the correct string literal syntax */
3125  ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3126  AH->public.std_strings ? "on" : "off");
3127 
3128  /* Select the role to be used during restore */
3129  if (ropt && ropt->use_role)
3130  ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3131 
3132  /* Select the dump-time search_path */
3133  if (AH->public.searchpath)
3134  ahprintf(AH, "%s", AH->public.searchpath);
3135 
3136  /* Make sure function checking is disabled */
3137  ahprintf(AH, "SET check_function_bodies = false;\n");
3138 
3139  /* Ensure that all valid XML data will be accepted */
3140  ahprintf(AH, "SET xmloption = content;\n");
3141 
3142  /* Avoid annoying notices etc */
3143  ahprintf(AH, "SET client_min_messages = warning;\n");
3144  if (!AH->public.std_strings)
3145  ahprintf(AH, "SET escape_string_warning = off;\n");
3146 
3147  /* Adjust row-security state */
3148  if (ropt && ropt->enable_row_security)
3149  ahprintf(AH, "SET row_security = on;\n");
3150  else
3151  ahprintf(AH, "SET row_security = off;\n");
3152 
3153  ahprintf(AH, "\n");
3154 }
3155 
3156 /*
3157  * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3158  * for updating state if appropriate. If user is NULL or an empty string,
3159  * the specification DEFAULT will be used.
3160  */
3161 static void
3162 _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3163 {
3164  PQExpBuffer cmd = createPQExpBuffer();
3165 
3166  appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3167 
3168  /*
3169  * SQL requires a string literal here. Might as well be correct.
3170  */
3171  if (user && *user)
3172  appendStringLiteralAHX(cmd, user, AH);
3173  else
3174  appendPQExpBufferStr(cmd, "DEFAULT");
3175  appendPQExpBufferChar(cmd, ';');
3176 
3177  if (RestoringToDB(AH))
3178  {
3179  PGresult *res;
3180 
3181  res = PQexec(AH->connection, cmd->data);
3182 
3183  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3184  /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3185  pg_fatal("could not set session user to \"%s\": %s",
3186  user, PQerrorMessage(AH->connection));
3187 
3188  PQclear(res);
3189  }
3190  else
3191  ahprintf(AH, "%s\n\n", cmd->data);
3192 
3193  destroyPQExpBuffer(cmd);
3194 }
3195 
3196 
3197 /*
3198  * Issue the commands to connect to the specified database.
3199  *
3200  * If we're currently restoring right into a database, this will
3201  * actually establish a connection. Otherwise it puts a \connect into
3202  * the script output.
3203  */
3204 static void
3205 _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3206 {
3207  if (RestoringToDB(AH))
3208  ReconnectToServer(AH, dbname);
3209  else
3210  {
3211  PQExpBufferData connectbuf;
3212 
3213  initPQExpBuffer(&connectbuf);
3214  appendPsqlMetaConnect(&connectbuf, dbname);
3215  ahprintf(AH, "%s\n", connectbuf.data);
3216  termPQExpBuffer(&connectbuf);
3217  }
3218 
3219  /*
3220  * NOTE: currUser keeps track of what the imaginary session user in our
3221  * script is. It's now effectively reset to the original userID.
3222  */
3223  free(AH->currUser);
3224  AH->currUser = NULL;
3225 
3226  /* don't assume we still know the output schema, tablespace, etc either */
3227  free(AH->currSchema);
3228  AH->currSchema = NULL;
3229 
3230  free(AH->currTableAm);
3231  AH->currTableAm = NULL;
3232 
3233  free(AH->currTablespace);
3234  AH->currTablespace = NULL;
3235 
3236  /* re-establish fixed state */
3237  _doSetFixedOutputState(AH);
3238 }
3239 
3240 /*
3241  * Become the specified user, and update state to avoid redundant commands
3242  *
3243  * NULL or empty argument is taken to mean restoring the session default
3244  */
3245 static void
3246 _becomeUser(ArchiveHandle *AH, const char *user)
3247 {
3248  if (!user)
3249  user = ""; /* avoid null pointers */
3250 
3251  if (AH->currUser && strcmp(AH->currUser, user) == 0)
3252  return; /* no need to do anything */
3253 
3254  _doSetSessionAuth(AH, user);
3255 
3256  /*
3257  * NOTE: currUser keeps track of what the imaginary session user in our
3258  * script is
3259  */
3260  free(AH->currUser);
3261  AH->currUser = pg_strdup(user);
3262 }
3263 
3264 /*
3265  * Become the owner of the given TOC entry object. If
3266  * changes in ownership are not allowed, this doesn't do anything.
3267  */
3268 static void
3269 _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3270 {
3271  RestoreOptions *ropt = AH->public.ropt;
3272 
3273  if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3274  return;
3275 
3276  _becomeUser(AH, te->owner);
3277 }
3278 
3279 
3280 /*
3281  * Issue the commands to select the specified schema as the current schema
3282  * in the target database.
3283  */
3284 static void
3285 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3286 {
3287  PQExpBuffer qry;
3288 
3289  /*
3290  * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3291  * that search_path rather than switching to entry-specific paths.
3292  * Otherwise, it's an old archive that will not restore correctly unless
3293  * we set the search_path as it's expecting.
3294  */
3295  if (AH->public.searchpath)
3296  return;
3297 
3298  if (!schemaName || *schemaName == '\0' ||
3299  (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3300  return; /* no need to do anything */
3301 
3302  qry = createPQExpBuffer();
3303 
3304  appendPQExpBuffer(qry, "SET search_path = %s",
3305  fmtId(schemaName));
3306  if (strcmp(schemaName, "pg_catalog") != 0)
3307  appendPQExpBufferStr(qry, ", pg_catalog");
3308 
3309  if (RestoringToDB(AH))
3310  {
3311  PGresult *res;
3312 
3313  res = PQexec(AH->connection, qry->data);
3314 
3315  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3316  warn_or_exit_horribly(AH,
3317  "could not set search_path to \"%s\": %s",
3318  schemaName, PQerrorMessage(AH->connection));
3319 
3320  PQclear(res);
3321  }
3322  else
3323  ahprintf(AH, "%s;\n\n", qry->data);
3324 
3325  free(AH->currSchema);
3326  AH->currSchema = pg_strdup(schemaName);
3327 
3328  destroyPQExpBuffer(qry);
3329 }
3330 
3331 /*
3332  * Issue the commands to select the specified tablespace as the current one
3333  * in the target database.
3334  */
3335 static void
3336 _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3337 {
3338  RestoreOptions *ropt = AH->public.ropt;
3339  PQExpBuffer qry;
3340  const char *want,
3341  *have;
3342 
3343  /* do nothing in --no-tablespaces mode */
3344  if (ropt->noTablespace)
3345  return;
3346 
3347  have = AH->currTablespace;
3348  want = tablespace;
3349 
3350  /* no need to do anything for non-tablespace object */
3351  if (!want)
3352  return;
3353 
3354  if (have && strcmp(want, have) == 0)
3355  return; /* no need to do anything */
3356 
3357  qry = createPQExpBuffer();
3358 
3359  if (strcmp(want, "") == 0)
3360  {
3361  /* We want the tablespace to be the database's default */
3362  appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3363  }
3364  else
3365  {
3366  /* We want an explicit tablespace */
3367  appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3368  }
3369 
3370  if (RestoringToDB(AH))
3371  {
3372  PGresult *res;
3373 
3374  res = PQexec(AH->connection, qry->data);
3375 
3376  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3377  warn_or_exit_horribly(AH,
3378  "could not set default_tablespace to %s: %s",
3379  fmtId(want), PQerrorMessage(AH->connection));
3380 
3381  PQclear(res);
3382  }
3383  else
3384  ahprintf(AH, "%s;\n\n", qry->data);
3385 
3386  free(AH->currTablespace);
3387  AH->currTablespace = pg_strdup(want);
3388 
3389  destroyPQExpBuffer(qry);
3390 }
3391 
3392 /*
3393  * Set the proper default_table_access_method value for the table.
3394  */
3395 static void
3396 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3397 {
3398  RestoreOptions *ropt = AH->public.ropt;
3399  PQExpBuffer cmd;
3400  const char *want,
3401  *have;
3402 
3403  /* do nothing in --no-table-access-method mode */
3404  if (ropt->noTableAm)
3405  return;
3406 
3407  have = AH->currTableAm;
3408  want = tableam;
3409 
3410  if (!want)
3411  return;
3412 
3413  if (have && strcmp(want, have) == 0)
3414  return;
3415 
3416  cmd = createPQExpBuffer();
3417  appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3418 
3419  if (RestoringToDB(AH))
3420  {
3421  PGresult *res;
3422 
3423  res = PQexec(AH->connection, cmd->data);
3424 
3425  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3426  warn_or_exit_horribly(AH,
3427  "could not set default_table_access_method: %s",
3428  PQerrorMessage(AH->connection));
3429 
3430  PQclear(res);
3431  }
3432  else
3433  ahprintf(AH, "%s\n\n", cmd->data);
3434 
3435  destroyPQExpBuffer(cmd);
3436 
3437  free(AH->currTableAm);
3438  AH->currTableAm = pg_strdup(want);
3439 }
3440 
3441 /*
3442  * Extract an object description for a TOC entry, and append it to buf.
3443  *
3444  * This is used for ALTER ... OWNER TO.
3445  *
3446  * If the object type has no owner, do nothing.
3447  */
3448 static void
3449 _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3450 {
3451  const char *type = te->desc;
3452 
3453  /* objects that don't require special decoration */
3454  if (strcmp(type, "COLLATION") == 0 ||
3455  strcmp(type, "CONVERSION") == 0 ||
3456  strcmp(type, "DOMAIN") == 0 ||
3457  strcmp(type, "FOREIGN TABLE") == 0 ||
3458  strcmp(type, "MATERIALIZED VIEW") == 0 ||
3459  strcmp(type, "SEQUENCE") == 0 ||
3460  strcmp(type, "STATISTICS") == 0 ||
3461  strcmp(type, "TABLE") == 0 ||
3462  strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3463  strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3464  strcmp(type, "TYPE") == 0 ||
3465  strcmp(type, "VIEW") == 0 ||
3466  /* non-schema-specified objects */
3467  strcmp(type, "DATABASE") == 0 ||
3468  strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3469  strcmp(type, "SCHEMA") == 0 ||
3470  strcmp(type, "EVENT TRIGGER") == 0 ||
3471  strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3472  strcmp(type, "SERVER") == 0 ||
3473  strcmp(type, "PUBLICATION") == 0 ||
3474  strcmp(type, "SUBSCRIPTION") == 0)
3475  {
3476  appendPQExpBuffer(buf, "%s ", type);
3477  if (te->namespace && *te->namespace)
3478  appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3479  appendPQExpBufferStr(buf, fmtId(te->tag));
3480  }
3481  /* LOs just have a name, but it's numeric so must not use fmtId */
3482  else if (strcmp(type, "BLOB") == 0)
3483  {
3484  appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3485  }
3486 
3487  /*
3488  * These object types require additional decoration. Fortunately, the
3489  * information needed is exactly what's in the DROP command.
3490  */
3491  else if (strcmp(type, "AGGREGATE") == 0 ||
3492  strcmp(type, "FUNCTION") == 0 ||
3493  strcmp(type, "OPERATOR") == 0 ||
3494  strcmp(type, "OPERATOR CLASS") == 0 ||
3495  strcmp(type, "OPERATOR FAMILY") == 0 ||
3496  strcmp(type, "PROCEDURE") == 0)
3497  {
3498  /* Chop "DROP " off the front and make a modifiable copy */
3499  char *first = pg_strdup(te->dropStmt + 5);
3500  char *last;
3501 
3502  /* point to last character in string */
3503  last = first + strlen(first) - 1;
3504 
3505  /* Strip off any ';' or '\n' at the end */
3506  while (last >= first && (*last == '\n' || *last == ';'))
3507  last--;
3508  *(last + 1) = '\0';
3509 
3510  appendPQExpBufferStr(buf, first);
3511 
3512  free(first);
3513  return;
3514  }
3515  /* these object types don't have separate owners */
3516  else if (strcmp(type, "CAST") == 0 ||
3517  strcmp(type, "CHECK CONSTRAINT") == 0 ||
3518  strcmp(type, "CONSTRAINT") == 0 ||
3519  strcmp(type, "DATABASE PROPERTIES") == 0 ||
3520  strcmp(type, "DEFAULT") == 0 ||
3521  strcmp(type, "FK CONSTRAINT") == 0 ||
3522  strcmp(type, "INDEX") == 0 ||
3523  strcmp(type, "RULE") == 0 ||
3524  strcmp(type, "TRIGGER") == 0 ||
3525  strcmp(type, "ROW SECURITY") == 0 ||
3526  strcmp(type, "POLICY") == 0 ||
3527  strcmp(type, "USER MAPPING") == 0)
3528  {
3529  /* do nothing */
3530  }
3531  else
3532  pg_fatal("don't know how to set owner for object type \"%s\"", type);
3533 }
3534 
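/*
 * Example of the buffer built above: for a TABLE entry in schema "public"
 * with tag "mytab" (hypothetical names), the result is "TABLE public.mytab",
 * which _printTocEntry() below turns into
 * "ALTER TABLE public.mytab OWNER TO <owner>;".
 */
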
3535 /*
3536  * Emit the SQL commands to create the object represented by a TOC entry
3537  *
3538  * This now also includes issuing an ALTER OWNER command to restore the
3539  * object's ownership, if wanted. But note that the object's permissions
3540  * will remain at default, until the matching ACL TOC entry is restored.
3541  */
3542 static void
3543 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3544 {
3545  RestoreOptions *ropt = AH->public.ropt;
3546 
3547  /* Select owner, schema, tablespace and default AM as necessary */
3548  _becomeOwner(AH, te);
3549  _selectOutputSchema(AH, te->namespace);
3550  _selectTablespace(AH, te->tablespace);
3551  _selectTableAccessMethod(AH, te->tableam);
3552 
3553  /* Emit header comment for item */
3554  if (!AH->noTocComments)
3555  {
3556  const char *pfx;
3557  char *sanitized_name;
3558  char *sanitized_schema;
3559  char *sanitized_owner;
3560 
3561  if (isData)
3562  pfx = "Data for ";
3563  else
3564  pfx = "";
3565 
3566  ahprintf(AH, "--\n");
3567  if (AH->public.verbose)
3568  {
3569  ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3570  te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3571  if (te->nDeps > 0)
3572  {
3573  int i;
3574 
3575  ahprintf(AH, "-- Dependencies:");
3576  for (i = 0; i < te->nDeps; i++)
3577  ahprintf(AH, " %d", te->dependencies[i]);
3578  ahprintf(AH, "\n");
3579  }
3580  }
3581 
3582  sanitized_name = sanitize_line(te->tag, false);
3583  sanitized_schema = sanitize_line(te->namespace, true);
3584  sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3585 
3586  ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3587  pfx, sanitized_name, te->desc, sanitized_schema,
3588  sanitized_owner);
3589 
3590  free(sanitized_name);
3591  free(sanitized_schema);
3592  free(sanitized_owner);
3593 
3594  if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3595  {
3596  char *sanitized_tablespace;
3597 
3598  sanitized_tablespace = sanitize_line(te->tablespace, false);
3599  ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3600  free(sanitized_tablespace);
3601  }
3602  ahprintf(AH, "\n");
3603 
3604  if (AH->PrintExtraTocPtr != NULL)
3605  AH->PrintExtraTocPtr(AH, te);
3606  ahprintf(AH, "--\n\n");
3607  }
3608 
3609  /*
3610  * Actually print the definition.
3611  *
3612  * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3613  * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3614  * "public" that is a comment. We have to do this when --no-owner mode is
3615  * selected. This is ugly, but I see no other good way ...
3616  */
3617  if (ropt->noOwner &&
3618  strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3619  {
3620  ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3621  }
3622  else
3623  {
3624  if (te->defn && strlen(te->defn) > 0)
3625  ahprintf(AH, "%s\n\n", te->defn);
3626  }
3627 
3628  /*
3629  * If we aren't using SET SESSION AUTH to determine ownership, we must
3630  * instead issue an ALTER OWNER command. Schema "public" is special; when
3631  * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3632  * when using SET SESSION for all other objects. We assume that anything
3633  * without a DROP command is not a separately ownable object.
3634  */
3635  if (!ropt->noOwner &&
3636  (!ropt->use_setsessauth ||
3637  (strcmp(te->desc, "SCHEMA") == 0 &&
3638  strncmp(te->defn, "--", 2) == 0)) &&
3639  te->owner && strlen(te->owner) > 0 &&
3640  te->dropStmt && strlen(te->dropStmt) > 0)
3641  {
3642  PQExpBufferData temp;
3643 
3644  initPQExpBuffer(&temp);
3645  _getObjectDescription(&temp, te);
3646 
3647  /*
3648  * If _getObjectDescription() didn't fill the buffer, then there is no
3649  * owner.
3650  */
3651  if (temp.data[0])
3652  ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp.data, fmtId(te->owner));
3653  termPQExpBuffer(&temp);
3654  }
3655 
3656  /*
3657  * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3658  * commands, so we can no longer assume we know the current auth setting.
3659  */
3660  if (_tocEntryIsACL(te))
3661  {
3662  free(AH->currUser);
3663  AH->currUser = NULL;
3664  }
3665 }
3666 
3667 /*
3668  * Sanitize a string to be included in an SQL comment or TOC listing, by
3669  * replacing any newlines with spaces. This ensures each logical output line
3670  * is in fact one physical output line, to prevent corruption of the dump
3671  * (which could, in the worst case, present an SQL injection vulnerability
3672  * if someone were to incautiously load a dump containing objects with
3673  * maliciously crafted names).
3674  *
3675  * The result is a freshly malloc'd string. If the input string is NULL,
3676  * return a malloc'ed empty string, unless want_hyphen, in which case return a
3677  * malloc'ed hyphen.
3678  *
3679  * Note that we currently don't bother to quote names, meaning that the name
3680  * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3681  * it only examines the dumpId field, but someday we might want to try harder.
3682  */
3683 static char *
3684 sanitize_line(const char *str, bool want_hyphen)
3685 {
3686  char *result;
3687  char *s;
3688 
3689  if (!str)
3690  return pg_strdup(want_hyphen ? "-" : "");
3691 
3692  result = pg_strdup(str);
3693 
3694  for (s = result; *s != '\0'; s++)
3695  {
3696  if (*s == '\n' || *s == '\r')
3697  *s = ' ';
3698  }
3699 
3700  return result;
3701 }
3702 
3703 /*
3704  * Write the file header for a custom-format archive
3705  */
3706 void
3707 WriteHead(ArchiveHandle *AH)
3708 {
3709  struct tm crtm;
3710 
3711  AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3712  AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3713  AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3714  AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3715  AH->WriteBytePtr(AH, AH->intSize);
3716  AH->WriteBytePtr(AH, AH->offSize);
3717  AH->WriteBytePtr(AH, AH->format);
3718  AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
3719  crtm = *localtime(&AH->createDate);
3720  WriteInt(AH, crtm.tm_sec);
3721  WriteInt(AH, crtm.tm_min);
3722  WriteInt(AH, crtm.tm_hour);
3723  WriteInt(AH, crtm.tm_mday);
3724  WriteInt(AH, crtm.tm_mon);
3725  WriteInt(AH, crtm.tm_year);
3726  WriteInt(AH, crtm.tm_isdst);
3727  WriteStr(AH, PQdb(AH->connection));
3728  WriteStr(AH, AH->public.remoteVersionStr);
3729  WriteStr(AH, PG_VERSION);
3730 }
3731 
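/*
 * The header written above is, in order: the 5-byte magic "PGDMP"; the
 * archive version as three bytes (major, minor, revision); intSize, offSize
 * and the format code as single bytes; the compression algorithm byte; the
 * creation time as seven WriteInt fields (sec, min, hour, mday, mon, year,
 * isdst); and three strings: the database name, the server version, and the
 * pg_dump version.  ReadHead() below accepts this layout as well as the
 * older variants, gated on the archive version it finds.
 */
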
3732 void
3733 ReadHead(ArchiveHandle *AH)
3734 {
3735  char *errmsg;
3736  char vmaj,
3737  vmin,
3738  vrev;
3739  int fmt;
3740 
3741  /*
3742  * If we haven't already read the header, do so.
3743  *
3744  * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3745  * way to unify the cases?
3746  */
3747  if (!AH->readHeader)
3748  {
3749  char tmpMag[7];
3750 
3751  AH->ReadBufPtr(AH, tmpMag, 5);
3752 
3753  if (strncmp(tmpMag, "PGDMP", 5) != 0)
3754  pg_fatal("did not find magic string in file header");
3755  }
3756 
3757  vmaj = AH->ReadBytePtr(AH);
3758  vmin = AH->ReadBytePtr(AH);
3759 
3760  if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3761  vrev = AH->ReadBytePtr(AH);
3762  else
3763  vrev = 0;
3764 
3765  AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3766 
3767  if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3768  pg_fatal("unsupported version (%d.%d) in file header",
3769  vmaj, vmin);
3770 
3771  AH->intSize = AH->ReadBytePtr(AH);
3772  if (AH->intSize > 32)
3773  pg_fatal("sanity check on integer size (%lu) failed",
3774  (unsigned long) AH->intSize);
3775 
3776  if (AH->intSize > sizeof(int))
3777  pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3778 
3779  if (AH->version >= K_VERS_1_7)
3780  AH->offSize = AH->ReadBytePtr(AH);
3781  else
3782  AH->offSize = AH->intSize;
3783 
3784  fmt = AH->ReadBytePtr(AH);
3785 
3786  if (AH->format != fmt)
3787  pg_fatal("expected format (%d) differs from format found in file (%d)",
3788  AH->format, fmt);
3789 
3790  if (AH->version >= K_VERS_1_15)
3791  AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
3792  else if (AH->version >= K_VERS_1_2)
3793  {
3794  /* Guess the compression method based on the level */
3795  if (AH->version < K_VERS_1_4)
3796  AH->compression_spec.level = AH->ReadBytePtr(AH);
3797  else
3798  AH->compression_spec.level = ReadInt(AH);
3799 
3800  if (AH->compression_spec.level != 0)
3801  AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
3802  }
3803  else
3804  AH->compression_spec.algorithm = PG_COMPRESSION_NONE;
3805 
3806  errmsg = supports_compression(AH->compression_spec);
3807  if (errmsg)
3808  {
3809  pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
3810  errmsg);
3811  pg_free(errmsg);
3812  }
3813 
3814  if (AH->version >= K_VERS_1_4)
3815  {
3816  struct tm crtm;
3817 
3818  crtm.tm_sec = ReadInt(AH);
3819  crtm.tm_min = ReadInt(AH);
3820  crtm.tm_hour = ReadInt(AH);
3821  crtm.tm_mday = ReadInt(AH);
3822  crtm.tm_mon = ReadInt(AH);
3823  crtm.tm_year = ReadInt(AH);
3824  crtm.tm_isdst = ReadInt(AH);
3825 
3826  /*
3827  * Newer versions of glibc have mktime() report failure if tm_isdst is
3828  * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3829  * TZ=UTC. This is problematic when restoring an archive under a
3830  * different timezone setting. If we get a failure, try again with
3831  * tm_isdst set to -1 ("don't know").
3832  *
3833  * XXX with or without this hack, we reconstruct createDate
3834  * incorrectly when the prevailing timezone is different from
3835  * pg_dump's. Next time we bump the archive version, we should flush
3836  * this representation and store a plain seconds-since-the-Epoch
3837  * timestamp instead.
3838  */
3839  AH->createDate = mktime(&crtm);
3840  if (AH->createDate == (time_t) -1)
3841  {
3842  crtm.tm_isdst = -1;
3843  AH->createDate = mktime(&crtm);
3844  if (AH->createDate == (time_t) -1)
3845  pg_log_warning("invalid creation date in header");
3846  }
3847  }
3848 
3849  if (AH->version >= K_VERS_1_4)
3850  {
3851  AH->archdbname = ReadStr(AH);
3852  }
3853 
3854  if (AH->version >= K_VERS_1_10)
3855  {
3856  AH->archiveRemoteVersion = ReadStr(AH);
3857  AH->archiveDumpVersion = ReadStr(AH);
3858  }
3859 }
3860 
3861 
3862 /*
3863  * checkSeek
3864  * check to see if ftell/fseek can be performed.
3865  */
3866 bool
3867 checkSeek(FILE *fp)
3868 {
3869  pgoff_t tpos;
3870 
3871  /* Check that ftello works on this file */
3872  tpos = ftello(fp);
3873  if (tpos < 0)
3874  return false;
3875 
3876  /*
3877  * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3878  * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3879  * successful no-op even on files that are otherwise unseekable.
3880  */
3881  if (fseeko(fp, tpos, SEEK_SET) != 0)
3882  return false;
3883 
3884  return true;
3885 }
3886 
3887 
3888 /*
3889  * dumpTimestamp
3890  */
3891 static void
3892 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3893 {
3894  char buf[64];
3895 
3896  if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3897  ahprintf(AH, "-- %s %s\n\n", msg, buf);
3898 }
3899 
3900 /*
3901  * Main engine for parallel restore.
3902  *
3903  * Parallel restore is done in three phases. In this first phase,
3904  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3905  * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3906  * PRE_DATA items other than ACLs.) Entries we can't process now are
3907  * added to the pending_list for later phases to deal with.
3908  */
3909 static void
3910 restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3911 {
3912  bool skipped_some;
3913  TocEntry *next_work_item;
3914 
3915  pg_log_debug("entering restore_toc_entries_prefork");
3916 
3917  /* Adjust dependency information */
3918  fix_dependencies(AH);
3919 
3920  /*
3921  * Do all the early stuff in a single connection in the parent. There's no
3922  * great point in running it in parallel, in fact it will actually run
3923  * faster in a single connection because we avoid all the connection and
3924  * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3925  * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3926  * not risk trying to process them out-of-order.
3927  *
3928  * Stuff that we can't do immediately gets added to the pending_list.
3929  * Note: we don't yet filter out entries that aren't going to be restored.
3930  * They might participate in dependency chains connecting entries that
3931  * should be restored, so we treat them as live until we actually process
3932  * them.
3933  *
3934  * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3935  * before DATA items, and all DATA items before POST_DATA items. That is
3936  * not certain to be true in older archives, though, and in any case use
3937  * of a list file would destroy that ordering (cf. SortTocFromFile). So
3938  * this loop cannot assume that it holds.
3939  */
3940  AH->restorePass = RESTORE_PASS_MAIN;
3941  skipped_some = false;
3942  for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3943  {
3944  bool do_now = true;
3945 
3946  if (next_work_item->section != SECTION_PRE_DATA)
3947  {
3948  /* DATA and POST_DATA items are just ignored for now */
3949  if (next_work_item->section == SECTION_DATA ||
3950  next_work_item->section == SECTION_POST_DATA)
3951  {
3952  do_now = false;
3953  skipped_some = true;
3954  }
3955  else
3956  {
3957  /*
3958  * SECTION_NONE items, such as comments, can be processed now
3959  * if we are still in the PRE_DATA part of the archive. Once
3960  * we've skipped any items, we have to consider whether the
3961  * comment's dependencies are satisfied, so skip it for now.
3962  */
3963  if (skipped_some)
3964  do_now = false;
3965  }
3966  }
3967 
3968  /*
3969  * Also skip items that need to be forced into later passes. We need
3970  * not set skipped_some in this case, since by assumption no main-pass
3971  * items could depend on these.
3972  */
3973  if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3974  do_now = false;
3975 
3976  if (do_now)
3977  {
3978  /* OK, restore the item and update its dependencies */
3979  pg_log_info("processing item %d %s %s",
3980  next_work_item->dumpId,
3981  next_work_item->desc, next_work_item->tag);
3982 
3983  (void) restore_toc_entry(AH, next_work_item, false);
3984 
3985  /* Reduce dependencies, but don't move anything to ready_heap */
3986  reduce_dependencies(AH, next_work_item, NULL);
3987  }
3988  else
3989  {
3990  /* Nope, so add it to pending_list */
3991  pending_list_append(pending_list, next_work_item);
3992  }
3993  }
3994 
3995  /*
3996  * Now close parent connection in prep for parallel steps. We do this
3997  * mainly to ensure that we don't exceed the specified number of parallel
3998  * connections.
3999  */
4000  DisconnectDatabase(&AH->public);
4001 
4002  /* blow away any transient state from the old connection */
4003  free(AH->currUser);
4004  AH->currUser = NULL;
4005  free(AH->currSchema);
4006  AH->currSchema = NULL;
4007  free(AH->currTablespace);
4008  AH->currTablespace = NULL;
4009  free(AH->currTableAm);
4010  AH->currTableAm = NULL;
4011 }
4012 
4013 /*
4014  * Main engine for parallel restore.
4015  *
4016  * Parallel restore is done in three phases. In this second phase,
4017  * we process entries by dispatching them to parallel worker children
4018  * (processes on Unix, threads on Windows), each of which connects
4019  * separately to the database. Inter-entry dependencies are respected,
4020  * and so is the RestorePass multi-pass structure. When we can no longer
4021  * make any entries ready to process, we exit. Normally, there will be
4022  * nothing left to do; but if there is, the third phase will mop up.
4023  */
4024 static void
4025 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4026  TocEntry *pending_list)
4027 {
4028  binaryheap *ready_heap;
4029  TocEntry *next_work_item;
4030 
4031  pg_log_debug("entering restore_toc_entries_parallel");
4032 
4033  /* Set up ready_heap with enough room for all known TocEntrys */
4034  ready_heap = binaryheap_allocate(AH->tocCount,
4035  TocEntrySizeCompareBinaryheap,
4036  NULL);
4037 
4038  /*
4039  * The pending_list contains all items that we need to restore. Move all
4040  * items that are available to process immediately into the ready_heap.
4041  * After this setup, the pending list is everything that needs to be done
4042  * but is blocked by one or more dependencies, while the ready heap
4043  * contains items that have no remaining dependencies and are OK to
4044  * process in the current restore pass.
4045  */
4046  AH->restorePass = RESTORE_PASS_MAIN;
4047  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4048 
4049  /*
4050  * main parent loop
4051  *
4052  * Keep going until there is no worker still running AND there is no work
4053  * left to be done. Note invariant: at top of loop, there should always
4054  * be at least one worker available to dispatch a job to.
4055  */
4056  pg_log_info("entering main parallel loop");
4057 
4058  for (;;)
4059  {
4060  /* Look for an item ready to be dispatched to a worker */
4061  next_work_item = pop_next_work_item(ready_heap, pstate);
4062  if (next_work_item != NULL)
4063  {
4064  /* If not to be restored, don't waste time launching a worker */
4065  if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4066  {
4067  pg_log_info("skipping item %d %s %s",
4068  next_work_item->dumpId,
4069  next_work_item->desc, next_work_item->tag);
4070  /* Update its dependencies as though we'd completed it */
4071  reduce_dependencies(AH, next_work_item, ready_heap);
4072  /* Loop around to see if anything else can be dispatched */
4073  continue;
4074  }
4075 
4076  pg_log_info("launching item %d %s %s",
4077  next_work_item->dumpId,
4078  next_work_item->desc, next_work_item->tag);
4079 
4080  /* Dispatch to some worker */
4081  DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4082  mark_restore_job_done, ready_heap);
4083  }
4084  else if (IsEveryWorkerIdle(pstate))
4085  {
4086  /*
4087  * Nothing is ready and no worker is running, so we're done with
4088  * the current pass or maybe with the whole process.
4089  */
4090  if (AH->restorePass == RESTORE_PASS_LAST)
4091  break; /* No more parallel processing is possible */
4092 
4093  /* Advance to next restore pass */
4094  AH->restorePass++;
4095  /* That probably allows some stuff to be made ready */
4096  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4097  /* Loop around to see if anything's now ready */
4098  continue;
4099  }
4100  else
4101  {
4102  /*
4103  * We have nothing ready, but at least one child is working, so
4104  * wait for some subjob to finish.
4105  */
4106  }
4107 
4108  /*
4109  * Before dispatching another job, check to see if anything has
4110  * finished. We should check every time through the loop so as to
4111  * reduce dependencies as soon as possible. If we were unable to
4112  * dispatch any job this time through, wait until some worker finishes
4113  * (and, hopefully, unblocks some pending item). If we did dispatch
4114  * something, continue as soon as there's at least one idle worker.
4115  * Note that in either case, there's guaranteed to be at least one
4116  * idle worker when we return to the top of the loop. This ensures we
4117  * won't block inside DispatchJobForTocEntry, which would be
4118  * undesirable: we'd rather postpone dispatching until we see what's
4119  * been unblocked by finished jobs.
4120  */
4121  WaitForWorkers(AH, pstate,
4122  next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4123  }
4124 
4125  /* There should now be nothing in ready_heap. */
4126  Assert(binaryheap_empty(ready_heap));
4127 
4128  binaryheap_free(ready_heap);
4129 
4130  pg_log_info("finished main parallel loop");
4131 }
4132 
4133 /*
4134  * Main engine for parallel restore.
4135  *
4136  * Parallel restore is done in three phases. In this third phase,
4137  * we mop up any remaining TOC entries by processing them serially.
4138  * This phase normally should have nothing to do, but if we've somehow
4139  * gotten stuck due to circular dependencies or some such, this provides
4140  * at least some chance of completing the restore successfully.
4141  */
4142 static void
4143 restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4144 {
4145  RestoreOptions *ropt = AH->public.ropt;
4146  TocEntry *te;
4147 
4148  pg_log_debug("entering restore_toc_entries_postfork");
4149 
4150  /*
4151  * Now reconnect the single parent connection.
4152  */
4153  ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4154 
4155  /* re-establish fixed state */
4156  _doSetFixedOutputState(AH);
4157 
4158  /*
4159  * Make sure there is no work left due to, say, circular dependencies, or
4160  * some other pathological condition. If so, do it in the single parent
4161  * connection. We don't sweat about RestorePass ordering; it's likely we
4162  * already violated that.
4163  */
4164  for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4165  {
4166  pg_log_info("processing missed item %d %s %s",
4167  te->dumpId, te->desc, te->tag);
4168  (void) restore_toc_entry(AH, te, false);
4169  }
4170 }
4171 
4172 /*
4173  * Check if te1 has an exclusive lock requirement for an item that te2 also
4174  * requires, whether or not te2's requirement is for an exclusive lock.
4175  */
4176 static bool
4177 has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4178 {
4179  int j,
4180  k;
4181 
4182  for (j = 0; j < te1->nLockDeps; j++)
4183  {
4184  for (k = 0; k < te2->nDeps; k++)
4185  {
4186  if (te1->lockDeps[j] == te2->dependencies[k])
4187  return true;
4188  }
4189  }
4190  return false;
4191 }
4192 
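Note that the check is intentionally one-sided: it asks whether te1 needs exclusive lock on something that te2 merely touches. That is why the scheduler below (pop_next_work_item) applies it in both directions. The self-contained toy program below, which uses a simplified stand-in type rather than the real TocEntry, illustrates the asymmetry.

/* Editor's toy illustration; DemoItem is a simplified stand-in, not TocEntry. */
#include <stdbool.h>
#include <stdio.h>

typedef struct DemoItem
{
    int         lockDeps[4];        /* objects we need exclusive lock on */
    int         nLockDeps;
    int         dependencies[4];    /* all objects we reference */
    int         nDeps;
} DemoItem;

static bool
demo_has_lock_conflicts(const DemoItem *item1, const DemoItem *item2)
{
    for (int j = 0; j < item1->nLockDeps; j++)
        for (int k = 0; k < item2->nDeps; k++)
            if (item1->lockDeps[j] == item2->dependencies[k])
                return true;
    return false;
}

int
main(void)
{
    /* a: CREATE INDEX-like item; touches table 42 but takes no exclusive lock */
    DemoItem    a = {.nLockDeps = 0, .dependencies = {42}, .nDeps = 1};
    /* b: ALTER TABLE-like item; needs exclusive lock on table 42 */
    DemoItem    b = {.lockDeps = {42}, .nLockDeps = 1,
                     .dependencies = {42}, .nDeps = 1};

    printf("a conflicts with b: %d\n", demo_has_lock_conflicts(&a, &b));    /* 0 */
    printf("b conflicts with a: %d\n", demo_has_lock_conflicts(&b, &a));    /* 1 */
    return 0;
}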
4193 
4194 /*
4195  * Initialize the header of the pending-items list.
4196  *
4197  * This is a circular list with a dummy TocEntry as header, just like the
4198  * main TOC list; but we use separate list links so that an entry can be in
4199  * the main TOC list as well as in the pending list.
4200  */
4201 static void
4202 pending_list_header_init(TocEntry *l)
4203 {
4204  l->pending_prev = l->pending_next = l;
4205 }
4206 
4207 /* Append te to the end of the pending-list headed by l */
4208 static void
4209 pending_list_append(TocEntry *l, TocEntry *te)
4210 {
4211  te->pending_prev = l->pending_prev;
4212  l->pending_prev->pending_next = te;
4213  l->pending_prev = te;
4214  te->pending_next = l;
4215 }
4216 
4217 /* Remove te from the pending-list */
4218 static void
4219 pending_list_remove(TocEntry *te)
4220 {
4221  te->pending_prev->pending_next = te->pending_next;
4222  te->pending_next->pending_prev = te->pending_prev;
4223  te->pending_prev = NULL;
4224  te->pending_next = NULL;
4225 }
4226 
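These three helpers implement a standard circular doubly-linked list with a dummy header node, which keeps append and remove free of NULL and empty-list special cases; the NULLed links after removal also serve as an "is this entry still pending?" test (see reduce_dependencies below). Here is a self-contained toy version using a simplified entry type rather than the real TocEntry.

/* Editor's toy illustration; DemoEntry is a simplified stand-in, not TocEntry. */
#include <stdio.h>
#include <stdlib.h>

typedef struct DemoEntry
{
    int         id;
    struct DemoEntry *pending_prev;
    struct DemoEntry *pending_next;
} DemoEntry;

static void
demo_list_init(DemoEntry *l)
{
    /* the dummy header points at itself, so the list is never truly empty */
    l->pending_prev = l->pending_next = l;
}

static void
demo_list_append(DemoEntry *l, DemoEntry *e)
{
    e->pending_prev = l->pending_prev;
    l->pending_prev->pending_next = e;
    l->pending_prev = e;
    e->pending_next = l;
}

static void
demo_list_remove(DemoEntry *e)
{
    e->pending_prev->pending_next = e->pending_next;
    e->pending_next->pending_prev = e->pending_prev;
    e->pending_prev = NULL;     /* NULL links mark "no longer pending" */
    e->pending_next = NULL;
}

int
main(void)
{
    DemoEntry   head;
    DemoEntry   a = {.id = 1}, b = {.id = 2};

    demo_list_init(&head);
    demo_list_append(&head, &a);
    demo_list_append(&head, &b);
    demo_list_remove(&a);

    for (DemoEntry *e = head.pending_next; e != &head; e = e->pending_next)
        printf("entry %d\n", e->id);    /* prints only "entry 2" */
    return 0;
}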
4227 
4228 /* qsort comparator for sorting TocEntries by dataLength */
4229 static int
4230 TocEntrySizeCompareQsort(const void *p1, const void *p2)
4231 {
4232  const TocEntry *te1 = *(const TocEntry *const *) p1;
4233  const TocEntry *te2 = *(const TocEntry *const *) p2;
4234 
4235  /* Sort by decreasing dataLength */
4236  if (te1->dataLength > te2->dataLength)
4237  return -1;
4238  if (te1->dataLength < te2->dataLength)
4239  return 1;
4240 
4241  /* For equal dataLengths, sort by dumpId, just to be stable */
4242  if (te1->dumpId < te2->dumpId)
4243  return -1;
4244  if (te1->dumpId > te2->dumpId)
4245  return 1;
4246 
4247  return 0;
4248 }
4249 
4250 /* binaryheap comparator for sorting TocEntries by dataLength */
4251 static int
4252 TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
4253 {
4254  /* return opposite of qsort comparator for max-heap */
4255  return -TocEntrySizeCompareQsort(&p1, &p2);
4256 }
4257 
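The binaryheap comparator is just the negated qsort comparator: the qsort version orders entries by decreasing dataLength (ties broken by dumpId for stability), and negating it makes larger entries compare as "greater", so the largest remaining job ends up at the top of the max-heap used for ready_heap. The self-contained example below, with a simplified stand-in type rather than the real TocEntry, shows the descending order the qsort comparator produces.

/* Editor's toy illustration; DemoTe is a simplified stand-in, not TocEntry. */
#include <stdio.h>
#include <stdlib.h>

typedef struct DemoTe
{
    int         dumpId;
    long        dataLength;
} DemoTe;

static int
demo_size_cmp_qsort(const void *p1, const void *p2)
{
    const DemoTe *te1 = *(const DemoTe *const *) p1;
    const DemoTe *te2 = *(const DemoTe *const *) p2;

    /* sort by decreasing dataLength */
    if (te1->dataLength > te2->dataLength)
        return -1;
    if (te1->dataLength < te2->dataLength)
        return 1;
    /* break ties by dumpId so the order is stable */
    if (te1->dumpId < te2->dumpId)
        return -1;
    if (te1->dumpId > te2->dumpId)
        return 1;
    return 0;
}

int
main(void)
{
    DemoTe      a = {1, 100}, b = {2, 500}, c = {3, 100};
    DemoTe     *items[] = {&a, &b, &c};

    qsort(items, 3, sizeof(DemoTe *), demo_size_cmp_qsort);

    /* prints dumpId 2 (500) first, then 1 and 3 (ties broken by dumpId) */
    for (int i = 0; i < 3; i++)
        printf("dumpId %d, dataLength %ld\n",
               items[i]->dumpId, items[i]->dataLength);
    return 0;
}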
4258 
4259 /*
4260  * Move all immediately-ready items from pending_list to ready_heap.
4261  *
4262  * Items are considered ready if they have no remaining dependencies and
4263  * they belong in the current restore pass. (See also reduce_dependencies,
4264  * which applies the same logic one-at-a-time.)
4265  */
4266 static void
4267 move_to_ready_heap(TocEntry *pending_list,
4268  binaryheap *ready_heap,
4269  RestorePass pass)
4270 {
4271  TocEntry *te;
4272  TocEntry *next_te;
4273 
4274  for (te = pending_list->pending_next; te != pending_list; te = next_te)
4275  {
4276  /* must save list link before possibly removing te from list */
4277  next_te = te->pending_next;
4278 
4279  if (te->depCount == 0 &&
4280  _tocEntryRestorePass(te) == pass)
4281  {
4282  /* Remove it from pending_list ... */
4283  pending_list_remove(te);
4284  /* ... and add to ready_heap */
4285  binaryheap_add(ready_heap, te);
4286  }
4287  }
4288 }
4289 
4290 /*
4291  * Find the next work item (if any) that is capable of being run now,
4292  * and remove it from the ready_heap.
4293  *
4294  * Returns the item, or NULL if nothing is runnable.
4295  *
4296  * To qualify, the item must have no remaining dependencies
4297  * and no requirements for locks that are incompatible with
4298  * items currently running. Items in the ready_heap are known to have
4299  * no remaining dependencies, but we have to check for lock conflicts.
4300  */
4301 static TocEntry *
4302 pop_next_work_item(binaryheap *ready_heap,
4303  ParallelState *pstate)
4304 {
4305  /*
4306  * Search the ready_heap until we find a suitable item. Note that we do a
4307  * sequential scan through the heap nodes, so even though we will first
4308  * try to choose the highest-priority item, we might end up picking
4309  * something with a much lower priority. However, we expect that we will
4310  * typically be able to pick one of the first few items, which should
4311  * usually have a relatively high priority.
4312  */
4313  for (int i = 0; i < binaryheap_size(ready_heap); i++)
4314  {
4315  TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4316  bool conflicts = false;
4317 
4318  /*
4319  * Check to see if the item would need exclusive lock on something
4320  * that a currently running item also needs lock on, or vice versa. If
4321  * so, we don't want to schedule them together.
4322  */
4323  for (int k = 0; k < pstate->numWorkers; k++)
4324  {
4325  TocEntry *running_te = pstate->te[k];
4326 
4327  if (running_te == NULL)
4328  continue;
4329  if (has_lock_conflicts(te, running_te) ||
4330  has_lock_conflicts(running_te, te))
4331  {
4332  conflicts = true;
4333  break;
4334  }
4335  }
4336 
4337  if (conflicts)
4338  continue;
4339 
4340  /* passed all tests, so this item can run */
4341  binaryheap_remove_node(ready_heap, i);
4342  return te;
4343  }
4344 
4345  pg_log_debug("no item ready");
4346  return NULL;
4347 }
4348 
4349 
4350 /*
4351  * Restore a single TOC item in parallel with others
4352  *
4353  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4354  * (everything else). A worker process executes several such work items during
4355  * a parallel backup or restore. Once we terminate here and report back that
4356  * our work is finished, the leader process will assign us a new work item.
4357  */
4358 int
4359 parallel_restore(ArchiveHandle *AH, TocEntry *te)
4360 {
4361  int status;
4362 
4363  Assert(AH->connection != NULL);
4364 
4365  /* Count only errors associated with this TOC entry */
4366  AH->public.n_errors = 0;
4367 
4368  /* Restore the TOC item */
4369  status = restore_toc_entry(AH, te, true);
4370 
4371  return status;
4372 }
4373 
4374 
4375 /*
4376  * Callback function that's invoked in the leader process after a step has
4377  * been parallel restored.
4378  *
4379  * Update status and reduce the dependency count of any dependent items.
4380  */
4381 static void
4382 mark_restore_job_done(ArchiveHandle *AH,
4383  TocEntry *te,
4384  int status,
4385  void *callback_data)
4386 {
4387  binaryheap *ready_heap = (binaryheap *) callback_data;
4388 
4389  pg_log_info("finished item %d %s %s",
4390  te->dumpId, te->desc, te->tag);
4391 
4392  if (status == WORKER_CREATE_DONE)
4393  mark_create_done(AH, te);
4394  else if (status == WORKER_INHIBIT_DATA)
4395  {
4396  inhibit_data_for_failed_table(AH, te);
4397  AH->public.n_errors++;
4398  }
4399  else if (status == WORKER_IGNORED_ERRORS)
4400  AH->public.n_errors++;
4401  else if (status != 0)
4402  pg_fatal("worker process failed: exit code %d",
4403  status);
4404 
4405  reduce_dependencies(AH, te, ready_heap);
4406 }
4407 
4408 
4409 /*
4410  * Process the dependency information into a form useful for parallel restore.
4411  *
4412  * This function takes care of fixing up some missing or badly designed
4413  * dependencies, and then prepares subsidiary data structures that will be
4414  * used in the main parallel-restore logic, including:
4415  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4416  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4417  * dependencies for each TOC entry.
4418  *
4419  * We also identify locking dependencies so that we can avoid trying to
4420  * schedule conflicting items at the same time.
4421  */
4422 static void
4423 fix_dependencies(ArchiveHandle *AH)
4424 {
4425  TocEntry *te;
4426  int i;
4427 
4428  /*
4429  * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4430  * items are marked as not being in any parallel-processing list.
4431  */
4432  for (te = AH->toc->next; te != AH->toc; te = te->next)
4433  {
4434  te->depCount = te->nDeps;
4435  te->revDeps = NULL;
4436  te->nRevDeps = 0;
4437  te->pending_prev = NULL;
4438  te->pending_next = NULL;
4439  }
4440 
4441  /*
4442  * POST_DATA items that are shown as depending on a table need to be
4443  * re-pointed to depend on that table's data, instead. This ensures they
4444  * won't get scheduled until the data has been loaded.
4445  */
4446  repoint_table_dependencies(AH);
4447 
4448  /*
4449  * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4450  * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4451  * one BLOB COMMENTS in such files.)
4452  */
4453  if (AH->version < K_VERS_1_11)
4454  {
4455  for (te = AH->toc->next; te != AH->toc; te = te->next)
4456  {
4457  if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4458  {
4459  TocEntry *te2;
4460 
4461  for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4462  {
4463  if (strcmp(te2->desc, "BLOBS") == 0)
4464  {
4465  te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4466  te->dependencies[0] = te2->dumpId;
4467  te->nDeps++;
4468  te->depCount++;
4469  break;
4470  }
4471  }
4472  break;
4473  }
4474  }
4475  }
4476 
4477  /*
4478  * At this point we start to build the revDeps reverse-dependency arrays,
4479  * so all changes of dependencies must be complete.
4480  */
4481 
4482  /*
4483  * Count the incoming dependencies for each item. Also, it is possible
4484  * that the dependencies list items that are not in the archive at all
4485  * (that should not happen in 9.2 and later, but is highly likely in older
4486  * archives). Subtract such items from the depCounts.
4487  */
4488  for (te = AH->toc->next; te != AH->toc; te = te->next)
4489  {
4490  for (i = 0; i < te->nDeps; i++)
4491  {
4492  DumpId depid = te->dependencies[i];
4493 
4494  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4495  AH->tocsByDumpId[depid]->nRevDeps++;
4496  else
4497  te->depCount--;
4498  }
4499  }
4500 
4501  /*
4502  * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4503  * it as a counter below.
4504  */
4505  for (te = AH->toc->next; te != AH->toc; te = te->next)
4506  {
4507  if (te->nRevDeps > 0)
4508  te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4509  te->nRevDeps = 0;
4510  }
4511 
4512  /*
4513  * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4514  * better agree with the loops above.
4515  */
4516  for (te = AH->toc->next; te != AH->toc; te = te->next)
4517  {
4518  for (i = 0; i < te->nDeps; i++)
4519  {
4520  DumpId depid = te->dependencies[i];
4521 
4522  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4523  {
4524  TocEntry *otherte = AH->tocsByDumpId[depid];
4525 
4526  otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4527  }
4528  }
4529  }
4530 
4531  /*
4532  * Lastly, work out the locking dependencies.
4533  */
4534  for (te = AH->toc->next; te != AH->toc; te = te->next)
4535  {
4536  te->lockDeps = NULL;
4537  te->nLockDeps = 0;
4538  identify_locking_dependencies(AH, te);
4539  }
4540 }
4541 
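The revDeps construction above follows a common two-pass "count, allocate, refill" pattern: one loop counts incoming dependencies, the allocation sizes each array exactly, nRevDeps is reset to zero, and a second loop reuses it as the fill index. Below is a minimal standalone rendering of the same pattern on plain arrays, with hypothetical example data rather than archive structures.

/* Editor's toy illustration of count-then-fill reverse-dependency arrays. */
#include <stdio.h>
#include <stdlib.h>

#define NNODES 4

int
main(void)
{
    /* deps[i] lists the nodes that node i depends on (-1 terminates a row) */
    int         deps[NNODES][NNODES] = {{-1}, {0, -1}, {0, 1, -1}, {1, -1}};
    int         nrev[NNODES] = {0};
    int        *rev[NNODES] = {NULL};

    /* pass 1: count incoming dependencies */
    for (int i = 0; i < NNODES; i++)
        for (int j = 0; deps[i][j] >= 0; j++)
            nrev[deps[i][j]]++;

    /* allocate exactly, then reset the counters to reuse as fill indexes */
    for (int i = 0; i < NNODES; i++)
    {
        if (nrev[i] > 0)
            rev[i] = malloc(nrev[i] * sizeof(int));
        nrev[i] = 0;
    }

    /* pass 2: fill the reverse-dependency arrays */
    for (int i = 0; i < NNODES; i++)
        for (int j = 0; deps[i][j] >= 0; j++)
        {
            int         d = deps[i][j];

            rev[d][nrev[d]++] = i;
        }

    for (int i = 0; i < NNODES; i++)
    {
        printf("node %d is depended on by:", i);
        for (int j = 0; j < nrev[i]; j++)
            printf(" %d", rev[i][j]);
        printf("\n");
        free(rev[i]);
    }
    return 0;
}
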
4542 /*
4543  * Change dependencies on table items to depend on table data items instead,
4544  * but only in POST_DATA items.
4545  *
4546  * Also, for any item having such dependency(s), set its dataLength to the
4547  * largest dataLength of the table data items it depends on. This ensures
4548  * that parallel restore will prioritize larger jobs (index builds, FK
4549  * constraint checks, etc) over smaller ones, avoiding situations where we
4550  * end a restore with only one active job working on a large table.
4551  */
4552 static void
4553 repoint_table_dependencies(ArchiveHandle *AH)
4554 {
4555  TocEntry *te;
4556  int i;
4557  DumpId olddep;
4558 
4559  for (te = AH->toc->next; te != AH->toc; te = te->next)
4560  {
4561  if (te->section != SECTION_POST_DATA)
4562  continue;
4563  for (i = 0; i < te->nDeps; i++)
4564  {
4565  olddep = te->dependencies[i];
4566  if (olddep <= AH->maxDumpId &&
4567  AH->tableDataId[olddep] != 0)
4568  {
4569  DumpId tabledataid = AH->tableDataId[olddep];
4570  TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4571 
4572  te->dependencies[i] = tabledataid;
4573  te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4574  pg_log_debug("transferring dependency %d -> %d to %d",
4575  te->dumpId, olddep, tabledataid);
4576  }
4577  }
4578  }
4579 }
4580 
4581 /*
4582  * Identify which objects we'll need exclusive lock on in order to restore
4583  * the given TOC entry (*other* than the one identified by the TOC entry
4584  * itself). Record their dump IDs in the entry's lockDeps[] array.
4585  */
4586 static void
4587 identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4588 {
4589  DumpId *lockids;
4590  int nlockids;
4591  int i;
4592 
4593  /*
4594  * We only care about this for POST_DATA items. PRE_DATA items are not
4595  * run in parallel, and DATA items are all independent by assumption.
4596  */
4597  if (te->section != SECTION_POST_DATA)
4598  return;
4599 
4600  /* Quick exit if no dependencies at all */
4601  if (te->nDeps == 0)
4602  return;
4603 
4604  /*
4605  * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4606  * and hence require exclusive lock. However, we know that CREATE INDEX
4607  * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4608  * category too ... but today is not that day.)
4609  */
4610  if (strcmp(te->desc, "INDEX") == 0)
4611  return;
4612 
4613  /*
4614  * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4615  * item listed among its dependencies. Originally all of these would have
4616  * been TABLE items, but repoint_table_dependencies would have repointed
4617  * them to the TABLE DATA items if those are present (which they might not
4618  * be, eg in a schema-only dump). Note that all of the entries we are
4619  * processing here are POST_DATA; otherwise there might be a significant
4620  * difference between a dependency on a table and a dependency on its
4621  * data, so that closer analysis would be needed here.
4622  */
4623  lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4624  nlockids = 0;
4625  for (i = 0; i < te->nDeps; i++)
4626  {
4627  DumpId depid = te->dependencies[i];
4628 
4629  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4630  ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4631  strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4632  lockids[nlockids++] = depid;
4633  }
4634 
4635  if (nlockids == 0)
4636  {
4637  free(lockids);
4638  return;
4639  }
4640 
4641  te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4642  te->nLockDeps = nlockids;
4643 }
4644 
4645 /*
4646  * Remove the specified TOC entry from the depCounts of items that depend on
4647  * it, thereby possibly making them ready-to-run. Any pending item that
4648  * becomes ready should be moved to the ready_heap, if that's provided.
4649  */
4650 static void
4651 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4652  binaryheap *ready_heap)
4653 {
4654  int i;
4655 
4656  pg_log_debug("reducing dependencies for %d", te->dumpId);
4657 
4658  for (i = 0; i < te->nRevDeps; i++)
4659  {
4660  TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4661 
4662  Assert(otherte->depCount > 0);
4663  otherte->depCount--;
4664 
4665  /*
4666  * It's ready if it has no remaining dependencies, and it belongs in
4667  * the current restore pass, and it is currently a member of the
4668  * pending list (that check is needed to prevent double restore in
4669  * some cases where a list-file forces out-of-order restoring).
4670  * However, if ready_heap == NULL then caller doesn't want any list
4671  * memberships changed.
4672  */
4673  if (otherte->depCount == 0 &&
4674  _tocEntryRestorePass(otherte) == AH->restorePass &&
4675  otherte->pending_prev != NULL &&
4676  ready_heap != NULL)
4677  {
4678  /* Remove it from pending list ... */
4679  pending_list_remove(otherte);
4680  /* ... and add to ready_heap */
4681  binaryheap_add(ready_heap, otherte);
4682  }
4683  }
4684 }
4685 
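At run time this is the release step of a Kahn-style topological schedule: finishing one item decrements the counters of everything that points back at it, and whatever reaches zero (and is in the current pass and still on the pending list) becomes schedulable. A toy standalone rendering with hypothetical data:

/* Editor's toy illustration of the decrement-and-release step. */
#include <stdio.h>

int
main(void)
{
    /* items 1 and 2 each depend only on item 0 */
    int         depCount[3] = {0, 1, 1};
    int         revDeps0[] = {1, 2};    /* items that depend on item 0 */

    /* pretend item 0 just finished restoring */
    for (int i = 0; i < 2; i++)
    {
        int         other = revDeps0[i];

        if (--depCount[other] == 0)
            printf("item %d has no remaining dependencies; move to ready_heap\n",
                   other);
    }
    return 0;
}
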
4686 /*
4687  * Set the created flag on the DATA member corresponding to the given
4688  * TABLE member
4689  */
4690 static void
4691 mark_create_done(ArchiveHandle *AH, TocEntry *te)
4692 {
4693  if (AH->tableDataId[te->dumpId] != 0)
4694  {
4695  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4696 
4697  ted->created = true;
4698  }
4699 }
4700 
4701 /*
4702  * Mark the DATA member corresponding to the given TABLE member
4703  * as not wanted
4704  */
4705 static void
4706 inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4707 {
4708  pg_log_info("table \"%s\" could not be created, will not restore its data",
4709  te->tag);
4710 
4711  if (AH->tableDataId[te->dumpId] != 0)
4712  {
4713  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4714 
4715  ted->reqs = 0;
4716  }
4717 }
4718 
4719 /*
4720  * Clone and de-clone routines used in parallel restoration.
4721  *
4722  * Enough of the structure is cloned to ensure that there is no
4723  * conflict between different threads each with their own clone.
4724  */
4725 ArchiveHandle *
4726 CloneArchive(ArchiveHandle *AH)
4727 {
4728  ArchiveHandle *clone;
4729 
4730  /* Make a "flat" copy */
4731  clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4732  memcpy(clone, AH, sizeof(ArchiveHandle));
4733 
4734  /* Handle format-independent fields */
4735  memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4736 
4737  /* The clone will have its own connection, so disregard connection state */
4738  clone->connection = NULL;
4739  clone->connCancel = NULL;
4740  clone->currUser = NULL;
4741  clone->currSchema = NULL;
4742  clone->currTableAm = NULL;
4743  clone->currTablespace = NULL;
4744 
4745  /* savedPassword must be local in case we change it while connecting */
4746  if (clone->savedPassword)
4747  clone->savedPassword = pg_strdup(clone->savedPassword);
4748 
4749  /* clone has its own error count, too */
4750  clone->public.n_errors = 0;
4751 
4752  /*
4753  * Connect our new clone object to the database, using the same connection
4754  * parameters used for the original connection.
4755  */
4756  ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
4757 
4758  /* re-establish fixed state */
4759  if (AH->mode == archModeRead)
4760  _doSetFixedOutputState(clone);
4761  /* in write case, setupDumpWorker will fix up connection state */
4762 
4763  /* Let the format-specific code have a chance too */
4764  clone->ClonePtr(clone);
4765 
4766  Assert(clone->connection != NULL);
4767  return clone;
4768 }
4769 
4770 /*
4771  * Release clone-local storage.
4772  *
4773  * Note: we assume any clone-local connection was already closed.
4774  */
4775 void
4776 DeCloneArchive(ArchiveHandle *AH)
4777 {
4778  /* Should not have an open database connection */
4779  Assert(AH->connection == NULL);
4780 
4781  /* Clear format-specific state */
4782  AH->DeClonePtr(AH);
4783 
4784  /* Clear state allocated by CloneArchive */
4785  if (AH->sqlparse.curCmd)
4786  destroyPQExpBuffer(AH->sqlparse.curCmd);
4787 
4788  /* Clear any connection-local state */
4789  free(AH->currUser);
4790  free(AH->currSchema);
4791  free(AH->currTablespace);
4792  free(AH->currTableAm);
4793  free(AH->savedPassword);
4794 
4795  free(AH);
4796 }
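
Taken together, the intended lifecycle is that each parallel worker clones the leader's handle, acquires its own connection through the clone, runs its dispatched jobs, and finally disconnects and de-clones. The outline below is a hypothetical sketch of that flow (the real worker loop lives in parallel.c); worker_lifecycle_sketch and its body are illustrative only, not code from this file.

/* Hypothetical outline of a worker's use of the clone API (illustrative only). */
static void
worker_lifecycle_sketch(ArchiveHandle *leaderAH)
{
    ArchiveHandle *clone = CloneArchive(leaderAH);  /* opens its own connection */

    /*
     * ... receive dispatched TOC entries from the leader and run each one
     * with parallel_restore(clone, te) ...
     */

    DisconnectDatabase(&clone->public); /* close the clone's connection */
    DeCloneArchive(clone);              /* release clone-local state */
}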