PostgreSQL Source Code  git master
pg_backup_archiver.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_backup_archiver.c
4  *
5  * Private implementation of the archiver routines.
6  *
7  * See the headers to pg_restore for more details.
8  *
9  * Copyright (c) 2000, Philip Warner
10  * Rights are granted to use this software in any way so long
11  * as this notice is not removed.
12  *
13  * The author is not responsible for loss or damages that may
14  * result from its use.
15  *
16  *
17  * IDENTIFICATION
18  * src/bin/pg_dump/pg_backup_archiver.c
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres_fe.h"
23 
24 #include <ctype.h>
25 #include <fcntl.h>
26 #include <unistd.h>
27 #include <sys/stat.h>
28 #include <sys/wait.h>
29 #ifdef WIN32
30 #include <io.h>
31 #endif
32 
33 #include "catalog/pg_class_d.h"
34 #include "common/string.h"
35 #include "compress_io.h"
36 #include "dumputils.h"
37 #include "fe_utils/string_utils.h"
38 #include "lib/binaryheap.h"
39 #include "lib/stringinfo.h"
40 #include "libpq/libpq-fs.h"
41 #include "parallel.h"
42 #include "pg_backup_archiver.h"
43 #include "pg_backup_db.h"
44 #include "pg_backup_utils.h"
45 
46 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 
49 
50 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
51  const pg_compress_specification compression_spec,
52  bool dosync, ArchiveMode mode,
53  SetupWorkerPtrType setupWorkerPtr,
55 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
56 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
57 static char *sanitize_line(const char *str, bool want_hyphen);
58 static void _doSetFixedOutputState(ArchiveHandle *AH);
59 static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
60 static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
61 static void _becomeUser(ArchiveHandle *AH, const char *user);
62 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
63 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
64 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
65 static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
67  TocEntry *te);
68 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
69 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
70 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
71 static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
73 static bool _tocEntryIsACL(TocEntry *te);
76 static bool is_load_via_partition_root(TocEntry *te);
77 static void buildTocEntryArrays(ArchiveHandle *AH);
78 static void _moveBefore(TocEntry *pos, TocEntry *te);
80 
81 static int RestoringToDB(ArchiveHandle *AH);
82 static void dump_lo_buf(ArchiveHandle *AH);
83 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
84 static void SetOutput(ArchiveHandle *AH, const char *filename,
85  const pg_compress_specification compression_spec);
87 static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
88 
89 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
91  TocEntry *pending_list);
93  ParallelState *pstate,
94  TocEntry *pending_list);
96  TocEntry *pending_list);
97 static void pending_list_header_init(TocEntry *l);
98 static void pending_list_append(TocEntry *l, TocEntry *te);
99 static void pending_list_remove(TocEntry *te);
100 static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
101 static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
102 static void move_to_ready_heap(TocEntry *pending_list,
103  binaryheap *ready_heap,
104  RestorePass pass);
105 static TocEntry *pop_next_work_item(binaryheap *ready_heap,
106  ParallelState *pstate);
107 static void mark_dump_job_done(ArchiveHandle *AH,
108  TocEntry *te,
109  int status,
110  void *callback_data);
111 static void mark_restore_job_done(ArchiveHandle *AH,
112  TocEntry *te,
113  int status,
114  void *callback_data);
115 static void fix_dependencies(ArchiveHandle *AH);
116 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
119 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
120  binaryheap *ready_heap);
121 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
123 
124 static void StrictNamesCheck(RestoreOptions *ropt);
125 
126 
127 /*
128  * Allocate a new DumpOptions block containing all default values.
129  */
130 DumpOptions *
132 {
134 
136  return opts;
137 }
138 
/*
 * Initialize a DumpOptions struct to all default values.
 *
 * opts: caller-supplied DumpOptions block, reset in place.  Everything is
 * zeroed first, then the handful of fields whose default is not zero are
 * set explicitly.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as InitDumpOptions(DumpOptions *opts) — confirm against
 * pg_backup.h.
 */
void
InitDumpOptions(DumpOptions *opts)
{
	memset(opts, 0, sizeof(DumpOptions));
	/* set any fields that shouldn't default to zeroes */
	opts->include_everything = true;
	opts->cparams.promptPassword = TRI_DEFAULT;
	opts->dumpSections = DUMP_UNSECTIONED;
}
151 
152 /*
153  * Create a freshly allocated DumpOptions with options equivalent to those
154  * found in the given RestoreOptions.
155  */
156 DumpOptions *
158 {
159  DumpOptions *dopt = NewDumpOptions();
160 
161  /* this is the inverse of what's at the end of pg_dump.c's main() */
162  dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
163  dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
164  dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
165  dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
167  dopt->outputClean = ropt->dropSchema;
168  dopt->dataOnly = ropt->dataOnly;
169  dopt->schemaOnly = ropt->schemaOnly;
170  dopt->if_exists = ropt->if_exists;
171  dopt->column_inserts = ropt->column_inserts;
172  dopt->dumpSections = ropt->dumpSections;
173  dopt->aclsSkip = ropt->aclsSkip;
174  dopt->outputSuperuser = ropt->superuser;
175  dopt->outputCreateDB = ropt->createDB;
176  dopt->outputNoOwner = ropt->noOwner;
177  dopt->outputNoTableAm = ropt->noTableAm;
178  dopt->outputNoTablespaces = ropt->noTablespace;
179  dopt->disable_triggers = ropt->disable_triggers;
180  dopt->use_setsessauth = ropt->use_setsessauth;
182  dopt->dump_inserts = ropt->dump_inserts;
183  dopt->no_comments = ropt->no_comments;
184  dopt->no_publications = ropt->no_publications;
186  dopt->no_subscriptions = ropt->no_subscriptions;
187  dopt->lockWaitTimeout = ropt->lockWaitTimeout;
190  dopt->sequence_data = ropt->sequence_data;
191 
192  return dopt;
193 }
194 
195 
196 /*
197  * Wrapper functions.
198  *
199  * The objective is to make writing new formats and dumpers as simple
200  * as possible, if necessary at the expense of extra function calls etc.
201  *
202  */
203 
/*
 * The dump worker setup needs lots of knowledge of the internals of pg_dump,
 * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
 * setup doesn't need to know anything much, so it's defined here.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as setupRestoreWorker(Archive *AHX) to match the SetupWorkerPtrType
 * callback convention — verify against upstream.
 */
static void
setupRestoreWorker(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	/* give this worker its own handle on the archive file */
	AH->ReopenPtr(AH);
}
216 
217 
218 /* Create a new archive */
219 /* Public */
220 Archive *
221 CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
222  const pg_compress_specification compression_spec,
223  bool dosync, ArchiveMode mode,
226 
227 {
228  ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
230 
231  return (Archive *) AH;
232 }
233 
234 /* Open an existing archive */
235 /* Public */
236 Archive *
237 OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
238 {
239  ArchiveHandle *AH;
240  pg_compress_specification compression_spec = {0};
241 
242  compression_spec.algorithm = PG_COMPRESSION_NONE;
243  AH = _allocAH(FileSpec, fmt, compression_spec, true,
246 
247  return (Archive *) AH;
248 }
249 
/* Public */
/*
 * Close an archive, letting the format module release its private state,
 * then close the output file.  Fatal error if the output cannot be closed
 * cleanly (%m reports the errno from the failed close).
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as CloseArchive(Archive *AHX) — the parameter name AHX is
 * grounded by the cast in the body; confirm against pg_backup.h.
 */
void
CloseArchive(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;

	/* format-specific teardown */
	AH->ClosePtr(AH);

	/* Close the output */
	errno = 0;
	if (!EndCompressFileHandle(AH->OF))
		pg_fatal("could not close output file: %m");
}
263 
/* Public */
/*
 * Attach dump and/or restore options to the archive handle for later access.
 * If dump options are omitted but restore options are supplied, a DumpOptions
 * block is synthesized from the RestoreOptions.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as SetArchiveOptions(Archive *AH, DumpOptions *dopt,
 * RestoreOptions *ropt) — parameter names are grounded by the body; confirm
 * against pg_backup.h.
 */
void
SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
{
	/* Caller can omit dump options, in which case we synthesize them */
	if (dopt == NULL && ropt != NULL)
		dopt = dumpOptionsFromRestoreOptions(ropt);

	/* Save options for later access */
	AH->dopt = dopt;
	AH->ropt = ropt;
}
276 
/* Public */
/*
 * Walk the TOC and decide which entries will be dumped/restored under the
 * current restore options, recording the decision in each entry's reqs
 * field.  While writing an archive, also sanity-check that entries appear
 * in section order (PRE_DATA, DATA, POST_DATA).  Finally, enforce
 * --strict-names checking if requested.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as ProcessArchiveRestoreOptions(Archive *AHX) — the parameter
 * name AHX is grounded by the cast in the body; confirm against pg_backup.h.
 */
void
ProcessArchiveRestoreOptions(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	TocEntry   *te;
	teSection	curSection;

	/* Decide which TOC entries will be dumped/restored, and mark them */
	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/*
		 * When writing an archive, we also take this opportunity to check
		 * that we have generated the entries in a sane order that respects
		 * the section divisions.  When reading, don't complain, since buggy
		 * old versions of pg_dump might generate out-of-order archives.
		 */
		if (AH->mode != archModeRead)
		{
			switch (te->section)
			{
				case SECTION_NONE:
					/* ok to be anywhere */
					break;
				case SECTION_PRE_DATA:
					if (curSection != SECTION_PRE_DATA)
						pg_log_warning("archive items not in correct section order");
					break;
				case SECTION_DATA:
					/* DATA may follow PRE_DATA or DATA, but not POST_DATA */
					if (curSection == SECTION_POST_DATA)
						pg_log_warning("archive items not in correct section order");
					break;
				case SECTION_POST_DATA:
					/* ok no matter which section we were in */
					break;
				default:
					pg_fatal("unexpected section code %d",
							 (int) te->section);
					break;
			}
		}

		/* SECTION_NONE entries don't advance the current section */
		if (te->section != SECTION_NONE)
			curSection = te->section;

		te->reqs = _tocEntryRequired(te, curSection, AH);
	}

	/* Enforce strict names checking */
	if (ropt->strict_names)
		StrictNamesCheck(ropt);
}
331 
332 /* Public */
333 void
335 {
336  ArchiveHandle *AH = (ArchiveHandle *) AHX;
337  RestoreOptions *ropt = AH->public.ropt;
338  bool parallel_mode;
339  TocEntry *te;
340  CompressFileHandle *sav;
341 
343 
344  /*
345  * If we're going to do parallel restore, there are some restrictions.
346  */
347  parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
348  if (parallel_mode)
349  {
350  /* We haven't got round to making this work for all archive formats */
351  if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
352  pg_fatal("parallel restore is not supported with this archive file format");
353 
354  /* Doesn't work if the archive represents dependencies as OIDs */
355  if (AH->version < K_VERS_1_8)
356  pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
357 
358  /*
359  * It's also not gonna work if we can't reopen the input file, so
360  * let's try that immediately.
361  */
362  AH->ReopenPtr(AH);
363  }
364 
365  /*
366  * Make sure we won't need (de)compression we haven't got
367  */
368  if (AH->PrintTocDataPtr != NULL)
369  {
370  for (te = AH->toc->next; te != AH->toc; te = te->next)
371  {
372  if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
373  {
375 
376  if (errmsg)
377  pg_fatal("cannot restore from compressed archive (%s)",
378  errmsg);
379  else
380  break;
381  }
382  }
383  }
384 
385  /*
386  * Prepare index arrays, so we can assume we have them throughout restore.
387  * It's possible we already did this, though.
388  */
389  if (AH->tocsByDumpId == NULL)
391 
392  /*
393  * If we're using a DB connection, then connect it.
394  */
395  if (ropt->useDB)
396  {
397  pg_log_info("connecting to database for restore");
398  if (AH->version < K_VERS_1_3)
399  pg_fatal("direct database connections are not supported in pre-1.3 archives");
400 
401  /*
402  * We don't want to guess at whether the dump will successfully
403  * restore; allow the attempt regardless of the version of the restore
404  * target.
405  */
406  AHX->minRemoteVersion = 0;
407  AHX->maxRemoteVersion = 9999999;
408 
409  ConnectDatabase(AHX, &ropt->cparams, false);
410 
411  /*
412  * If we're talking to the DB directly, don't send comments since they
413  * obscure SQL when displaying errors
414  */
415  AH->noTocComments = 1;
416  }
417 
418  /*
419  * Work out if we have an implied data-only restore. This can happen if
420  * the dump was data only or if the user has used a toc list to exclude
421  * all of the schema data. All we do is look for schema entries - if none
422  * are found then we set the dataOnly flag.
423  *
424  * We could scan for wanted TABLE entries, but that is not the same as
425  * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
426  */
427  if (!ropt->dataOnly)
428  {
429  int impliedDataOnly = 1;
430 
431  for (te = AH->toc->next; te != AH->toc; te = te->next)
432  {
433  if ((te->reqs & REQ_SCHEMA) != 0)
434  { /* It's schema, and it's wanted */
435  impliedDataOnly = 0;
436  break;
437  }
438  }
439  if (impliedDataOnly)
440  {
441  ropt->dataOnly = impliedDataOnly;
442  pg_log_info("implied data-only restore");
443  }
444  }
445 
446  /*
447  * Setup the output file if necessary.
448  */
449  sav = SaveOutput(AH);
451  SetOutput(AH, ropt->filename, ropt->compression_spec);
452 
453  ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
454 
455  if (AH->archiveRemoteVersion)
456  ahprintf(AH, "-- Dumped from database version %s\n",
458  if (AH->archiveDumpVersion)
459  ahprintf(AH, "-- Dumped by pg_dump version %s\n",
460  AH->archiveDumpVersion);
461 
462  ahprintf(AH, "\n");
463 
464  if (AH->public.verbose)
465  dumpTimestamp(AH, "Started on", AH->createDate);
466 
467  if (ropt->single_txn)
468  {
469  if (AH->connection)
470  StartTransaction(AHX);
471  else
472  ahprintf(AH, "BEGIN;\n\n");
473  }
474 
475  /*
476  * Establish important parameter values right away.
477  */
479 
480  AH->stage = STAGE_PROCESSING;
481 
482  /*
483  * Drop the items at the start, in reverse order
484  */
485  if (ropt->dropSchema)
486  {
487  for (te = AH->toc->prev; te != AH->toc; te = te->prev)
488  {
489  AH->currentTE = te;
490 
491  /*
492  * In createDB mode, issue a DROP *only* for the database as a
493  * whole. Issuing drops against anything else would be wrong,
494  * because at this point we're connected to the wrong database.
495  * (The DATABASE PROPERTIES entry, if any, should be treated like
496  * the DATABASE entry.)
497  */
498  if (ropt->createDB)
499  {
500  if (strcmp(te->desc, "DATABASE") != 0 &&
501  strcmp(te->desc, "DATABASE PROPERTIES") != 0)
502  continue;
503  }
504 
505  /* Otherwise, drop anything that's selected and has a dropStmt */
506  if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
507  {
508  bool not_allowed_in_txn = false;
509 
510  pg_log_info("dropping %s %s", te->desc, te->tag);
511 
512  /*
513  * In --transaction-size mode, we have to temporarily exit our
514  * transaction block to drop objects that can't be dropped
515  * within a transaction.
516  */
517  if (ropt->txn_size > 0)
518  {
519  if (strcmp(te->desc, "DATABASE") == 0 ||
520  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
521  {
522  not_allowed_in_txn = true;
523  if (AH->connection)
524  CommitTransaction(AHX);
525  else
526  ahprintf(AH, "COMMIT;\n");
527  }
528  }
529 
530  /* Select owner and schema as necessary */
531  _becomeOwner(AH, te);
532  _selectOutputSchema(AH, te->namespace);
533 
534  /*
535  * Now emit the DROP command, if the object has one. Note we
536  * don't necessarily emit it verbatim; at this point we add an
537  * appropriate IF EXISTS clause, if the user requested it.
538  */
539  if (strcmp(te->desc, "BLOB METADATA") == 0)
540  {
541  /* We must generate the per-blob commands */
542  if (ropt->if_exists)
543  IssueCommandPerBlob(AH, te,
544  "SELECT pg_catalog.lo_unlink(oid) "
545  "FROM pg_catalog.pg_largeobject_metadata "
546  "WHERE oid = '", "'");
547  else
548  IssueCommandPerBlob(AH, te,
549  "SELECT pg_catalog.lo_unlink('",
550  "')");
551  }
552  else if (*te->dropStmt != '\0')
553  {
554  if (!ropt->if_exists ||
555  strncmp(te->dropStmt, "--", 2) == 0)
556  {
557  /*
558  * Without --if-exists, or if it's just a comment (as
559  * happens for the public schema), print the dropStmt
560  * as-is.
561  */
562  ahprintf(AH, "%s", te->dropStmt);
563  }
564  else
565  {
566  /*
567  * Inject an appropriate spelling of "if exists". For
568  * old-style large objects, we have a routine that
569  * knows how to do it, without depending on
570  * te->dropStmt; use that. For other objects we need
571  * to parse the command.
572  */
573  if (strcmp(te->desc, "BLOB") == 0)
574  {
575  DropLOIfExists(AH, te->catalogId.oid);
576  }
577  else
578  {
579  char *dropStmt = pg_strdup(te->dropStmt);
580  char *dropStmtOrig = dropStmt;
581  PQExpBuffer ftStmt = createPQExpBuffer();
582 
583  /*
584  * Need to inject IF EXISTS clause after ALTER
585  * TABLE part in ALTER TABLE .. DROP statement
586  */
587  if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
588  {
589  appendPQExpBufferStr(ftStmt,
590  "ALTER TABLE IF EXISTS");
591  dropStmt = dropStmt + 11;
592  }
593 
594  /*
595  * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
596  * not support the IF EXISTS clause, and therefore
597  * we simply emit the original command for DEFAULT
598  * objects (modulo the adjustment made above).
599  *
600  * Likewise, don't mess with DATABASE PROPERTIES.
601  *
602  * If we used CREATE OR REPLACE VIEW as a means of
603  * quasi-dropping an ON SELECT rule, that should
604  * be emitted unchanged as well.
605  *
606  * For other object types, we need to extract the
607  * first part of the DROP which includes the
608  * object type. Most of the time this matches
609  * te->desc, so search for that; however for the
610  * different kinds of CONSTRAINTs, we know to
611  * search for hardcoded "DROP CONSTRAINT" instead.
612  */
613  if (strcmp(te->desc, "DEFAULT") == 0 ||
614  strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
615  strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
616  appendPQExpBufferStr(ftStmt, dropStmt);
617  else
618  {
619  char buffer[40];
620  char *mark;
621 
622  if (strcmp(te->desc, "CONSTRAINT") == 0 ||
623  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
624  strcmp(te->desc, "FK CONSTRAINT") == 0)
625  strcpy(buffer, "DROP CONSTRAINT");
626  else
627  snprintf(buffer, sizeof(buffer), "DROP %s",
628  te->desc);
629 
630  mark = strstr(dropStmt, buffer);
631 
632  if (mark)
633  {
634  *mark = '\0';
635  appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
636  dropStmt, buffer,
637  mark + strlen(buffer));
638  }
639  else
640  {
641  /* complain and emit unmodified command */
642  pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
643  dropStmtOrig);
644  appendPQExpBufferStr(ftStmt, dropStmt);
645  }
646  }
647 
648  ahprintf(AH, "%s", ftStmt->data);
649 
650  destroyPQExpBuffer(ftStmt);
651  pg_free(dropStmtOrig);
652  }
653  }
654  }
655 
656  /*
657  * In --transaction-size mode, re-establish the transaction
658  * block if needed; otherwise, commit after every N drops.
659  */
660  if (ropt->txn_size > 0)
661  {
662  if (not_allowed_in_txn)
663  {
664  if (AH->connection)
665  StartTransaction(AHX);
666  else
667  ahprintf(AH, "BEGIN;\n");
668  AH->txnCount = 0;
669  }
670  else if (++AH->txnCount >= ropt->txn_size)
671  {
672  if (AH->connection)
673  {
674  CommitTransaction(AHX);
675  StartTransaction(AHX);
676  }
677  else
678  ahprintf(AH, "COMMIT;\nBEGIN;\n");
679  AH->txnCount = 0;
680  }
681  }
682  }
683  }
684 
685  /*
686  * _selectOutputSchema may have set currSchema to reflect the effect
687  * of a "SET search_path" command it emitted. However, by now we may
688  * have dropped that schema; or it might not have existed in the first
689  * place. In either case the effective value of search_path will not
690  * be what we think. Forcibly reset currSchema so that we will
691  * re-establish the search_path setting when needed (after creating
692  * the schema).
693  *
694  * If we treated users as pg_dump'able objects then we'd need to reset
695  * currUser here too.
696  */
697  free(AH->currSchema);
698  AH->currSchema = NULL;
699  }
700 
701  if (parallel_mode)
702  {
703  /*
704  * In parallel mode, turn control over to the parallel-restore logic.
705  */
706  ParallelState *pstate;
707  TocEntry pending_list;
708 
709  /* The archive format module may need some setup for this */
710  if (AH->PrepParallelRestorePtr)
711  AH->PrepParallelRestorePtr(AH);
712 
713  pending_list_header_init(&pending_list);
714 
715  /* This runs PRE_DATA items and then disconnects from the database */
716  restore_toc_entries_prefork(AH, &pending_list);
717  Assert(AH->connection == NULL);
718 
719  /* ParallelBackupStart() will actually fork the processes */
720  pstate = ParallelBackupStart(AH);
721  restore_toc_entries_parallel(AH, pstate, &pending_list);
722  ParallelBackupEnd(AH, pstate);
723 
724  /* reconnect the leader and see if we missed something */
725  restore_toc_entries_postfork(AH, &pending_list);
726  Assert(AH->connection != NULL);
727  }
728  else
729  {
730  /*
731  * In serial mode, process everything in three phases: normal items,
732  * then ACLs, then post-ACL items. We might be able to skip one or
733  * both extra phases in some cases, eg data-only restores.
734  */
735  bool haveACL = false;
736  bool havePostACL = false;
737 
738  for (te = AH->toc->next; te != AH->toc; te = te->next)
739  {
740  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
741  continue; /* ignore if not to be dumped at all */
742 
743  switch (_tocEntryRestorePass(te))
744  {
745  case RESTORE_PASS_MAIN:
746  (void) restore_toc_entry(AH, te, false);
747  break;
748  case RESTORE_PASS_ACL:
749  haveACL = true;
750  break;
752  havePostACL = true;
753  break;
754  }
755  }
756 
757  if (haveACL)
758  {
759  for (te = AH->toc->next; te != AH->toc; te = te->next)
760  {
761  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
763  (void) restore_toc_entry(AH, te, false);
764  }
765  }
766 
767  if (havePostACL)
768  {
769  for (te = AH->toc->next; te != AH->toc; te = te->next)
770  {
771  if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
773  (void) restore_toc_entry(AH, te, false);
774  }
775  }
776  }
777 
778  /*
779  * Close out any persistent transaction we may have. While these two
780  * cases are started in different places, we can end both cases here.
781  */
782  if (ropt->single_txn || ropt->txn_size > 0)
783  {
784  if (AH->connection)
785  CommitTransaction(AHX);
786  else
787  ahprintf(AH, "COMMIT;\n\n");
788  }
789 
790  if (AH->public.verbose)
791  dumpTimestamp(AH, "Completed on", time(NULL));
792 
793  ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
794 
795  /*
796  * Clean up & we're done.
797  */
798  AH->stage = STAGE_FINALIZING;
799 
801  RestoreOutput(AH, sav);
802 
803  if (ropt->useDB)
805 }
806 
807 /*
808  * Restore a single TOC item. Used in both parallel and non-parallel restore;
809  * is_parallel is true if we are in a worker child process.
810  *
811  * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
812  * the parallel parent has to make the corresponding status update.
813  */
814 static int
815 restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
816 {
817  RestoreOptions *ropt = AH->public.ropt;
818  int status = WORKER_OK;
819  int reqs;
820  bool defnDumped;
821 
822  AH->currentTE = te;
823 
824  /* Dump any relevant dump warnings to stderr */
825  if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
826  {
827  if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
828  pg_log_warning("warning from original dump file: %s", te->defn);
829  else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
830  pg_log_warning("warning from original dump file: %s", te->copyStmt);
831  }
832 
833  /* Work out what, if anything, we want from this entry */
834  reqs = te->reqs;
835 
836  defnDumped = false;
837 
838  /*
839  * If it has a schema component that we want, then process that
840  */
841  if ((reqs & REQ_SCHEMA) != 0)
842  {
843  bool object_is_db = false;
844 
845  /*
846  * In --transaction-size mode, must exit our transaction block to
847  * create a database or set its properties.
848  */
849  if (strcmp(te->desc, "DATABASE") == 0 ||
850  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
851  {
852  object_is_db = true;
853  if (ropt->txn_size > 0)
854  {
855  if (AH->connection)
857  else
858  ahprintf(AH, "COMMIT;\n\n");
859  }
860  }
861 
862  /* Show namespace in log message if available */
863  if (te->namespace)
864  pg_log_info("creating %s \"%s.%s\"",
865  te->desc, te->namespace, te->tag);
866  else
867  pg_log_info("creating %s \"%s\"",
868  te->desc, te->tag);
869 
870  _printTocEntry(AH, te, false);
871  defnDumped = true;
872 
873  if (strcmp(te->desc, "TABLE") == 0)
874  {
875  if (AH->lastErrorTE == te)
876  {
877  /*
878  * We failed to create the table. If
879  * --no-data-for-failed-tables was given, mark the
880  * corresponding TABLE DATA to be ignored.
881  *
882  * In the parallel case this must be done in the parent, so we
883  * just set the return value.
884  */
885  if (ropt->noDataForFailedTables)
886  {
887  if (is_parallel)
888  status = WORKER_INHIBIT_DATA;
889  else
891  }
892  }
893  else
894  {
895  /*
896  * We created the table successfully. Mark the corresponding
897  * TABLE DATA for possible truncation.
898  *
899  * In the parallel case this must be done in the parent, so we
900  * just set the return value.
901  */
902  if (is_parallel)
903  status = WORKER_CREATE_DONE;
904  else
905  mark_create_done(AH, te);
906  }
907  }
908 
909  /*
910  * If we created a DB, connect to it. Also, if we changed DB
911  * properties, reconnect to ensure that relevant GUC settings are
912  * applied to our session. (That also restarts the transaction block
913  * in --transaction-size mode.)
914  */
915  if (object_is_db)
916  {
917  pg_log_info("connecting to new database \"%s\"", te->tag);
918  _reconnectToDB(AH, te->tag);
919  }
920  }
921 
922  /*
923  * If it has a data component that we want, then process that
924  */
925  if ((reqs & REQ_DATA) != 0)
926  {
927  /*
928  * hadDumper will be set if there is genuine data component for this
929  * node. Otherwise, we need to check the defn field for statements
930  * that need to be executed in data-only restores.
931  */
932  if (te->hadDumper)
933  {
934  /*
935  * If we can output the data, then restore it.
936  */
937  if (AH->PrintTocDataPtr != NULL)
938  {
939  _printTocEntry(AH, te, true);
940 
941  if (strcmp(te->desc, "BLOBS") == 0 ||
942  strcmp(te->desc, "BLOB COMMENTS") == 0)
943  {
944  pg_log_info("processing %s", te->desc);
945 
946  _selectOutputSchema(AH, "pg_catalog");
947 
948  /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
949  if (strcmp(te->desc, "BLOB COMMENTS") == 0)
951 
952  AH->PrintTocDataPtr(AH, te);
953 
955  }
956  else
957  {
958  bool use_truncate;
959 
961 
962  /* Select owner and schema as necessary */
963  _becomeOwner(AH, te);
964  _selectOutputSchema(AH, te->namespace);
965 
966  pg_log_info("processing data for table \"%s.%s\"",
967  te->namespace, te->tag);
968 
969  /*
970  * In parallel restore, if we created the table earlier in
971  * this run (so that we know it is empty) and we are not
972  * restoring a load-via-partition-root data item then we
973  * wrap the COPY in a transaction and precede it with a
974  * TRUNCATE. If wal_level is set to minimal this prevents
975  * WAL-logging the COPY. This obtains a speedup similar
976  * to that from using single_txn mode in non-parallel
977  * restores.
978  *
979  * We mustn't do this for load-via-partition-root cases
980  * because some data might get moved across partition
981  * boundaries, risking deadlock and/or loss of previously
982  * loaded data. (We assume that all partitions of a
983  * partitioned table will be treated the same way.)
984  */
985  use_truncate = is_parallel && te->created &&
987 
988  if (use_truncate)
989  {
990  /*
991  * Parallel restore is always talking directly to a
992  * server, so no need to see if we should issue BEGIN.
993  */
994  StartTransaction(&AH->public);
995 
996  /*
997  * Issue TRUNCATE with ONLY so that child tables are
998  * not wiped.
999  */
1000  ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1001  fmtQualifiedId(te->namespace, te->tag));
1002  }
1003 
1004  /*
1005  * If we have a copy statement, use it.
1006  */
1007  if (te->copyStmt && strlen(te->copyStmt) > 0)
1008  {
1009  ahprintf(AH, "%s", te->copyStmt);
1011  }
1012  else
1014 
1015  AH->PrintTocDataPtr(AH, te);
1016 
1017  /*
1018  * Terminate COPY if needed.
1019  */
1020  if (AH->outputKind == OUTPUT_COPYDATA &&
1021  RestoringToDB(AH))
1022  EndDBCopyMode(&AH->public, te->tag);
1023  AH->outputKind = OUTPUT_SQLCMDS;
1024 
1025  /* close out the transaction started above */
1026  if (use_truncate)
1027  CommitTransaction(&AH->public);
1028 
1030  }
1031  }
1032  }
1033  else if (!defnDumped)
1034  {
1035  /* If we haven't already dumped the defn part, do so now */
1036  pg_log_info("executing %s %s", te->desc, te->tag);
1037  _printTocEntry(AH, te, false);
1038  }
1039  }
1040 
1041  /*
1042  * If we emitted anything for this TOC entry, that counts as one action
1043  * against the transaction-size limit. Commit if it's time to.
1044  */
1045  if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
1046  {
1047  if (++AH->txnCount >= ropt->txn_size)
1048  {
1049  if (AH->connection)
1050  {
1051  CommitTransaction(&AH->public);
1052  StartTransaction(&AH->public);
1053  }
1054  else
1055  ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1056  AH->txnCount = 0;
1057  }
1058  }
1059 
1060  if (AH->public.n_errors > 0 && status == WORKER_OK)
1061  status = WORKER_IGNORED_ERRORS;
1062 
1063  return status;
1064 }
1065 
1066 /*
1067  * Allocate a new RestoreOptions block.
1068  * This is mainly so we can initialize it, but also for future expansion,
1069  */
1072 {
1074 
1076 
1077  /* set any fields that shouldn't default to zeroes */
1078  opts->format = archUnknown;
1079  opts->cparams.promptPassword = TRI_DEFAULT;
1080  opts->dumpSections = DUMP_UNSECTIONED;
1081  opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1082  opts->compression_spec.level = 0;
1083 
1084  return opts;
1085 }
1086 
/*
 * Emit commands to disable all triggers on the table targeted by a data TOC
 * item.  Only used for a data-only restore with --disable-triggers; a no-op
 * otherwise.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
 * to mirror the enable counterpart below — verify against upstream.
 */
static void
_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
{
	RestoreOptions *ropt = AH->public.ropt;

	/* This hack is only needed in a data-only restore */
	if (!ropt->dataOnly || !ropt->disable_triggers)
		return;

	pg_log_info("disabling triggers for %s", te->tag);

	/*
	 * Become superuser if possible, since they are the only ones who can
	 * disable constraint triggers.  If -S was not given, assume the initial
	 * user identity is a superuser.  (XXX would it be better to become the
	 * table owner?)
	 */
	_becomeUser(AH, ropt->superuser);

	/*
	 * Disable them.
	 */
	ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
			 fmtQualifiedId(te->namespace, te->tag));
}
1112 
/*
 * Emit commands to re-enable all triggers on the table targeted by a data
 * TOC item, undoing _disableTriggersIfNecessary.  Only used for a data-only
 * restore with --disable-triggers; a no-op otherwise.
 *
 * NOTE(review): the signature line was lost in this copy of the file;
 * restored as _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
 * per the partial prototype visible above — verify against upstream.
 */
static void
_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
{
	RestoreOptions *ropt = AH->public.ropt;

	/* This hack is only needed in a data-only restore */
	if (!ropt->dataOnly || !ropt->disable_triggers)
		return;

	pg_log_info("enabling triggers for %s", te->tag);

	/*
	 * Become superuser if possible, since they are the only ones who can
	 * disable constraint triggers.  If -S was not given, assume the initial
	 * user identity is a superuser.  (XXX would it be better to become the
	 * table owner?)
	 */
	_becomeUser(AH, ropt->superuser);

	/*
	 * Enable them.
	 */
	ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
			 fmtQualifiedId(te->namespace, te->tag));
}
1138 
1139 /*
1140  * Detect whether a TABLE DATA TOC item is performing "load via partition
1141  * root", that is the target table is an ancestor partition rather than the
1142  * table the TOC item is nominally for.
1143  *
1144  * In newer archive files this can be detected by checking for a special
1145  * comment placed in te->defn. In older files we have to fall back to seeing
1146  * if the COPY statement targets the named table or some other one. This
1147  * will not work for data dumped as INSERT commands, so we could give a false
1148  * negative in that case; fortunately, that's a rarely-used option.
1149  */
1150 static bool
1152 {
1153  if (te->defn &&
1154  strncmp(te->defn, "-- load via partition root ", 27) == 0)
1155  return true;
1156  if (te->copyStmt && *te->copyStmt)
1157  {
1158  PQExpBuffer copyStmt = createPQExpBuffer();
1159  bool result;
1160 
1161  /*
1162  * Build the initial part of the COPY as it would appear if the
1163  * nominal target table is the actual target. If we see anything
1164  * else, it must be a load-via-partition-root case.
1165  */
1166  appendPQExpBuffer(copyStmt, "COPY %s ",
1167  fmtQualifiedId(te->namespace, te->tag));
1168  result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1169  destroyPQExpBuffer(copyStmt);
1170  return result;
1171  }
1172  /* Assume it's not load-via-partition-root */
1173  return false;
1174 }
1175 
1176 /*
1177  * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1178  */
1179 
1180 /* Public */
1181 void
1182 WriteData(Archive *AHX, const void *data, size_t dLen)
1183 {
1184  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1185 
1186  if (!AH->currToc)
1187  pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1188 
1189  AH->WriteDataPtr(AH, data, dLen);
1190 }
1191 
1192 /*
1193  * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1194  * repository for all metadata. But the name has stuck.
1195  *
1196  * The new entry is added to the Archive's TOC list. Most callers can ignore
1197  * the result value because nothing else need be done, but a few want to
1198  * manipulate the TOC entry further.
1199  */
1200 
1201 /* Public */
1202 TocEntry *
1203 ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1204  ArchiveOpts *opts)
1205 {
1206  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207  TocEntry *newToc;
1208 
1209  newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1210 
1211  AH->tocCount++;
1212  if (dumpId > AH->maxDumpId)
1213  AH->maxDumpId = dumpId;
1214 
1215  newToc->prev = AH->toc->prev;
1216  newToc->next = AH->toc;
1217  AH->toc->prev->next = newToc;
1218  AH->toc->prev = newToc;
1219 
1220  newToc->catalogId = catalogId;
1221  newToc->dumpId = dumpId;
1222  newToc->section = opts->section;
1223 
1224  newToc->tag = pg_strdup(opts->tag);
1225  newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1226  newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1227  newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1228  newToc->relkind = opts->relkind;
1229  newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1230  newToc->desc = pg_strdup(opts->description);
1231  newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1232  newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1233  newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1234 
1235  if (opts->nDeps > 0)
1236  {
1237  newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1238  memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1239  newToc->nDeps = opts->nDeps;
1240  }
1241  else
1242  {
1243  newToc->dependencies = NULL;
1244  newToc->nDeps = 0;
1245  }
1246 
1247  newToc->dataDumper = opts->dumpFn;
1248  newToc->dataDumperArg = opts->dumpArg;
1249  newToc->hadDumper = opts->dumpFn ? true : false;
1250 
1251  newToc->formatData = NULL;
1252  newToc->dataLength = 0;
1253 
1254  if (AH->ArchiveEntryPtr != NULL)
1255  AH->ArchiveEntryPtr(AH, newToc);
1256 
1257  return newToc;
1258 }
1259 
1260 /* Public */
1261 void
1263 {
1264  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1265  RestoreOptions *ropt = AH->public.ropt;
1266  TocEntry *te;
1267  pg_compress_specification out_compression_spec = {0};
1268  teSection curSection;
1269  CompressFileHandle *sav;
1270  const char *fmtName;
1271  char stamp_str[64];
1272 
1273  /* TOC is always uncompressed */
1274  out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1275 
1276  sav = SaveOutput(AH);
1277  if (ropt->filename)
1278  SetOutput(AH, ropt->filename, out_compression_spec);
1279 
1280  if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1281  localtime(&AH->createDate)) == 0)
1282  strcpy(stamp_str, "[unknown]");
1283 
1284  ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1285  ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1286  sanitize_line(AH->archdbname, false),
1287  AH->tocCount,
1289 
1290  switch (AH->format)
1291  {
1292  case archCustom:
1293  fmtName = "CUSTOM";
1294  break;
1295  case archDirectory:
1296  fmtName = "DIRECTORY";
1297  break;
1298  case archTar:
1299  fmtName = "TAR";
1300  break;
1301  default:
1302  fmtName = "UNKNOWN";
1303  }
1304 
1305  ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1307  ahprintf(AH, "; Format: %s\n", fmtName);
1308  ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1309  ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1310  if (AH->archiveRemoteVersion)
1311  ahprintf(AH, "; Dumped from database version: %s\n",
1312  AH->archiveRemoteVersion);
1313  if (AH->archiveDumpVersion)
1314  ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1315  AH->archiveDumpVersion);
1316 
1317  ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1318 
1319  curSection = SECTION_PRE_DATA;
1320  for (te = AH->toc->next; te != AH->toc; te = te->next)
1321  {
1322  if (te->section != SECTION_NONE)
1323  curSection = te->section;
1324  if (ropt->verbose ||
1325  (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1326  {
1327  char *sanitized_name;
1328  char *sanitized_schema;
1329  char *sanitized_owner;
1330 
1331  /*
1332  */
1333  sanitized_name = sanitize_line(te->tag, false);
1334  sanitized_schema = sanitize_line(te->namespace, true);
1335  sanitized_owner = sanitize_line(te->owner, false);
1336 
1337  ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1338  te->catalogId.tableoid, te->catalogId.oid,
1339  te->desc, sanitized_schema, sanitized_name,
1340  sanitized_owner);
1341 
1342  free(sanitized_name);
1343  free(sanitized_schema);
1344  free(sanitized_owner);
1345  }
1346  if (ropt->verbose && te->nDeps > 0)
1347  {
1348  int i;
1349 
1350  ahprintf(AH, ";\tdepends on:");
1351  for (i = 0; i < te->nDeps; i++)
1352  ahprintf(AH, " %d", te->dependencies[i]);
1353  ahprintf(AH, "\n");
1354  }
1355  }
1356 
1357  /* Enforce strict names checking */
1358  if (ropt->strict_names)
1359  StrictNamesCheck(ropt);
1360 
1361  if (ropt->filename)
1362  RestoreOutput(AH, sav);
1363 }
1364 
1365 /***********
1366  * Large Object Archival
1367  ***********/
1368 
1369 /* Called by a dumper to signal start of a LO */
1370 int
1371 StartLO(Archive *AHX, Oid oid)
1372 {
1373  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1374 
1375  if (!AH->StartLOPtr)
1376  pg_fatal("large-object output not supported in chosen format");
1377 
1378  AH->StartLOPtr(AH, AH->currToc, oid);
1379 
1380  return 1;
1381 }
1382 
1383 /* Called by a dumper to signal end of a LO */
1384 int
1385 EndLO(Archive *AHX, Oid oid)
1386 {
1387  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1388 
1389  if (AH->EndLOPtr)
1390  AH->EndLOPtr(AH, AH->currToc, oid);
1391 
1392  return 1;
1393 }
1394 
1395 /**********
1396  * Large Object Restoration
1397  **********/
1398 
1399 /*
1400  * Called by a format handler before a group of LOs is restored
1401  */
1402 void
1404 {
1405  RestoreOptions *ropt = AH->public.ropt;
1406 
1407  /*
1408  * LOs must be restored within a transaction block, since we need the LO
1409  * handle to stay open while we write it. Establish a transaction unless
1410  * there's one being used globally.
1411  */
1412  if (!(ropt->single_txn || ropt->txn_size > 0))
1413  {
1414  if (AH->connection)
1415  StartTransaction(&AH->public);
1416  else
1417  ahprintf(AH, "BEGIN;\n\n");
1418  }
1419 
1420  AH->loCount = 0;
1421 }
1422 
1423 /*
1424  * Called by a format handler after a group of LOs is restored
1425  */
1426 void
1428 {
1429  RestoreOptions *ropt = AH->public.ropt;
1430 
1431  if (!(ropt->single_txn || ropt->txn_size > 0))
1432  {
1433  if (AH->connection)
1434  CommitTransaction(&AH->public);
1435  else
1436  ahprintf(AH, "COMMIT;\n\n");
1437  }
1438 
1439  pg_log_info(ngettext("restored %d large object",
1440  "restored %d large objects",
1441  AH->loCount),
1442  AH->loCount);
1443 }
1444 
1445 
1446 /*
1447  * Called by a format handler to initiate restoration of a LO
1448  */
1449 void
1450 StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1451 {
1452  bool old_lo_style = (AH->version < K_VERS_1_12);
1453  Oid loOid;
1454 
1455  AH->loCount++;
1456 
1457  /* Initialize the LO Buffer */
1458  if (AH->lo_buf == NULL)
1459  {
1460  /* First time through (in this process) so allocate the buffer */
1461  AH->lo_buf_size = LOBBUFSIZE;
1462  AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
1463  }
1464  AH->lo_buf_used = 0;
1465 
1466  pg_log_info("restoring large object with OID %u", oid);
1467 
1468  /* With an old archive we must do drop and create logic here */
1469  if (old_lo_style && drop)
1470  DropLOIfExists(AH, oid);
1471 
1472  if (AH->connection)
1473  {
1474  if (old_lo_style)
1475  {
1476  loOid = lo_create(AH->connection, oid);
1477  if (loOid == 0 || loOid != oid)
1478  pg_fatal("could not create large object %u: %s",
1479  oid, PQerrorMessage(AH->connection));
1480  }
1481  AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1482  if (AH->loFd == -1)
1483  pg_fatal("could not open large object %u: %s",
1484  oid, PQerrorMessage(AH->connection));
1485  }
1486  else
1487  {
1488  if (old_lo_style)
1489  ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1490  oid, INV_WRITE);
1491  else
1492  ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1493  oid, INV_WRITE);
1494  }
1495 
1496  AH->writingLO = true;
1497 }
1498 
1499 void
1501 {
1502  if (AH->lo_buf_used > 0)
1503  {
1504  /* Write remaining bytes from the LO buffer */
1505  dump_lo_buf(AH);
1506  }
1507 
1508  AH->writingLO = false;
1509 
1510  if (AH->connection)
1511  {
1512  lo_close(AH->connection, AH->loFd);
1513  AH->loFd = -1;
1514  }
1515  else
1516  {
1517  ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1518  }
1519 }
1520 
1521 /***********
1522  * Sorting and Reordering
1523  ***********/
1524 
1525 void
1527 {
1528  ArchiveHandle *AH = (ArchiveHandle *) AHX;
1529  RestoreOptions *ropt = AH->public.ropt;
1530  FILE *fh;
1531  StringInfoData linebuf;
1532 
1533  /* Allocate space for the 'wanted' array, and init it */
1534  ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1535 
1536  /* Setup the file */
1537  fh = fopen(ropt->tocFile, PG_BINARY_R);
1538  if (!fh)
1539  pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1540 
1541  initStringInfo(&linebuf);
1542 
1543  while (pg_get_line_buf(fh, &linebuf))
1544  {
1545  char *cmnt;
1546  char *endptr;
1547  DumpId id;
1548  TocEntry *te;
1549 
1550  /* Truncate line at comment, if any */
1551  cmnt = strchr(linebuf.data, ';');
1552  if (cmnt != NULL)
1553  {
1554  cmnt[0] = '\0';
1555  linebuf.len = cmnt - linebuf.data;
1556  }
1557 
1558  /* Ignore if all blank */
1559  if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1560  continue;
1561 
1562  /* Get an ID, check it's valid and not already seen */
1563  id = strtol(linebuf.data, &endptr, 10);
1564  if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1565  ropt->idWanted[id - 1])
1566  {
1567  pg_log_warning("line ignored: %s", linebuf.data);
1568  continue;
1569  }
1570 
1571  /* Find TOC entry */
1572  te = getTocEntryByDumpId(AH, id);
1573  if (!te)
1574  pg_fatal("could not find entry for ID %d",
1575  id);
1576 
1577  /* Mark it wanted */
1578  ropt->idWanted[id - 1] = true;
1579 
1580  /*
1581  * Move each item to the end of the list as it is selected, so that
1582  * they are placed in the desired order. Any unwanted items will end
1583  * up at the front of the list, which may seem unintuitive but it's
1584  * what we need. In an ordinary serial restore that makes no
1585  * difference, but in a parallel restore we need to mark unrestored
1586  * items' dependencies as satisfied before we start examining
1587  * restorable items. Otherwise they could have surprising
1588  * side-effects on the order in which restorable items actually get
1589  * restored.
1590  */
1591  _moveBefore(AH->toc, te);
1592  }
1593 
1594  pg_free(linebuf.data);
1595 
1596  if (fclose(fh) != 0)
1597  pg_fatal("could not close TOC file: %m");
1598 }
1599 
1600 /**********************
1601  * Convenience functions that look like standard IO functions
1602  * for writing data when in dump mode.
1603  **********************/
1604 
1605 /* Public */
1606 void
1607 archputs(const char *s, Archive *AH)
1608 {
1609  WriteData(AH, s, strlen(s));
1610 }
1611 
1612 /* Public */
1613 int
1614 archprintf(Archive *AH, const char *fmt,...)
1615 {
1616  int save_errno = errno;
1617  char *p;
1618  size_t len = 128; /* initial assumption about buffer size */
1619  size_t cnt;
1620 
1621  for (;;)
1622  {
1623  va_list args;
1624 
1625  /* Allocate work buffer. */
1626  p = (char *) pg_malloc(len);
1627 
1628  /* Try to format the data. */
1629  errno = save_errno;
1630  va_start(args, fmt);
1631  cnt = pvsnprintf(p, len, fmt, args);
1632  va_end(args);
1633 
1634  if (cnt < len)
1635  break; /* success */
1636 
1637  /* Release buffer and loop around to try again with larger len. */
1638  free(p);
1639  len = cnt;
1640  }
1641 
1642  WriteData(AH, p, cnt);
1643  free(p);
1644  return (int) cnt;
1645 }
1646 
1647 
1648 /*******************************
1649  * Stuff below here should be 'private' to the archiver routines
1650  *******************************/
1651 
1652 static void
1654  const pg_compress_specification compression_spec)
1655 {
1656  CompressFileHandle *CFH;
1657  const char *mode;
1658  int fn = -1;
1659 
1660  if (filename)
1661  {
1662  if (strcmp(filename, "-") == 0)
1663  fn = fileno(stdout);
1664  }
1665  else if (AH->FH)
1666  fn = fileno(AH->FH);
1667  else if (AH->fSpec)
1668  {
1669  filename = AH->fSpec;
1670  }
1671  else
1672  fn = fileno(stdout);
1673 
1674  if (AH->mode == archModeAppend)
1675  mode = PG_BINARY_A;
1676  else
1677  mode = PG_BINARY_W;
1678 
1679  CFH = InitCompressFileHandle(compression_spec);
1680 
1681  if (!CFH->open_func(filename, fn, mode, CFH))
1682  {
1683  if (filename)
1684  pg_fatal("could not open output file \"%s\": %m", filename);
1685  else
1686  pg_fatal("could not open output file: %m");
1687  }
1688 
1689  AH->OF = CFH;
1690 }
1691 
1692 static CompressFileHandle *
1694 {
1695  return (CompressFileHandle *) AH->OF;
1696 }
1697 
1698 static void
1700 {
1701  errno = 0;
1702  if (!EndCompressFileHandle(AH->OF))
1703  pg_fatal("could not close output file: %m");
1704 
1705  AH->OF = savedOutput;
1706 }
1707 
1708 
1709 
1710 /*
1711  * Print formatted text to the output file (usually stdout).
1712  */
1713 int
1714 ahprintf(ArchiveHandle *AH, const char *fmt,...)
1715 {
1716  int save_errno = errno;
1717  char *p;
1718  size_t len = 128; /* initial assumption about buffer size */
1719  size_t cnt;
1720 
1721  for (;;)
1722  {
1723  va_list args;
1724 
1725  /* Allocate work buffer. */
1726  p = (char *) pg_malloc(len);
1727 
1728  /* Try to format the data. */
1729  errno = save_errno;
1730  va_start(args, fmt);
1731  cnt = pvsnprintf(p, len, fmt, args);
1732  va_end(args);
1733 
1734  if (cnt < len)
1735  break; /* success */
1736 
1737  /* Release buffer and loop around to try again with larger len. */
1738  free(p);
1739  len = cnt;
1740  }
1741 
1742  ahwrite(p, 1, cnt, AH);
1743  free(p);
1744  return (int) cnt;
1745 }
1746 
1747 /*
1748  * Single place for logic which says 'We are restoring to a direct DB connection'.
1749  */
1750 static int
1752 {
1753  RestoreOptions *ropt = AH->public.ropt;
1754 
1755  return (ropt && ropt->useDB && AH->connection);
1756 }
1757 
1758 /*
1759  * Dump the current contents of the LO data buffer while writing a LO
1760  */
1761 static void
1763 {
1764  if (AH->connection)
1765  {
1766  int res;
1767 
1768  res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1769  pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1770  "wrote %zu bytes of large object data (result = %d)",
1771  AH->lo_buf_used),
1772  AH->lo_buf_used, res);
1773  /* We assume there are no short writes, only errors */
1774  if (res != AH->lo_buf_used)
1775  warn_or_exit_horribly(AH, "could not write to large object: %s",
1776  PQerrorMessage(AH->connection));
1777  }
1778  else
1779  {
1781 
1783  (const unsigned char *) AH->lo_buf,
1784  AH->lo_buf_used,
1785  AH);
1786 
1787  /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1788  AH->writingLO = false;
1789  ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1790  AH->writingLO = true;
1791 
1793  }
1794  AH->lo_buf_used = 0;
1795 }
1796 
1797 
1798 /*
1799  * Write buffer to the output file (usually stdout). This is used for
1800  * outputting 'restore' scripts etc. It is even possible for an archive
1801  * format to create a custom output routine to 'fake' a restore if it
1802  * wants to generate a script (see TAR output).
1803  */
1804 void
1805 ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1806 {
1807  int bytes_written = 0;
1808 
1809  if (AH->writingLO)
1810  {
1811  size_t remaining = size * nmemb;
1812 
1813  while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1814  {
1815  size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1816 
1817  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1818  ptr = (const void *) ((const char *) ptr + avail);
1819  remaining -= avail;
1820  AH->lo_buf_used += avail;
1821  dump_lo_buf(AH);
1822  }
1823 
1824  memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1825  AH->lo_buf_used += remaining;
1826 
1827  bytes_written = size * nmemb;
1828  }
1829  else if (AH->CustomOutPtr)
1830  bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1831 
1832  /*
1833  * If we're doing a restore, and it's direct to DB, and we're connected
1834  * then send it to the DB.
1835  */
1836  else if (RestoringToDB(AH))
1837  bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1838  else
1839  {
1840  CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1841 
1842  if (CFH->write_func(ptr, size * nmemb, CFH))
1843  bytes_written = size * nmemb;
1844  }
1845 
1846  if (bytes_written != size * nmemb)
1848 }
1849 
1850 /* on some error, we may decide to go on... */
1851 void
1853 {
1854  va_list ap;
1855 
1856  switch (AH->stage)
1857  {
1858 
1859  case STAGE_NONE:
1860  /* Do nothing special */
1861  break;
1862 
1863  case STAGE_INITIALIZING:
1864  if (AH->stage != AH->lastErrorStage)
1865  pg_log_info("while INITIALIZING:");
1866  break;
1867 
1868  case STAGE_PROCESSING:
1869  if (AH->stage != AH->lastErrorStage)
1870  pg_log_info("while PROCESSING TOC:");
1871  break;
1872 
1873  case STAGE_FINALIZING:
1874  if (AH->stage != AH->lastErrorStage)
1875  pg_log_info("while FINALIZING:");
1876  break;
1877  }
1878  if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1879  {
1880  pg_log_info("from TOC entry %d; %u %u %s %s %s",
1881  AH->currentTE->dumpId,
1883  AH->currentTE->catalogId.oid,
1884  AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1885  AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1886  AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1887  }
1888  AH->lastErrorStage = AH->stage;
1889  AH->lastErrorTE = AH->currentTE;
1890 
1891  va_start(ap, fmt);
1893  va_end(ap);
1894 
1895  if (AH->public.exit_on_error)
1896  exit_nicely(1);
1897  else
1898  AH->public.n_errors++;
1899 }
1900 
1901 #ifdef NOT_USED
1902 
1903 static void
1904 _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1905 {
1906  /* Unlink te from list */
1907  te->prev->next = te->next;
1908  te->next->prev = te->prev;
1909 
1910  /* and insert it after "pos" */
1911  te->prev = pos;
1912  te->next = pos->next;
1913  pos->next->prev = te;
1914  pos->next = te;
1915 }
1916 #endif
1917 
1918 static void
1920 {
1921  /* Unlink te from list */
1922  te->prev->next = te->next;
1923  te->next->prev = te->prev;
1924 
1925  /* and insert it before "pos" */
1926  te->prev = pos->prev;
1927  te->next = pos;
1928  pos->prev->next = te;
1929  pos->prev = te;
1930 }
1931 
1932 /*
1933  * Build index arrays for the TOC list
1934  *
1935  * This should be invoked only after we have created or read in all the TOC
1936  * items.
1937  *
1938  * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1939  * array entries run only up to maxDumpId. We might see dependency dump IDs
1940  * beyond that (if the dump was partial); so always check the array bound
1941  * before trying to touch an array entry.
1942  */
1943 static void
1945 {
1946  DumpId maxDumpId = AH->maxDumpId;
1947  TocEntry *te;
1948 
1949  AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1950  AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1951 
1952  for (te = AH->toc->next; te != AH->toc; te = te->next)
1953  {
1954  /* this check is purely paranoia, maxDumpId should be correct */
1955  if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1956  pg_fatal("bad dumpId");
1957 
1958  /* tocsByDumpId indexes all TOCs by their dump ID */
1959  AH->tocsByDumpId[te->dumpId] = te;
1960 
1961  /*
1962  * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1963  * TOC entry that has a DATA item. We compute this by reversing the
1964  * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1965  * just one dependency and it is the TABLE item.
1966  */
1967  if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1968  {
1969  DumpId tableId = te->dependencies[0];
1970 
1971  /*
1972  * The TABLE item might not have been in the archive, if this was
1973  * a data-only dump; but its dump ID should be less than its data
1974  * item's dump ID, so there should be a place for it in the array.
1975  */
1976  if (tableId <= 0 || tableId > maxDumpId)
1977  pg_fatal("bad table dumpId for TABLE DATA item");
1978 
1979  AH->tableDataId[tableId] = te->dumpId;
1980  }
1981  }
1982 }
1983 
1984 TocEntry *
1986 {
1987  /* build index arrays if we didn't already */
1988  if (AH->tocsByDumpId == NULL)
1989  buildTocEntryArrays(AH);
1990 
1991  if (id > 0 && id <= AH->maxDumpId)
1992  return AH->tocsByDumpId[id];
1993 
1994  return NULL;
1995 }
1996 
1997 int
1999 {
2000  TocEntry *te = getTocEntryByDumpId(AH, id);
2001 
2002  if (!te)
2003  return 0;
2004 
2005  return te->reqs;
2006 }
2007 
2008 size_t
2009 WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2010 {
2011  int off;
2012 
2013  /* Save the flag */
2014  AH->WriteBytePtr(AH, wasSet);
2015 
2016  /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2017  for (off = 0; off < sizeof(pgoff_t); off++)
2018  {
2019  AH->WriteBytePtr(AH, o & 0xFF);
2020  o >>= 8;
2021  }
2022  return sizeof(pgoff_t) + 1;
2023 }
2024 
2025 int
2027 {
2028  int i;
2029  int off;
2030  int offsetFlg;
2031 
2032  /* Initialize to zero */
2033  *o = 0;
2034 
2035  /* Check for old version */
2036  if (AH->version < K_VERS_1_7)
2037  {
2038  /* Prior versions wrote offsets using WriteInt */
2039  i = ReadInt(AH);
2040  /* -1 means not set */
2041  if (i < 0)
2042  return K_OFFSET_POS_NOT_SET;
2043  else if (i == 0)
2044  return K_OFFSET_NO_DATA;
2045 
2046  /* Cast to pgoff_t because it was written as an int. */
2047  *o = (pgoff_t) i;
2048  return K_OFFSET_POS_SET;
2049  }
2050 
2051  /*
2052  * Read the flag indicating the state of the data pointer. Check if valid
2053  * and die if not.
2054  *
2055  * This used to be handled by a negative or zero pointer, now we use an
2056  * extra byte specifically for the state.
2057  */
2058  offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2059 
2060  switch (offsetFlg)
2061  {
2062  case K_OFFSET_POS_NOT_SET:
2063  case K_OFFSET_NO_DATA:
2064  case K_OFFSET_POS_SET:
2065 
2066  break;
2067 
2068  default:
2069  pg_fatal("unexpected data offset flag %d", offsetFlg);
2070  }
2071 
2072  /*
2073  * Read the bytes
2074  */
2075  for (off = 0; off < AH->offSize; off++)
2076  {
2077  if (off < sizeof(pgoff_t))
2078  *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2079  else
2080  {
2081  if (AH->ReadBytePtr(AH) != 0)
2082  pg_fatal("file offset in dump file is too large");
2083  }
2084  }
2085 
2086  return offsetFlg;
2087 }
2088 
2089 size_t
2091 {
2092  int b;
2093 
2094  /*
2095  * This is a bit yucky, but I don't want to make the binary format very
2096  * dependent on representation, and not knowing much about it, I write out
2097  * a sign byte. If you change this, don't forget to change the file
2098  * version #, and modify ReadInt to read the new format AS WELL AS the old
2099  * formats.
2100  */
2101 
2102  /* SIGN byte */
2103  if (i < 0)
2104  {
2105  AH->WriteBytePtr(AH, 1);
2106  i = -i;
2107  }
2108  else
2109  AH->WriteBytePtr(AH, 0);
2110 
2111  for (b = 0; b < AH->intSize; b++)
2112  {
2113  AH->WriteBytePtr(AH, i & 0xFF);
2114  i >>= 8;
2115  }
2116 
2117  return AH->intSize + 1;
2118 }
2119 
2120 int
2122 {
2123  int res = 0;
2124  int bv,
2125  b;
2126  int sign = 0; /* Default positive */
2127  int bitShift = 0;
2128 
2129  if (AH->version > K_VERS_1_0)
2130  /* Read a sign byte */
2131  sign = AH->ReadBytePtr(AH);
2132 
2133  for (b = 0; b < AH->intSize; b++)
2134  {
2135  bv = AH->ReadBytePtr(AH) & 0xFF;
2136  if (bv != 0)
2137  res = res + (bv << bitShift);
2138  bitShift += 8;
2139  }
2140 
2141  if (sign)
2142  res = -res;
2143 
2144  return res;
2145 }
2146 
2147 size_t
2148 WriteStr(ArchiveHandle *AH, const char *c)
2149 {
2150  size_t res;
2151 
2152  if (c)
2153  {
2154  int len = strlen(c);
2155 
2156  res = WriteInt(AH, len);
2157  AH->WriteBufPtr(AH, c, len);
2158  res += len;
2159  }
2160  else
2161  res = WriteInt(AH, -1);
2162 
2163  return res;
2164 }
2165 
2166 char *
2168 {
2169  char *buf;
2170  int l;
2171 
2172  l = ReadInt(AH);
2173  if (l < 0)
2174  buf = NULL;
2175  else
2176  {
2177  buf = (char *) pg_malloc(l + 1);
2178  AH->ReadBufPtr(AH, (void *) buf, l);
2179 
2180  buf[l] = '\0';
2181  }
2182 
2183  return buf;
2184 }
2185 
2186 static bool
2187 _fileExistsInDirectory(const char *dir, const char *filename)
2188 {
2189  struct stat st;
2190  char buf[MAXPGPATH];
2191 
2192  if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2193  pg_fatal("directory name too long: \"%s\"", dir);
2194 
2195  return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2196 }
2197 
2198 static int
2200 {
2201  FILE *fh;
2202  char sig[6]; /* More than enough */
2203  size_t cnt;
2204  int wantClose = 0;
2205 
2206  pg_log_debug("attempting to ascertain archive format");
2207 
2208  free(AH->lookahead);
2209 
2210  AH->readHeader = 0;
2211  AH->lookaheadSize = 512;
2212  AH->lookahead = pg_malloc0(512);
2213  AH->lookaheadLen = 0;
2214  AH->lookaheadPos = 0;
2215 
2216  if (AH->fSpec)
2217  {
2218  struct stat st;
2219 
2220  wantClose = 1;
2221 
2222  /*
2223  * Check if the specified archive is a directory. If so, check if
2224  * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2225  */
2226  if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2227  {
2228  AH->format = archDirectory;
2229  if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2230  return AH->format;
2231 #ifdef HAVE_LIBZ
2232  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2233  return AH->format;
2234 #endif
2235 #ifdef USE_LZ4
2236  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2237  return AH->format;
2238 #endif
2239 #ifdef USE_ZSTD
2240  if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2241  return AH->format;
2242 #endif
2243  pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2244  AH->fSpec);
2245  fh = NULL; /* keep compiler quiet */
2246  }
2247  else
2248  {
2249  fh = fopen(AH->fSpec, PG_BINARY_R);
2250  if (!fh)
2251  pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2252  }
2253  }
2254  else
2255  {
2256  fh = stdin;
2257  if (!fh)
2258  pg_fatal("could not open input file: %m");
2259  }
2260 
2261  if ((cnt = fread(sig, 1, 5, fh)) != 5)
2262  {
2263  if (ferror(fh))
2264  pg_fatal("could not read input file: %m");
2265  else
2266  pg_fatal("input file is too short (read %lu, expected 5)",
2267  (unsigned long) cnt);
2268  }
2269 
2270  /* Save it, just in case we need it later */
2271  memcpy(&AH->lookahead[0], sig, 5);
2272  AH->lookaheadLen = 5;
2273 
2274  if (strncmp(sig, "PGDMP", 5) == 0)
2275  {
2276  /* It's custom format, stop here */
2277  AH->format = archCustom;
2278  AH->readHeader = 1;
2279  }
2280  else
2281  {
2282  /*
2283  * *Maybe* we have a tar archive format file or a text dump ... So,
2284  * read first 512 byte header...
2285  */
2286  cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2287  /* read failure is checked below */
2288  AH->lookaheadLen += cnt;
2289 
2290  if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2291  (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2292  strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2293  {
2294  /*
2295  * looks like it's probably a text format dump. so suggest they
2296  * try psql
2297  */
2298  pg_fatal("input file appears to be a text format dump. Please use psql.");
2299  }
2300 
2301  if (AH->lookaheadLen != 512)
2302  {
2303  if (feof(fh))
2304  pg_fatal("input file does not appear to be a valid archive (too short?)");
2305  else
2306  READ_ERROR_EXIT(fh);
2307  }
2308 
2309  if (!isValidTarHeader(AH->lookahead))
2310  pg_fatal("input file does not appear to be a valid archive");
2311 
2312  AH->format = archTar;
2313  }
2314 
2315  /* Close the file if we opened it */
2316  if (wantClose)
2317  {
2318  if (fclose(fh) != 0)
2319  pg_fatal("could not close input file: %m");
2320  /* Forget lookahead, since we'll re-read header after re-opening */
2321  AH->readHeader = 0;
2322  AH->lookaheadLen = 0;
2323  }
2324 
2325  return AH->format;
2326 }
2327 
2328 
2329 /*
2330  * Allocate an archive handle
2331  */
2332 static ArchiveHandle *
2333 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2334  const pg_compress_specification compression_spec,
2335  bool dosync, ArchiveMode mode,
2337 {
2338  ArchiveHandle *AH;
2339  CompressFileHandle *CFH;
2340  pg_compress_specification out_compress_spec = {0};
2341 
2342  pg_log_debug("allocating AH for %s, format %d",
2343  FileSpec ? FileSpec : "(stdio)", fmt);
2344 
2345  AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2346 
2347  AH->version = K_VERS_SELF;
2348 
2349  /* initialize for backwards compatible string processing */
2350  AH->public.encoding = 0; /* PG_SQL_ASCII */
2351  AH->public.std_strings = false;
2352 
2353  /* sql error handling */
2354  AH->public.exit_on_error = true;
2355  AH->public.n_errors = 0;
2356 
2357  AH->archiveDumpVersion = PG_VERSION;
2358 
2359  AH->createDate = time(NULL);
2360 
2361  AH->intSize = sizeof(int);
2362  AH->offSize = sizeof(pgoff_t);
2363  if (FileSpec)
2364  {
2365  AH->fSpec = pg_strdup(FileSpec);
2366 
2367  /*
2368  * Not used; maybe later....
2369  *
2370  * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2371  * i--) if (AH->workDir[i-1] == '/')
2372  */
2373  }
2374  else
2375  AH->fSpec = NULL;
2376 
2377  AH->currUser = NULL; /* unknown */
2378  AH->currSchema = NULL; /* ditto */
2379  AH->currTablespace = NULL; /* ditto */
2380  AH->currTableAm = NULL; /* ditto */
2381 
2382  AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2383 
2384  AH->toc->next = AH->toc;
2385  AH->toc->prev = AH->toc;
2386 
2387  AH->mode = mode;
2388  AH->compression_spec = compression_spec;
2389  AH->dosync = dosync;
2390  AH->sync_method = sync_method;
2391 
2392  memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2393 
2394  /* Open stdout with no compression for AH output handle */
2395  out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2396  CFH = InitCompressFileHandle(out_compress_spec);
2397  if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2398  pg_fatal("could not open stdout for appending: %m");
2399  AH->OF = CFH;
2400 
2401  /*
2402  * On Windows, we need to use binary mode to read/write non-text files,
2403  * which include all archive formats as well as compressed plain text.
2404  * Force stdin/stdout into binary mode if that is what we are using.
2405  */
2406 #ifdef WIN32
2407  if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2408  (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2409  {
2410  if (mode == archModeWrite)
2411  _setmode(fileno(stdout), O_BINARY);
2412  else
2413  _setmode(fileno(stdin), O_BINARY);
2414  }
2415 #endif
2416 
2417  AH->SetupWorkerPtr = setupWorkerPtr;
2418 
2419  if (fmt == archUnknown)
2420  AH->format = _discoverArchiveFormat(AH);
2421  else
2422  AH->format = fmt;
2423 
2424  switch (AH->format)
2425  {
2426  case archCustom:
2428  break;
2429 
2430  case archNull:
2431  InitArchiveFmt_Null(AH);
2432  break;
2433 
2434  case archDirectory:
2436  break;
2437 
2438  case archTar:
2439  InitArchiveFmt_Tar(AH);
2440  break;
2441 
2442  default:
2443  pg_fatal("unrecognized file format \"%d\"", fmt);
2444  }
2445 
2446  return AH;
2447 }
2448 
2449 /*
2450  * Write out all data (tables & LOs)
2451  */
2452 void
2454 {
2455  TocEntry *te;
2456 
2457  if (pstate && pstate->numWorkers > 1)
2458  {
2459  /*
2460  * In parallel mode, this code runs in the leader process. We
2461  * construct an array of candidate TEs, then sort it into decreasing
2462  * size order, then dispatch each TE to a data-transfer worker. By
2463  * dumping larger tables first, we avoid getting into a situation
2464  * where we're down to one job and it's big, losing parallelism.
2465  */
2466  TocEntry **tes;
2467  int ntes;
2468 
2469  tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2470  ntes = 0;
2471  for (te = AH->toc->next; te != AH->toc; te = te->next)
2472  {
2473  /* Consider only TEs with dataDumper functions ... */
2474  if (!te->dataDumper)
2475  continue;
2476  /* ... and ignore ones not enabled for dump */
2477  if ((te->reqs & REQ_DATA) == 0)
2478  continue;
2479 
2480  tes[ntes++] = te;
2481  }
2482 
2483  if (ntes > 1)
2484  qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2485 
2486  for (int i = 0; i < ntes; i++)
2487  DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2488  mark_dump_job_done, NULL);
2489 
2490  pg_free(tes);
2491 
2492  /* Now wait for workers to finish. */
2493  WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2494  }
2495  else
2496  {
2497  /* Non-parallel mode: just dump all candidate TEs sequentially. */
2498  for (te = AH->toc->next; te != AH->toc; te = te->next)
2499  {
2500  /* Must have same filter conditions as above */
2501  if (!te->dataDumper)
2502  continue;
2503  if ((te->reqs & REQ_DATA) == 0)
2504  continue;
2505 
2507  }
2508  }
2509 }
2510 
2511 
2512 /*
2513  * Callback function that's invoked in the leader process after a step has
2514  * been parallel dumped.
2515  *
2516  * We don't need to do anything except check for worker failure.
2517  */
2518 static void
2520  TocEntry *te,
2521  int status,
2522  void *callback_data)
2523 {
2524  pg_log_info("finished item %d %s %s",
2525  te->dumpId, te->desc, te->tag);
2526 
2527  if (status != 0)
2528  pg_fatal("worker process failed: exit code %d",
2529  status);
2530 }
2531 
2532 
2533 void
2535 {
2536  StartDataPtrType startPtr;
2537  EndDataPtrType endPtr;
2538 
2539  AH->currToc = te;
2540 
2541  if (strcmp(te->desc, "BLOBS") == 0)
2542  {
2543  startPtr = AH->StartLOsPtr;
2544  endPtr = AH->EndLOsPtr;
2545  }
2546  else
2547  {
2548  startPtr = AH->StartDataPtr;
2549  endPtr = AH->EndDataPtr;
2550  }
2551 
2552  if (startPtr != NULL)
2553  (*startPtr) (AH, te);
2554 
2555  /*
2556  * The user-provided DataDumper routine needs to call AH->WriteData
2557  */
2558  te->dataDumper((Archive *) AH, te->dataDumperArg);
2559 
2560  if (endPtr != NULL)
2561  (*endPtr) (AH, te);
2562 
2563  AH->currToc = NULL;
2564 }
2565 
2566 void
2568 {
2569  TocEntry *te;
2570  char workbuf[32];
2571  int tocCount;
2572  int i;
2573 
2574  /* count entries that will actually be dumped */
2575  tocCount = 0;
2576  for (te = AH->toc->next; te != AH->toc; te = te->next)
2577  {
2578  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2579  tocCount++;
2580  }
2581 
2582  /* printf("%d TOC Entries to save\n", tocCount); */
2583 
2584  WriteInt(AH, tocCount);
2585 
2586  for (te = AH->toc->next; te != AH->toc; te = te->next)
2587  {
2588  if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2589  continue;
2590 
2591  WriteInt(AH, te->dumpId);
2592  WriteInt(AH, te->dataDumper ? 1 : 0);
2593 
2594  /* OID is recorded as a string for historical reasons */
2595  sprintf(workbuf, "%u", te->catalogId.tableoid);
2596  WriteStr(AH, workbuf);
2597  sprintf(workbuf, "%u", te->catalogId.oid);
2598  WriteStr(AH, workbuf);
2599 
2600  WriteStr(AH, te->tag);
2601  WriteStr(AH, te->desc);
2602  WriteInt(AH, te->section);
2603  WriteStr(AH, te->defn);
2604  WriteStr(AH, te->dropStmt);
2605  WriteStr(AH, te->copyStmt);
2606  WriteStr(AH, te->namespace);
2607  WriteStr(AH, te->tablespace);
2608  WriteStr(AH, te->tableam);
2609  WriteInt(AH, te->relkind);
2610  WriteStr(AH, te->owner);
2611  WriteStr(AH, "false");
2612 
2613  /* Dump list of dependencies */
2614  for (i = 0; i < te->nDeps; i++)
2615  {
2616  sprintf(workbuf, "%d", te->dependencies[i]);
2617  WriteStr(AH, workbuf);
2618  }
2619  WriteStr(AH, NULL); /* Terminate List */
2620 
2621  if (AH->WriteExtraTocPtr)
2622  AH->WriteExtraTocPtr(AH, te);
2623  }
2624 }
2625 
2626 void
2628 {
2629  int i;
2630  char *tmp;
2631  DumpId *deps;
2632  int depIdx;
2633  int depSize;
2634  TocEntry *te;
2635  bool is_supported;
2636 
2637  AH->tocCount = ReadInt(AH);
2638  AH->maxDumpId = 0;
2639 
2640  for (i = 0; i < AH->tocCount; i++)
2641  {
2642  te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2643  te->dumpId = ReadInt(AH);
2644 
2645  if (te->dumpId > AH->maxDumpId)
2646  AH->maxDumpId = te->dumpId;
2647 
2648  /* Sanity check */
2649  if (te->dumpId <= 0)
2650  pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2651  te->dumpId);
2652 
2653  te->hadDumper = ReadInt(AH);
2654 
2655  if (AH->version >= K_VERS_1_8)
2656  {
2657  tmp = ReadStr(AH);
2658  sscanf(tmp, "%u", &te->catalogId.tableoid);
2659  free(tmp);
2660  }
2661  else
2663  tmp = ReadStr(AH);
2664  sscanf(tmp, "%u", &te->catalogId.oid);
2665  free(tmp);
2666 
2667  te->tag = ReadStr(AH);
2668  te->desc = ReadStr(AH);
2669 
2670  if (AH->version >= K_VERS_1_11)
2671  {
2672  te->section = ReadInt(AH);
2673  }
2674  else
2675  {
2676  /*
2677  * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2678  * the entries into sections. This list need not cover entry
2679  * types added later than 8.4.
2680  */
2681  if (strcmp(te->desc, "COMMENT") == 0 ||
2682  strcmp(te->desc, "ACL") == 0 ||
2683  strcmp(te->desc, "ACL LANGUAGE") == 0)
2684  te->section = SECTION_NONE;
2685  else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2686  strcmp(te->desc, "BLOBS") == 0 ||
2687  strcmp(te->desc, "BLOB COMMENTS") == 0)
2688  te->section = SECTION_DATA;
2689  else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2690  strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2691  strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2692  strcmp(te->desc, "INDEX") == 0 ||
2693  strcmp(te->desc, "RULE") == 0 ||
2694  strcmp(te->desc, "TRIGGER") == 0)
2695  te->section = SECTION_POST_DATA;
2696  else
2697  te->section = SECTION_PRE_DATA;
2698  }
2699 
2700  te->defn = ReadStr(AH);
2701  te->dropStmt = ReadStr(AH);
2702 
2703  if (AH->version >= K_VERS_1_3)
2704  te->copyStmt = ReadStr(AH);
2705 
2706  if (AH->version >= K_VERS_1_6)
2707  te->namespace = ReadStr(AH);
2708 
2709  if (AH->version >= K_VERS_1_10)
2710  te->tablespace = ReadStr(AH);
2711 
2712  if (AH->version >= K_VERS_1_14)
2713  te->tableam = ReadStr(AH);
2714 
2715  if (AH->version >= K_VERS_1_16)
2716  te->relkind = ReadInt(AH);
2717 
2718  te->owner = ReadStr(AH);
2719  is_supported = true;
2720  if (AH->version < K_VERS_1_9)
2721  is_supported = false;
2722  else
2723  {
2724  tmp = ReadStr(AH);
2725 
2726  if (strcmp(tmp, "true") == 0)
2727  is_supported = false;
2728 
2729  free(tmp);
2730  }
2731 
2732  if (!is_supported)
2733  pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2734 
2735  /* Read TOC entry dependencies */
2736  if (AH->version >= K_VERS_1_5)
2737  {
2738  depSize = 100;
2739  deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2740  depIdx = 0;
2741  for (;;)
2742  {
2743  tmp = ReadStr(AH);
2744  if (!tmp)
2745  break; /* end of list */
2746  if (depIdx >= depSize)
2747  {
2748  depSize *= 2;
2749  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2750  }
2751  sscanf(tmp, "%d", &deps[depIdx]);
2752  free(tmp);
2753  depIdx++;
2754  }
2755 
2756  if (depIdx > 0) /* We have a non-null entry */
2757  {
2758  deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2759  te->dependencies = deps;
2760  te->nDeps = depIdx;
2761  }
2762  else
2763  {
2764  free(deps);
2765  te->dependencies = NULL;
2766  te->nDeps = 0;
2767  }
2768  }
2769  else
2770  {
2771  te->dependencies = NULL;
2772  te->nDeps = 0;
2773  }
2774  te->dataLength = 0;
2775 
2776  if (AH->ReadExtraTocPtr)
2777  AH->ReadExtraTocPtr(AH, te);
2778 
2779  pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2780  i, te->dumpId, te->desc, te->tag);
2781 
2782  /* link completed entry into TOC circular list */
2783  te->prev = AH->toc->prev;
2784  AH->toc->prev->next = te;
2785  AH->toc->prev = te;
2786  te->next = AH->toc;
2787 
2788  /* special processing immediately upon read for some items */
2789  if (strcmp(te->desc, "ENCODING") == 0)
2790  processEncodingEntry(AH, te);
2791  else if (strcmp(te->desc, "STDSTRINGS") == 0)
2792  processStdStringsEntry(AH, te);
2793  else if (strcmp(te->desc, "SEARCHPATH") == 0)
2794  processSearchPathEntry(AH, te);
2795  }
2796 }
2797 
2798 static void
2800 {
2801  /* te->defn should have the form SET client_encoding = 'foo'; */
2802  char *defn = pg_strdup(te->defn);
2803  char *ptr1;
2804  char *ptr2 = NULL;
2805  int encoding;
2806 
2807  ptr1 = strchr(defn, '\'');
2808  if (ptr1)
2809  ptr2 = strchr(++ptr1, '\'');
2810  if (ptr2)
2811  {
2812  *ptr2 = '\0';
2813  encoding = pg_char_to_encoding(ptr1);
2814  if (encoding < 0)
2815  pg_fatal("unrecognized encoding \"%s\"",
2816  ptr1);
2817  AH->public.encoding = encoding;
2818  }
2819  else
2820  pg_fatal("invalid ENCODING item: %s",
2821  te->defn);
2822 
2823  free(defn);
2824 }
2825 
2826 static void
2828 {
2829  /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2830  char *ptr1;
2831 
2832  ptr1 = strchr(te->defn, '\'');
2833  if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2834  AH->public.std_strings = true;
2835  else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2836  AH->public.std_strings = false;
2837  else
2838  pg_fatal("invalid STDSTRINGS item: %s",
2839  te->defn);
2840 }
2841 
2842 static void
2844 {
2845  /*
2846  * te->defn should contain a command to set search_path. We just copy it
2847  * verbatim for use later.
2848  */
2849  AH->public.searchpath = pg_strdup(te->defn);
2850 }
2851 
2852 static void
2854 {
2855  const char *missing_name;
2856 
2857  Assert(ropt->strict_names);
2858 
2859  if (ropt->schemaNames.head != NULL)
2860  {
2861  missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2862  if (missing_name != NULL)
2863  pg_fatal("schema \"%s\" not found", missing_name);
2864  }
2865 
2866  if (ropt->tableNames.head != NULL)
2867  {
2868  missing_name = simple_string_list_not_touched(&ropt->tableNames);
2869  if (missing_name != NULL)
2870  pg_fatal("table \"%s\" not found", missing_name);
2871  }
2872 
2873  if (ropt->indexNames.head != NULL)
2874  {
2875  missing_name = simple_string_list_not_touched(&ropt->indexNames);
2876  if (missing_name != NULL)
2877  pg_fatal("index \"%s\" not found", missing_name);
2878  }
2879 
2880  if (ropt->functionNames.head != NULL)
2881  {
2882  missing_name = simple_string_list_not_touched(&ropt->functionNames);
2883  if (missing_name != NULL)
2884  pg_fatal("function \"%s\" not found", missing_name);
2885  }
2886 
2887  if (ropt->triggerNames.head != NULL)
2888  {
2889  missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2890  if (missing_name != NULL)
2891  pg_fatal("trigger \"%s\" not found", missing_name);
2892  }
2893 }
2894 
2895 /*
2896  * Determine whether we want to restore this TOC entry.
2897  *
2898  * Returns 0 if entry should be skipped, or some combination of the
2899  * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2900  * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2901  */
2902 static int
2904 {
2905  int res = REQ_SCHEMA | REQ_DATA;
2906  RestoreOptions *ropt = AH->public.ropt;
2907 
2908  /* These items are treated specially */
2909  if (strcmp(te->desc, "ENCODING") == 0 ||
2910  strcmp(te->desc, "STDSTRINGS") == 0 ||
2911  strcmp(te->desc, "SEARCHPATH") == 0)
2912  return REQ_SPECIAL;
2913 
2914  /*
2915  * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2916  * restored in createDB mode, and not restored otherwise, independently of
2917  * all else.
2918  */
2919  if (strcmp(te->desc, "DATABASE") == 0 ||
2920  strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2921  {
2922  if (ropt->createDB)
2923  return REQ_SCHEMA;
2924  else
2925  return 0;
2926  }
2927 
2928  /*
2929  * Process exclusions that affect certain classes of TOC entries.
2930  */
2931 
2932  /* If it's an ACL, maybe ignore it */
2933  if (ropt->aclsSkip && _tocEntryIsACL(te))
2934  return 0;
2935 
2936  /* If it's a comment, maybe ignore it */
2937  if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2938  return 0;
2939 
2940  /*
2941  * If it's a publication or a table part of a publication, maybe ignore
2942  * it.
2943  */
2944  if (ropt->no_publications &&
2945  (strcmp(te->desc, "PUBLICATION") == 0 ||
2946  strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2947  strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2948  return 0;
2949 
2950  /* If it's a security label, maybe ignore it */
2951  if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2952  return 0;
2953 
2954  /* If it's a subscription, maybe ignore it */
2955  if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2956  return 0;
2957 
2958  /* Ignore it if section is not to be dumped/restored */
2959  switch (curSection)
2960  {
2961  case SECTION_PRE_DATA:
2962  if (!(ropt->dumpSections & DUMP_PRE_DATA))
2963  return 0;
2964  break;
2965  case SECTION_DATA:
2966  if (!(ropt->dumpSections & DUMP_DATA))
2967  return 0;
2968  break;
2969  case SECTION_POST_DATA:
2970  if (!(ropt->dumpSections & DUMP_POST_DATA))
2971  return 0;
2972  break;
2973  default:
2974  /* shouldn't get here, really, but ignore it */
2975  return 0;
2976  }
2977 
2978  /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2979  if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2980  return 0;
2981 
2982  /*
2983  * Check options for selective dump/restore.
2984  */
2985  if (strcmp(te->desc, "ACL") == 0 ||
2986  strcmp(te->desc, "COMMENT") == 0 ||
2987  strcmp(te->desc, "SECURITY LABEL") == 0)
2988  {
2989  /* Database properties react to createDB, not selectivity options. */
2990  if (strncmp(te->tag, "DATABASE ", 9) == 0)
2991  {
2992  if (!ropt->createDB)
2993  return 0;
2994  }
2995  else if (ropt->schemaNames.head != NULL ||
2996  ropt->schemaExcludeNames.head != NULL ||
2997  ropt->selTypes)
2998  {
2999  /*
3000  * In a selective dump/restore, we want to restore these dependent
3001  * TOC entry types only if their parent object is being restored.
3002  * Without selectivity options, we let through everything in the
3003  * archive. Note there may be such entries with no parent, eg
3004  * non-default ACLs for built-in objects. Also, we make
3005  * per-column ACLs additionally depend on the table's ACL if any
3006  * to ensure correct restore order, so those dependencies should
3007  * be ignored in this check.
3008  *
3009  * This code depends on the parent having been marked already,
3010  * which should be the case; if it isn't, perhaps due to
3011  * SortTocFromFile rearrangement, skipping the dependent entry
3012  * seems prudent anyway.
3013  *
3014  * Ideally we'd handle, eg, table CHECK constraints this way too.
3015  * But it's hard to tell which of their dependencies is the one to
3016  * consult.
3017  */
3018  bool dumpthis = false;
3019 
3020  for (int i = 0; i < te->nDeps; i++)
3021  {
3022  TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3023 
3024  if (!pte)
3025  continue; /* probably shouldn't happen */
3026  if (strcmp(pte->desc, "ACL") == 0)
3027  continue; /* ignore dependency on another ACL */
3028  if (pte->reqs == 0)
3029  continue; /* this object isn't marked, so ignore it */
3030  /* Found a parent to be dumped, so we want to dump this too */
3031  dumpthis = true;
3032  break;
3033  }
3034  if (!dumpthis)
3035  return 0;
3036  }
3037  }
3038  else
3039  {
3040  /* Apply selective-restore rules for standalone TOC entries. */
3041  if (ropt->schemaNames.head != NULL)
3042  {
3043  /* If no namespace is specified, it means all. */
3044  if (!te->namespace)
3045  return 0;
3046  if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3047  return 0;
3048  }
3049 
3050  if (ropt->schemaExcludeNames.head != NULL &&
3051  te->namespace &&
3052  simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3053  return 0;
3054 
3055  if (ropt->selTypes)
3056  {
3057  if (strcmp(te->desc, "TABLE") == 0 ||
3058  strcmp(te->desc, "TABLE DATA") == 0 ||
3059  strcmp(te->desc, "VIEW") == 0 ||
3060  strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3061  strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3062  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3063  strcmp(te->desc, "SEQUENCE") == 0 ||
3064  strcmp(te->desc, "SEQUENCE SET") == 0)
3065  {
3066  if (!ropt->selTable)
3067  return 0;
3068  if (ropt->tableNames.head != NULL &&
3070  return 0;
3071  }
3072  else if (strcmp(te->desc, "INDEX") == 0)
3073  {
3074  if (!ropt->selIndex)
3075  return 0;
3076  if (ropt->indexNames.head != NULL &&
3078  return 0;
3079  }
3080  else if (strcmp(te->desc, "FUNCTION") == 0 ||
3081  strcmp(te->desc, "AGGREGATE") == 0 ||
3082  strcmp(te->desc, "PROCEDURE") == 0)
3083  {
3084  if (!ropt->selFunction)
3085  return 0;
3086  if (ropt->functionNames.head != NULL &&
3088  return 0;
3089  }
3090  else if (strcmp(te->desc, "TRIGGER") == 0)
3091  {
3092  if (!ropt->selTrigger)
3093  return 0;
3094  if (ropt->triggerNames.head != NULL &&
3096  return 0;
3097  }
3098  else
3099  return 0;
3100  }
3101  }
3102 
3103  /*
3104  * Determine whether the TOC entry contains schema and/or data components,
3105  * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3106  * it's both schema and data. Otherwise it's probably schema-only, but
3107  * there are exceptions.
3108  */
3109  if (!te->hadDumper)
3110  {
3111  /*
3112  * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3113  * is considered a data entry. We don't need to check for BLOBS or
3114  * old-style BLOB COMMENTS entries, because they will have hadDumper =
3115  * true ... but we do need to check new-style BLOB ACLs, comments,
3116  * etc.
3117  */
3118  if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3119  strcmp(te->desc, "BLOB") == 0 ||
3120  strcmp(te->desc, "BLOB METADATA") == 0 ||
3121  (strcmp(te->desc, "ACL") == 0 &&
3122  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3123  (strcmp(te->desc, "COMMENT") == 0 &&
3124  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3125  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3126  strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3127  res = res & REQ_DATA;
3128  else
3129  res = res & ~REQ_DATA;
3130  }
3131 
3132  /*
3133  * If there's no definition command, there's no schema component. Treat
3134  * "load via partition root" comments as not schema.
3135  */
3136  if (!te->defn || !te->defn[0] ||
3137  strncmp(te->defn, "-- load via partition root ", 27) == 0)
3138  res = res & ~REQ_SCHEMA;
3139 
3140  /*
3141  * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3142  * always ignore it.
3143  */
3144  if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3145  return 0;
3146 
3147  /* Mask it if we only want schema */
3148  if (ropt->schemaOnly)
3149  {
3150  /*
3151  * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3152  *
3153  * In binary-upgrade mode, even with schemaOnly set, we do not mask
3154  * out large objects. (Only large object definitions, comments and
3155  * other metadata should be generated in binary-upgrade mode, not the
3156  * actual data, but that need not concern us here.)
3157  */
3158  if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3159  !(ropt->binary_upgrade &&
3160  (strcmp(te->desc, "BLOB") == 0 ||
3161  strcmp(te->desc, "BLOB METADATA") == 0 ||
3162  (strcmp(te->desc, "ACL") == 0 &&
3163  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3164  (strcmp(te->desc, "COMMENT") == 0 &&
3165  strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3166  (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3167  strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3168  res = res & REQ_SCHEMA;
3169  }
3170 
3171  /* Mask it if we only want data */
3172  if (ropt->dataOnly)
3173  res = res & REQ_DATA;
3174 
3175  return res;
3176 }
3177 
3178 /*
3179  * Identify which pass we should restore this TOC entry in.
3180  *
3181  * See notes with the RestorePass typedef in pg_backup_archiver.h.
3182  */
3183 static RestorePass
3185 {
3186  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3187  if (strcmp(te->desc, "ACL") == 0 ||
3188  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3189  strcmp(te->desc, "DEFAULT ACL") == 0)
3190  return RESTORE_PASS_ACL;
3191  if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3192  strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3193  return RESTORE_PASS_POST_ACL;
3194 
3195  /*
3196  * Comments need to be emitted in the same pass as their parent objects.
3197  * ACLs haven't got comments, and neither do matview data objects, but
3198  * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3199  * we'd need yet another weird special case.)
3200  */
3201  if (strcmp(te->desc, "COMMENT") == 0 &&
3202  strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3203  return RESTORE_PASS_POST_ACL;
3204 
3205  /* All else can be handled in the main pass. */
3206  return RESTORE_PASS_MAIN;
3207 }
3208 
3209 /*
3210  * Identify TOC entries that are ACLs.
3211  *
3212  * Note: it seems worth duplicating some code here to avoid a hard-wired
3213  * assumption that these are exactly the same entries that we restore during
3214  * the RESTORE_PASS_ACL phase.
3215  */
3216 static bool
3218 {
3219  /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3220  if (strcmp(te->desc, "ACL") == 0 ||
3221  strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3222  strcmp(te->desc, "DEFAULT ACL") == 0)
3223  return true;
3224  return false;
3225 }
3226 
3227 /*
3228  * Issue SET commands for parameters that we want to have set the same way
3229  * at all times during execution of a restore script.
3230  */
3231 static void
3233 {
3234  RestoreOptions *ropt = AH->public.ropt;
3235 
3236  /*
3237  * Disable timeouts to allow for slow commands, idle parallel workers, etc
3238  */
3239  ahprintf(AH, "SET statement_timeout = 0;\n");
3240  ahprintf(AH, "SET lock_timeout = 0;\n");
3241  ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3242  ahprintf(AH, "SET transaction_timeout = 0;\n");
3243 
3244  /* Select the correct character set encoding */
3245  ahprintf(AH, "SET client_encoding = '%s';\n",
3247 
3248  /* Select the correct string literal syntax */
3249  ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3250  AH->public.std_strings ? "on" : "off");
3251 
3252  /* Select the role to be used during restore */
3253  if (ropt && ropt->use_role)
3254  ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3255 
3256  /* Select the dump-time search_path */
3257  if (AH->public.searchpath)
3258  ahprintf(AH, "%s", AH->public.searchpath);
3259 
3260  /* Make sure function checking is disabled */
3261  ahprintf(AH, "SET check_function_bodies = false;\n");
3262 
3263  /* Ensure that all valid XML data will be accepted */
3264  ahprintf(AH, "SET xmloption = content;\n");
3265 
3266  /* Avoid annoying notices etc */
3267  ahprintf(AH, "SET client_min_messages = warning;\n");
3268  if (!AH->public.std_strings)
3269  ahprintf(AH, "SET escape_string_warning = off;\n");
3270 
3271  /* Adjust row-security state */
3272  if (ropt && ropt->enable_row_security)
3273  ahprintf(AH, "SET row_security = on;\n");
3274  else
3275  ahprintf(AH, "SET row_security = off;\n");
3276 
3277  /*
3278  * In --transaction-size mode, we should always be in a transaction when
3279  * we begin to restore objects.
3280  */
3281  if (ropt && ropt->txn_size > 0)
3282  {
3283  if (AH->connection)
3284  StartTransaction(&AH->public);
3285  else
3286  ahprintf(AH, "\nBEGIN;\n");
3287  AH->txnCount = 0;
3288  }
3289 
3290  ahprintf(AH, "\n");
3291 }
3292 
3293 /*
3294  * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3295  * for updating state if appropriate. If user is NULL or an empty string,
3296  * the specification DEFAULT will be used.
3297  */
3298 static void
3300 {
3302 
3303  appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3304 
3305  /*
3306  * SQL requires a string literal here. Might as well be correct.
3307  */
3308  if (user && *user)
3309  appendStringLiteralAHX(cmd, user, AH);
3310  else
3311  appendPQExpBufferStr(cmd, "DEFAULT");
3312  appendPQExpBufferChar(cmd, ';');
3313 
3314  if (RestoringToDB(AH))
3315  {
3316  PGresult *res;
3317 
3318  res = PQexec(AH->connection, cmd->data);
3319 
3320  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3321  /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3322  pg_fatal("could not set session user to \"%s\": %s",
3324 
3325  PQclear(res);
3326  }
3327  else
3328  ahprintf(AH, "%s\n\n", cmd->data);
3329 
3330  destroyPQExpBuffer(cmd);
3331 }
3332 
3333 
3334 /*
3335  * Issue the commands to connect to the specified database.
3336  *
3337  * If we're currently restoring right into a database, this will
3338  * actually establish a connection. Otherwise it puts a \connect into
3339  * the script output.
3340  */
3341 static void
3343 {
3344  if (RestoringToDB(AH))
3346  else
3347  {
3348  PQExpBufferData connectbuf;
3349 
3350  initPQExpBuffer(&connectbuf);
3351  appendPsqlMetaConnect(&connectbuf, dbname);
3352  ahprintf(AH, "%s\n", connectbuf.data);
3353  termPQExpBuffer(&connectbuf);
3354  }
3355 
3356  /*
3357  * NOTE: currUser keeps track of what the imaginary session user in our
3358  * script is. It's now effectively reset to the original userID.
3359  */
3360  free(AH->currUser);
3361  AH->currUser = NULL;
3362 
3363  /* don't assume we still know the output schema, tablespace, etc either */
3364  free(AH->currSchema);
3365  AH->currSchema = NULL;
3366 
3367  free(AH->currTableAm);
3368  AH->currTableAm = NULL;
3369 
3370  free(AH->currTablespace);
3371  AH->currTablespace = NULL;
3372 
3373  /* re-establish fixed state */
3375 }
3376 
3377 /*
3378  * Become the specified user, and update state to avoid redundant commands
3379  *
3380  * NULL or empty argument is taken to mean restoring the session default
3381  */
3382 static void
3383 _becomeUser(ArchiveHandle *AH, const char *user)
3384 {
3385  if (!user)
3386  user = ""; /* avoid null pointers */
3387 
3388  if (AH->currUser && strcmp(AH->currUser, user) == 0)
3389  return; /* no need to do anything */
3390 
3391  _doSetSessionAuth(AH, user);
3392 
3393  /*
3394  * NOTE: currUser keeps track of what the imaginary session user in our
3395  * script is
3396  */
3397  free(AH->currUser);
3398  AH->currUser = pg_strdup(user);
3399 }
3400 
3401 /*
3402  * Become the owner of the given TOC entry object. If
3403  * changes in ownership are not allowed, this doesn't do anything.
3404  */
3405 static void
3407 {
3408  RestoreOptions *ropt = AH->public.ropt;
3409 
3410  if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3411  return;
3412 
3413  _becomeUser(AH, te->owner);
3414 }
3415 
3416 
3417 /*
3418  * Issue the commands to select the specified schema as the current schema
3419  * in the target database.
3420  */
3421 static void
3422 _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3423 {
3424  PQExpBuffer qry;
3425 
3426  /*
3427  * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3428  * that search_path rather than switching to entry-specific paths.
3429  * Otherwise, it's an old archive that will not restore correctly unless
3430  * we set the search_path as it's expecting.
3431  */
3432  if (AH->public.searchpath)
3433  return;
3434 
3435  if (!schemaName || *schemaName == '\0' ||
3436  (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3437  return; /* no need to do anything */
3438 
3439  qry = createPQExpBuffer();
3440 
3441  appendPQExpBuffer(qry, "SET search_path = %s",
3442  fmtId(schemaName));
3443  if (strcmp(schemaName, "pg_catalog") != 0)
3444  appendPQExpBufferStr(qry, ", pg_catalog");
3445 
3446  if (RestoringToDB(AH))
3447  {
3448  PGresult *res;
3449 
3450  res = PQexec(AH->connection, qry->data);
3451 
3452  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3454  "could not set search_path to \"%s\": %s",
3455  schemaName, PQerrorMessage(AH->connection));
3456 
3457  PQclear(res);
3458  }
3459  else
3460  ahprintf(AH, "%s;\n\n", qry->data);
3461 
3462  free(AH->currSchema);
3463  AH->currSchema = pg_strdup(schemaName);
3464 
3465  destroyPQExpBuffer(qry);
3466 }
3467 
3468 /*
3469  * Issue the commands to select the specified tablespace as the current one
3470  * in the target database.
3471  */
3472 static void
3474 {
3475  RestoreOptions *ropt = AH->public.ropt;
3476  PQExpBuffer qry;
3477  const char *want,
3478  *have;
3479 
3480  /* do nothing in --no-tablespaces mode */
3481  if (ropt->noTablespace)
3482  return;
3483 
3484  have = AH->currTablespace;
3485  want = tablespace;
3486 
3487  /* no need to do anything for non-tablespace object */
3488  if (!want)
3489  return;
3490 
3491  if (have && strcmp(want, have) == 0)
3492  return; /* no need to do anything */
3493 
3494  qry = createPQExpBuffer();
3495 
3496  if (strcmp(want, "") == 0)
3497  {
3498  /* We want the tablespace to be the database's default */
3499  appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3500  }
3501  else
3502  {
3503  /* We want an explicit tablespace */
3504  appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3505  }
3506 
3507  if (RestoringToDB(AH))
3508  {
3509  PGresult *res;
3510 
3511  res = PQexec(AH->connection, qry->data);
3512 
3513  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3515  "could not set default_tablespace to %s: %s",
3516  fmtId(want), PQerrorMessage(AH->connection));
3517 
3518  PQclear(res);
3519  }
3520  else
3521  ahprintf(AH, "%s;\n\n", qry->data);
3522 
3523  free(AH->currTablespace);
3524  AH->currTablespace = pg_strdup(want);
3525 
3526  destroyPQExpBuffer(qry);
3527 }
3528 
3529 /*
3530  * Set the proper default_table_access_method value for the table.
3531  */
3532 static void
3533 _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3534 {
3535  RestoreOptions *ropt = AH->public.ropt;
3536  PQExpBuffer cmd;
3537  const char *want,
3538  *have;
3539 
3540  /* do nothing in --no-table-access-method mode */
3541  if (ropt->noTableAm)
3542  return;
3543 
3544  have = AH->currTableAm;
3545  want = tableam;
3546 
3547  if (!want)
3548  return;
3549 
3550  if (have && strcmp(want, have) == 0)
3551  return;
3552 
3553  cmd = createPQExpBuffer();
3554  appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3555 
3556  if (RestoringToDB(AH))
3557  {
3558  PGresult *res;
3559 
3560  res = PQexec(AH->connection, cmd->data);
3561 
3562  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3564  "could not set default_table_access_method: %s",
3565  PQerrorMessage(AH->connection));
3566 
3567  PQclear(res);
3568  }
3569  else
3570  ahprintf(AH, "%s\n\n", cmd->data);
3571 
3572  destroyPQExpBuffer(cmd);
3573 
3574  free(AH->currTableAm);
3575  AH->currTableAm = pg_strdup(want);
3576 }
3577 
3578 /*
3579  * Set the proper default table access method for a table without storage.
3580  * Currently, this is required only for partitioned tables with a table AM.
3581  */
3582 static void
3584 {
3585  RestoreOptions *ropt = AH->public.ropt;
3586  const char *tableam = te->tableam;
3587  PQExpBuffer cmd;
3588 
3589  /* do nothing in --no-table-access-method mode */
3590  if (ropt->noTableAm)
3591  return;
3592 
3593  if (!tableam)
3594  return;
3595 
3596  Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3597 
3598  cmd = createPQExpBuffer();
3599 
3600  appendPQExpBufferStr(cmd, "ALTER TABLE ");
3601  appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3602  appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3603  fmtId(tableam));
3604 
3605  if (RestoringToDB(AH))
3606  {
3607  PGresult *res;
3608 
3609  res = PQexec(AH->connection, cmd->data);
3610 
3611  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3613  "could not alter table access method: %s",
3614  PQerrorMessage(AH->connection));
3615  PQclear(res);
3616  }
3617  else
3618  ahprintf(AH, "%s\n\n", cmd->data);
3619 
3620  destroyPQExpBuffer(cmd);
3621 }
3622 
3623 /*
3624  * Extract an object description for a TOC entry, and append it to buf.
3625  *
3626  * This is used for ALTER ... OWNER TO.
3627  *
3628  * If the object type has no owner, do nothing.
3629  */
3630 static void
3632 {
3633  const char *type = te->desc;
3634 
3635  /* objects that don't require special decoration */
3636  if (strcmp(type, "COLLATION") == 0 ||
3637  strcmp(type, "CONVERSION") == 0 ||
3638  strcmp(type, "DOMAIN") == 0 ||
3639  strcmp(type, "FOREIGN TABLE") == 0 ||
3640  strcmp(type, "MATERIALIZED VIEW") == 0 ||
3641  strcmp(type, "SEQUENCE") == 0 ||
3642  strcmp(type, "STATISTICS") == 0 ||
3643  strcmp(type, "TABLE") == 0 ||
3644  strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3645  strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3646  strcmp(type, "TYPE") == 0 ||
3647  strcmp(type, "VIEW") == 0 ||
3648  /* non-schema-specified objects */
3649  strcmp(type, "DATABASE") == 0 ||
3650  strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3651  strcmp(type, "SCHEMA") == 0 ||
3652  strcmp(type, "EVENT TRIGGER") == 0 ||
3653  strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3654  strcmp(type, "SERVER") == 0 ||
3655  strcmp(type, "PUBLICATION") == 0 ||
3656  strcmp(type, "SUBSCRIPTION") == 0)
3657  {
3658  appendPQExpBuffer(buf, "%s ", type);
3659  if (te->namespace && *te->namespace)
3660  appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3662  }
3663  /* LOs just have a name, but it's numeric so must not use fmtId */
3664  else if (strcmp(type, "BLOB") == 0)
3665  {
3666  appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3667  }
3668 
3669  /*
3670  * These object types require additional decoration. Fortunately, the
3671  * information needed is exactly what's in the DROP command.
3672  */
3673  else if (strcmp(type, "AGGREGATE") == 0 ||
3674  strcmp(type, "FUNCTION") == 0 ||
3675  strcmp(type, "OPERATOR") == 0 ||
3676  strcmp(type, "OPERATOR CLASS") == 0 ||
3677  strcmp(type, "OPERATOR FAMILY") == 0 ||
3678  strcmp(type, "PROCEDURE") == 0)
3679  {
3680  /* Chop "DROP " off the front and make a modifiable copy */
3681  char *first = pg_strdup(te->dropStmt + 5);
3682  char *last;
3683 
3684  /* point to last character in string */
3685  last = first + strlen(first) - 1;
3686 
3687  /* Strip off any ';' or '\n' at the end */
3688  while (last >= first && (*last == '\n' || *last == ';'))
3689  last--;
3690  *(last + 1) = '\0';
3691 
3692  appendPQExpBufferStr(buf, first);
3693 
3694  free(first);
3695  return;
3696  }
3697  /* these object types don't have separate owners */
3698  else if (strcmp(type, "CAST") == 0 ||
3699  strcmp(type, "CHECK CONSTRAINT") == 0 ||
3700  strcmp(type, "CONSTRAINT") == 0 ||
3701  strcmp(type, "DATABASE PROPERTIES") == 0 ||
3702  strcmp(type, "DEFAULT") == 0 ||
3703  strcmp(type, "FK CONSTRAINT") == 0 ||
3704  strcmp(type, "INDEX") == 0 ||
3705  strcmp(type, "RULE") == 0 ||
3706  strcmp(type, "TRIGGER") == 0 ||
3707  strcmp(type, "ROW SECURITY") == 0 ||
3708  strcmp(type, "POLICY") == 0 ||
3709  strcmp(type, "USER MAPPING") == 0)
3710  {
3711  /* do nothing */
3712  }
3713  else
3714  pg_fatal("don't know how to set owner for object type \"%s\"", type);
3715 }
3716 
3717 /*
3718  * Emit the SQL commands to create the object represented by a TOC entry
3719  *
3720  * This now also includes issuing an ALTER OWNER command to restore the
3721  * object's ownership, if wanted. But note that the object's permissions
3722  * will remain at default, until the matching ACL TOC entry is restored.
3723  */
3724 static void
3725 _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3726 {
3727  RestoreOptions *ropt = AH->public.ropt;
3728 
3729  /*
3730  * Select owner, schema, tablespace and default AM as necessary. The
3731  * default access method for partitioned tables is handled after
3732  * generating the object definition, as it requires an ALTER command
3733  * rather than SET.
3734  */
3735  _becomeOwner(AH, te);
3736  _selectOutputSchema(AH, te->namespace);
3737  _selectTablespace(AH, te->tablespace);
3738  if (te->relkind != RELKIND_PARTITIONED_TABLE)
3740 
3741  /* Emit header comment for item */
3742  if (!AH->noTocComments)
3743  {
3744  const char *pfx;
3745  char *sanitized_name;
3746  char *sanitized_schema;
3747  char *sanitized_owner;
3748 
3749  if (isData)
3750  pfx = "Data for ";
3751  else
3752  pfx = "";
3753 
3754  ahprintf(AH, "--\n");
3755  if (AH->public.verbose)
3756  {
3757  ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3758  te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3759  if (te->nDeps > 0)
3760  {
3761  int i;
3762 
3763  ahprintf(AH, "-- Dependencies:");
3764  for (i = 0; i < te->nDeps; i++)
3765  ahprintf(AH, " %d", te->dependencies[i]);
3766  ahprintf(AH, "\n");
3767  }
3768  }
3769 
3770  sanitized_name = sanitize_line(te->tag, false);
3771  sanitized_schema = sanitize_line(te->namespace, true);
3772  sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3773 
3774  ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3775  pfx, sanitized_name, te->desc, sanitized_schema,
3776  sanitized_owner);
3777 
3778  free(sanitized_name);
3779  free(sanitized_schema);
3780  free(sanitized_owner);
3781 
3782  if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3783  {
3784  char *sanitized_tablespace;
3785 
3786  sanitized_tablespace = sanitize_line(te->tablespace, false);
3787  ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3788  free(sanitized_tablespace);
3789  }
3790  ahprintf(AH, "\n");
3791 
3792  if (AH->PrintExtraTocPtr != NULL)
3793  AH->PrintExtraTocPtr(AH, te);
3794  ahprintf(AH, "--\n\n");
3795  }
3796 
3797  /*
3798  * Actually print the definition. Normally we can just print the defn
3799  * string if any, but we have three special cases:
3800  *
3801  * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3802  * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3803  * "public" that is a comment. We have to do this when --no-owner mode is
3804  * selected. This is ugly, but I see no other good way ...
3805  *
3806  * 2. BLOB METADATA entries need special processing since their defn
3807  * strings are just lists of OIDs, not complete SQL commands.
3808  *
3809  * 3. ACL LARGE OBJECTS entries need special processing because they
3810  * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3811  * apply to each large object listed in the associated BLOB METADATA.
3812  */
3813  if (ropt->noOwner &&
3814  strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3815  {
3816  ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3817  }
3818  else if (strcmp(te->desc, "BLOB METADATA") == 0)
3819  {
3820  IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3821  }
3822  else if (strcmp(te->desc, "ACL") == 0 &&
3823  strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3824  {
3825  IssueACLPerBlob(AH, te);
3826  }
3827  else
3828  {
3829  if (te->defn && strlen(te->defn) > 0)
3830  ahprintf(AH, "%s\n\n", te->defn);
3831  }
3832 
3833  /*
3834  * If we aren't using SET SESSION AUTH to determine ownership, we must
3835  * instead issue an ALTER OWNER command. Schema "public" is special; when
3836  * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3837  * when using SET SESSION for all other objects. We assume that anything
3838  * without a DROP command is not a separately ownable object.
3839  */
3840  if (!ropt->noOwner &&
3841  (!ropt->use_setsessauth ||
3842  (strcmp(te->desc, "SCHEMA") == 0 &&
3843  strncmp(te->defn, "--", 2) == 0)) &&
3844  te->owner && strlen(te->owner) > 0 &&
3845  te->dropStmt && strlen(te->dropStmt) > 0)
3846  {
3847  if (strcmp(te->desc, "BLOB METADATA") == 0)
3848  {
3849  /* BLOB METADATA needs special code to handle multiple LOs */
3850  char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
3851 
3852  IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
3853  pg_free(cmdEnd);
3854  }
3855  else
3856  {
3857  /* For all other cases, we can use _getObjectDescription */
3858  PQExpBufferData temp;
3859 
3860  initPQExpBuffer(&temp);
3861  _getObjectDescription(&temp, te);
3862 
3863  /*
3864  * If _getObjectDescription() didn't fill the buffer, then there
3865  * is no owner.
3866  */
3867  if (temp.data[0])
3868  ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
3869  temp.data, fmtId(te->owner));
3870  termPQExpBuffer(&temp);
3871  }
3872  }
3873 
3874  /*
3875  * Select a partitioned table's default AM, once the table definition has
3876  * been generated.
3877  */
3878  if (te->relkind == RELKIND_PARTITIONED_TABLE)
3880 
3881  /*
3882  * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3883  * commands, so we can no longer assume we know the current auth setting.
3884  */
3885  if (_tocEntryIsACL(te))
3886  {
3887  free(AH->currUser);
3888  AH->currUser = NULL;
3889  }
3890 }
3891 
3892 /*
3893  * Sanitize a string to be included in an SQL comment or TOC listing, by
3894  * replacing any newlines with spaces. This ensures each logical output line
3895  * is in fact one physical output line, to prevent corruption of the dump
3896  * (which could, in the worst case, present an SQL injection vulnerability
3897  * if someone were to incautiously load a dump containing objects with
3898  * maliciously crafted names).
3899  *
3900  * The result is a freshly malloc'd string. If the input string is NULL,
3901  * return a malloc'ed empty string, unless want_hyphen, in which case return a
3902  * malloc'ed hyphen.
3903  *
3904  * Note that we currently don't bother to quote names, meaning that the name
3905  * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3906  * it only examines the dumpId field, but someday we might want to try harder.
3907  */
3908 static char *
3909 sanitize_line(const char *str, bool want_hyphen)
3910 {
3911  char *result;
3912  char *s;
3913 
3914  if (!str)
3915  return pg_strdup(want_hyphen ? "-" : "");
3916 
3917  result = pg_strdup(str);
3918 
3919  for (s = result; *s != '\0'; s++)
3920  {
3921  if (*s == '\n' || *s == '\r')
3922  *s = ' ';
3923  }
3924 
3925  return result;
3926 }
3927 
3928 /*
3929  * Write the file header for a custom-format archive
3930  */
3931 void
3933 {
3934  struct tm crtm;
3935 
3936  AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3937  AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3938  AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3939  AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3940  AH->WriteBytePtr(AH, AH->intSize);
3941  AH->WriteBytePtr(AH, AH->offSize);
3942  AH->WriteBytePtr(AH, AH->format);
3944  crtm = *localtime(&AH->createDate);
3945  WriteInt(AH, crtm.tm_sec);
3946  WriteInt(AH, crtm.tm_min);
3947  WriteInt(AH, crtm.tm_hour);
3948  WriteInt(AH, crtm.tm_mday);
3949  WriteInt(AH, crtm.tm_mon);
3950  WriteInt(AH, crtm.tm_year);
3951  WriteInt(AH, crtm.tm_isdst);
3952  WriteStr(AH, PQdb(AH->connection));
3953  WriteStr(AH, AH->public.remoteVersionStr);
3954  WriteStr(AH, PG_VERSION);
3955 }
3956 
3957 void
3959 {
3960  char *errmsg;
3961  char vmaj,
3962  vmin,
3963  vrev;
3964  int fmt;
3965 
3966  /*
3967  * If we haven't already read the header, do so.
3968  *
3969  * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3970  * way to unify the cases?
3971  */
3972  if (!AH->readHeader)
3973  {
3974  char tmpMag[7];
3975 
3976  AH->ReadBufPtr(AH, tmpMag, 5);
3977 
3978  if (strncmp(tmpMag, "PGDMP", 5) != 0)
3979  pg_fatal("did not find magic string in file header");
3980  }
3981 
3982  vmaj = AH->ReadBytePtr(AH);
3983  vmin = AH->ReadBytePtr(AH);
3984 
3985  if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3986  vrev = AH->ReadBytePtr(AH);
3987  else
3988  vrev = 0;
3989 
3990  AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3991 
3992  if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3993  pg_fatal("unsupported version (%d.%d) in file header",
3994  vmaj, vmin);
3995 
3996  AH->intSize = AH->ReadBytePtr(AH);
3997  if (AH->intSize > 32)
3998  pg_fatal("sanity check on integer size (%lu) failed",
3999  (unsigned long) AH->intSize);
4000 
4001  if (AH->intSize > sizeof(int))
4002  pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4003 
4004  if (AH->version >= K_VERS_1_7)
4005  AH->offSize = AH->ReadBytePtr(AH);
4006  else
4007  AH->offSize = AH->intSize;
4008 
4009  fmt = AH->ReadBytePtr(AH);
4010 
4011  if (AH->format != fmt)
4012  pg_fatal("expected format (%d) differs from format found in file (%d)",
4013  AH->format, fmt);
4014 
4015  if (AH->version >= K_VERS_1_15)
4016  AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4017  else if (AH->version >= K_VERS_1_2)
4018  {
4019  /* Guess the compression method based on the level */
4020  if (AH->version < K_VERS_1_4)
4021  AH->compression_spec.level = AH->ReadBytePtr(AH);
4022  else
4023  AH->compression_spec.level = ReadInt(AH);
4024 
4025  if (AH->compression_spec.level != 0)
4027  }
4028  else
4030 
4032  if (errmsg)
4033  {
4034  pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4035  errmsg);
4036  pg_free(errmsg);
4037  }
4038 
4039  if (AH->version >= K_VERS_1_4)
4040  {
4041  struct tm crtm;
4042 
4043  crtm.tm_sec = ReadInt(AH);
4044  crtm.tm_min = ReadInt(AH);
4045  crtm.tm_hour = ReadInt(AH);
4046  crtm.tm_mday = ReadInt(AH);
4047  crtm.tm_mon = ReadInt(AH);
4048  crtm.tm_year = ReadInt(AH);
4049  crtm.tm_isdst = ReadInt(AH);
4050 
4051  /*
4052  * Newer versions of glibc have mktime() report failure if tm_isdst is
4053  * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4054  * TZ=UTC. This is problematic when restoring an archive under a
4055  * different timezone setting. If we get a failure, try again with
4056  * tm_isdst set to -1 ("don't know").
4057  *
4058  * XXX with or without this hack, we reconstruct createDate
4059  * incorrectly when the prevailing timezone is different from
4060  * pg_dump's. Next time we bump the archive version, we should flush
4061  * this representation and store a plain seconds-since-the-Epoch
4062  * timestamp instead.
4063  */
4064  AH->createDate = mktime(&crtm);
4065  if (AH->createDate == (time_t) -1)
4066  {
4067  crtm.tm_isdst = -1;
4068  AH->createDate = mktime(&crtm);
4069  if (AH->createDate == (time_t) -1)
4070  pg_log_warning("invalid creation date in header");
4071  }
4072  }
4073 
4074  if (AH->version >= K_VERS_1_4)
4075  {
4076  AH->archdbname = ReadStr(AH);
4077  }
4078 
4079  if (AH->version >= K_VERS_1_10)
4080  {
4081  AH->archiveRemoteVersion = ReadStr(AH);
4082  AH->archiveDumpVersion = ReadStr(AH);
4083  }
4084 }
4085 
4086 
4087 /*
4088  * checkSeek
4089  * check to see if ftell/fseek can be performed.
4090  */
4091 bool
4092 checkSeek(FILE *fp)
4093 {
4094  pgoff_t tpos;
4095 
4096  /* Check that ftello works on this file */
4097  tpos = ftello(fp);
4098  if (tpos < 0)
4099  return false;
4100 
4101  /*
4102  * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4103  * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4104  * successful no-op even on files that are otherwise unseekable.
4105  */
4106  if (fseeko(fp, tpos, SEEK_SET) != 0)
4107  return false;
4108 
4109  return true;
4110 }
4111 
4112 
4113 /*
4114  * dumpTimestamp
4115  */
4116 static void
4117 dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4118 {
4119  char buf[64];
4120 
4121  if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4122  ahprintf(AH, "-- %s %s\n\n", msg, buf);
4123 }
4124 
4125 /*
4126  * Main engine for parallel restore.
4127  *
4128  * Parallel restore is done in three phases. In this first phase,
4129  * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4130  * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4131  * PRE_DATA items other than ACLs.) Entries we can't process now are
4132  * added to the pending_list for later phases to deal with.
4133  */
4134 static void
4136 {
4137  bool skipped_some;
4138  TocEntry *next_work_item;
4139 
4140  pg_log_debug("entering restore_toc_entries_prefork");
4141 
4142  /* Adjust dependency information */
4143  fix_dependencies(AH);
4144 
4145  /*
4146  * Do all the early stuff in a single connection in the parent. There's no
4147  * great point in running it in parallel, in fact it will actually run
4148  * faster in a single connection because we avoid all the connection and
4149  * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4150  * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4151  * not risk trying to process them out-of-order.
4152  *
4153  * Stuff that we can't do immediately gets added to the pending_list.
4154  * Note: we don't yet filter out entries that aren't going to be restored.
4155  * They might participate in dependency chains connecting entries that
4156  * should be restored, so we treat them as live until we actually process
4157  * them.
4158  *
4159  * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4160  * before DATA items, and all DATA items before POST_DATA items. That is
4161  * not certain to be true in older archives, though, and in any case use
4162  * of a list file would destroy that ordering (cf. SortTocFromFile). So
4163  * this loop cannot assume that it holds.
4164  */
4166  skipped_some = false;
4167  for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4168  {
4169  bool do_now = true;
4170 
4171  if (next_work_item->section != SECTION_PRE_DATA)
4172  {
4173  /* DATA and POST_DATA items are just ignored for now */
4174  if (next_work_item->section == SECTION_DATA ||
4175  next_work_item->section == SECTION_POST_DATA)
4176  {
4177  do_now = false;
4178  skipped_some = true;
4179  }
4180  else
4181  {
4182  /*
4183  * SECTION_NONE items, such as comments, can be processed now
4184  * if we are still in the PRE_DATA part of the archive. Once
4185  * we've skipped any items, we have to consider whether the
4186  * comment's dependencies are satisfied, so skip it for now.
4187  */
4188  if (skipped_some)
4189  do_now = false;
4190  }
4191  }
4192 
4193  /*
4194  * Also skip items that need to be forced into later passes. We need
4195  * not set skipped_some in this case, since by assumption no main-pass
4196  * items could depend on these.
4197  */
4198  if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4199  do_now = false;
4200 
4201  if (do_now)
4202  {
4203  /* OK, restore the item and update its dependencies */
4204  pg_log_info("processing item %d %s %s",
4205  next_work_item->dumpId,
4206  next_work_item->desc, next_work_item->tag);
4207 
4208  (void) restore_toc_entry(AH, next_work_item, false);
4209 
4210  /* Reduce dependencies, but don't move anything to ready_heap */
4211  reduce_dependencies(AH, next_work_item, NULL);
4212  }
4213  else
4214  {
4215  /* Nope, so add it to pending_list */
4216  pending_list_append(pending_list, next_work_item);
4217  }
4218  }
4219 
4220  /*
4221  * In --transaction-size mode, we must commit the open transaction before
4222  * dropping the database connection. This also ensures that child workers
4223  * can see the objects we've created so far.
4224  */
4225  if (AH->public.ropt->txn_size > 0)
4226  CommitTransaction(&AH->public);
4227 
4228  /*
4229  * Now close parent connection in prep for parallel steps. We do this
4230  * mainly to ensure that we don't exceed the specified number of parallel
4231  * connections.
4232  */
4233  DisconnectDatabase(&AH->public);
4234 
4235  /* blow away any transient state from the old connection */
4236  free(AH->currUser);
4237  AH->currUser = NULL;
4238  free(AH->currSchema);
4239  AH->currSchema = NULL;
4240  free(AH->currTablespace);
4241  AH->currTablespace = NULL;
4242  free(AH->currTableAm);
4243  AH->currTableAm = NULL;
4244 }
4245 
4246 /*
4247  * Main engine for parallel restore.
4248  *
4249  * Parallel restore is done in three phases. In this second phase,
4250  * we process entries by dispatching them to parallel worker children
4251  * (processes on Unix, threads on Windows), each of which connects
4252  * separately to the database. Inter-entry dependencies are respected,
4253  * and so is the RestorePass multi-pass structure. When we can no longer
4254  * make any entries ready to process, we exit. Normally, there will be
4255  * nothing left to do; but if there is, the third phase will mop up.
4256  */
4257 static void
4259  TocEntry *pending_list)
4260 {
4261  binaryheap *ready_heap;
4262  TocEntry *next_work_item;
4263 
4264  pg_log_debug("entering restore_toc_entries_parallel");
4265 
4266  /* Set up ready_heap with enough room for all known TocEntrys */
4267  ready_heap = binaryheap_allocate(AH->tocCount,
4269  NULL);
4270 
4271  /*
4272  * The pending_list contains all items that we need to restore. Move all
4273  * items that are available to process immediately into the ready_heap.
4274  * After this setup, the pending list is everything that needs to be done
4275  * but is blocked by one or more dependencies, while the ready heap
4276  * contains items that have no remaining dependencies and are OK to
4277  * process in the current restore pass.
4278  */
4280  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4281 
4282  /*
4283  * main parent loop
4284  *
4285  * Keep going until there is no worker still running AND there is no work
4286  * left to be done. Note invariant: at top of loop, there should always
4287  * be at least one worker available to dispatch a job to.
4288  */
4289  pg_log_info("entering main parallel loop");
4290 
4291  for (;;)
4292  {
4293  /* Look for an item ready to be dispatched to a worker */
4294  next_work_item = pop_next_work_item(ready_heap, pstate);
4295  if (next_work_item != NULL)
4296  {
4297  /* If not to be restored, don't waste time launching a worker */
4298  if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4299  {
4300  pg_log_info("skipping item %d %s %s",
4301  next_work_item->dumpId,
4302  next_work_item->desc, next_work_item->tag);
4303  /* Update its dependencies as though we'd completed it */
4304  reduce_dependencies(AH, next_work_item, ready_heap);
4305  /* Loop around to see if anything else can be dispatched */
4306  continue;
4307  }
4308 
4309  pg_log_info("launching item %d %s %s",
4310  next_work_item->dumpId,
4311  next_work_item->desc, next_work_item->tag);
4312 
4313  /* Dispatch to some worker */
4314  DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4315  mark_restore_job_done, ready_heap);
4316  }
4317  else if (IsEveryWorkerIdle(pstate))
4318  {
4319  /*
4320  * Nothing is ready and no worker is running, so we're done with
4321  * the current pass or maybe with the whole process.
4322  */
4323  if (AH->restorePass == RESTORE_PASS_LAST)
4324  break; /* No more parallel processing is possible */
4325 
4326  /* Advance to next restore pass */
4327  AH->restorePass++;
4328  /* That probably allows some stuff to be made ready */
4329  move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4330  /* Loop around to see if anything's now ready */
4331  continue;
4332  }
4333  else
4334  {
4335  /*
4336  * We have nothing ready, but at least one child is working, so
4337  * wait for some subjob to finish.
4338  */
4339  }
4340 
4341  /*
4342  * Before dispatching another job, check to see if anything has
4343  * finished. We should check every time through the loop so as to
4344  * reduce dependencies as soon as possible. If we were unable to
4345  * dispatch any job this time through, wait until some worker finishes
4346  * (and, hopefully, unblocks some pending item). If we did dispatch
4347  * something, continue as soon as there's at least one idle worker.
4348  * Note that in either case, there's guaranteed to be at least one
4349  * idle worker when we return to the top of the loop. This ensures we
4350  * won't block inside DispatchJobForTocEntry, which would be
4351  * undesirable: we'd rather postpone dispatching until we see what's
4352  * been unblocked by finished jobs.
4353  */
4354  WaitForWorkers(AH, pstate,
4355  next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4356  }
4357 
4358  /* There should now be nothing in ready_heap. */
4359  Assert(binaryheap_empty(ready_heap));
4360 
4361  binaryheap_free(ready_heap);
4362 
4363  pg_log_info("finished main parallel loop");
4364 }
4365 
4366 /*
4367  * Main engine for parallel restore.
4368  *
4369  * Parallel restore is done in three phases. In this third phase,
4370  * we mop up any remaining TOC entries by processing them serially.
4371  * This phase normally should have nothing to do, but if we've somehow
4372  * gotten stuck due to circular dependencies or some such, this provides
4373  * at least some chance of completing the restore successfully.
4374  */
4375 static void
4377 {
4378  RestoreOptions *ropt = AH->public.ropt;
4379  TocEntry *te;
4380 
4381  pg_log_debug("entering restore_toc_entries_postfork");
4382 
4383  /*
4384  * Now reconnect the single parent connection.
4385  */
4386  ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4387 
4388  /* re-establish fixed state */
4390 
4391  /*
4392  * Make sure there is no work left due to, say, circular dependencies, or
4393  * some other pathological condition. If so, do it in the single parent
4394  * connection. We don't sweat about RestorePass ordering; it's likely we
4395  * already violated that.
4396  */
4397  for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4398  {
4399  pg_log_info("processing missed item %d %s %s",
4400  te->dumpId, te->desc, te->tag);
4401  (void) restore_toc_entry(AH, te, false);
4402  }
4403 }
4404 
4405 /*
4406  * Check if te1 has an exclusive lock requirement for an item that te2 also
4407  * requires, whether or not te2's requirement is for an exclusive lock.
4408  */
4409 static bool
4411 {
4412  int j,
4413  k;
4414 
4415  for (j = 0; j < te1->nLockDeps; j++)
4416  {
4417  for (k = 0; k < te2->nDeps; k++)
4418  {
4419  if (te1->lockDeps[j] == te2->dependencies[k])
4420  return true;
4421  }
4422  }
4423  return false;
4424 }
4425 
4426 
4427 /*
4428  * Initialize the header of the pending-items list.
4429  *
4430  * This is a circular list with a dummy TocEntry as header, just like the
4431  * main TOC list; but we use separate list links so that an entry can be in
4432  * the main TOC list as well as in the pending list.
4433  */
4434 static void
4436 {
4437  l->pending_prev = l->pending_next = l;
4438 }
4439 
4440 /* Append te to the end of the pending-list headed by l */
4441 static void
4443 {
4444  te->pending_prev = l->pending_prev;
4445  l->pending_prev->pending_next = te;
4446  l->pending_prev = te;
4447  te->pending_next = l;
4448 }
4449 
4450 /* Remove te from the pending-list */
4451 static void
4453 {
4456  te->pending_prev = NULL;
4457  te->pending_next = NULL;
4458 }
4459 
4460 
4461 /* qsort comparator for sorting TocEntries by dataLength */
4462 static int
4463 TocEntrySizeCompareQsort(const void *p1, const void *p2)
4464 {
4465  const TocEntry *te1 = *(const TocEntry *const *) p1;
4466  const TocEntry *te2 = *(const TocEntry *const *) p2;
4467 
4468  /* Sort by decreasing dataLength */
4469  if (te1->dataLength > te2->dataLength)
4470  return -1;
4471  if (te1->dataLength < te2->dataLength)
4472  return 1;
4473 
4474  /* For equal dataLengths, sort by dumpId, just to be stable */
4475  if (te1->dumpId < te2->dumpId)
4476  return -1;
4477  if (te1->dumpId > te2->dumpId)
4478  return 1;
4479 
4480  return 0;
4481 }
4482 
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/*
	 * binaryheap wants max-heap ordering, so invert the result of the
	 * qsort comparator.
	 */
	int			cmp = TocEntrySizeCompareQsort(&p1, &p2);

	return -cmp;
}
4490 
4491 
4492 /*
4493  * Move all immediately-ready items from pending_list to ready_heap.
4494  *
4495  * Items are considered ready if they have no remaining dependencies and
4496  * they belong in the current restore pass. (See also reduce_dependencies,
4497  * which applies the same logic one-at-a-time.)
4498  */
4499 static void
4501  binaryheap *ready_heap,
4502  RestorePass pass)
4503 {
4504  TocEntry *te;
4505  TocEntry *next_te;
4506 
4507  for (te = pending_list->pending_next; te != pending_list; te = next_te)
4508  {
4509  /* must save list link before possibly removing te from list */
4510  next_te = te->pending_next;
4511 
4512  if (te->depCount == 0 &&
4513  _tocEntryRestorePass(te) == pass)
4514  {
4515  /* Remove it from pending_list ... */
4516  pending_list_remove(te);
4517  /* ... and add to ready_heap */
4518  binaryheap_add(ready_heap, te);
4519  }
4520  }
4521 }
4522 
4523 /*
4524  * Find the next work item (if any) that is capable of being run now,
4525  * and remove it from the ready_heap.
4526  *
4527  * Returns the item, or NULL if nothing is runnable.
4528  *
4529  * To qualify, the item must have no remaining dependencies
4530  * and no requirements for locks that are incompatible with
4531  * items currently running. Items in the ready_heap are known to have
4532  * no remaining dependencies, but we have to check for lock conflicts.
4533  */
4534 static TocEntry *
4536  ParallelState *pstate)
4537 {
4538  /*
4539  * Search the ready_heap until we find a suitable item. Note that we do a
4540  * sequential scan through the heap nodes, so even though we will first
4541  * try to choose the highest-priority item, we might end up picking
4542  * something with a much lower priority. However, we expect that we will
4543  * typically be able to pick one of the first few items, which should
4544  * usually have a relatively high priority.
4545  */
4546  for (int i = 0; i < binaryheap_size(ready_heap); i++)
4547  {
4548  TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4549  bool conflicts = false;
4550 
4551  /*
4552  * Check to see if the item would need exclusive lock on something
4553  * that a currently running item also needs lock on, or vice versa. If
4554  * so, we don't want to schedule them together.
4555  */
4556  for (int k = 0; k < pstate->numWorkers; k++)
4557  {
4558  TocEntry *running_te = pstate->te[k];
4559 
4560  if (running_te == NULL)
4561  continue;
4562  if (has_lock_conflicts(te, running_te) ||
4563  has_lock_conflicts(running_te, te))
4564  {
4565  conflicts = true;
4566  break;
4567  }
4568  }
4569 
4570  if (conflicts)
4571  continue;
4572 
4573  /* passed all tests, so this item can run */
4574  binaryheap_remove_node(ready_heap, i);
4575  return te;
4576  }
4577 
4578  pg_log_debug("no item ready");
4579  return NULL;
4580 }
4581 
4582 
4583 /*
4584  * Restore a single TOC item in parallel with others
4585  *
4586  * this is run in the worker, i.e. in a thread (Windows) or a separate process
4587  * (everything else). A worker process executes several such work items during
4588  * a parallel backup or restore. Once we terminate here and report back that
4589  * our work is finished, the leader process will assign us a new work item.
4590  */
4591 int
4593 {
4594  int status;
4595 
4596  Assert(AH->connection != NULL);
4597 
4598  /* Count only errors associated with this TOC entry */
4599  AH->public.n_errors = 0;
4600 
4601  /* Restore the TOC item */
4602  status = restore_toc_entry(AH, te, true);
4603 
4604  return status;
4605 }
4606 
4607 
4608 /*
4609  * Callback function that's invoked in the leader process after a step has
4610  * been parallel restored.
4611  *
4612  * Update status and reduce the dependency count of any dependent items.
4613  */
4614 static void
4616  TocEntry *te,
4617  int status,
4618  void *callback_data)
4619 {
4620  binaryheap *ready_heap = (binaryheap *) callback_data;
4621 
4622  pg_log_info("finished item %d %s %s",
4623  te->dumpId, te->desc, te->tag);
4624 
4625  if (status == WORKER_CREATE_DONE)
4626  mark_create_done(AH, te);
4627  else if (status == WORKER_INHIBIT_DATA)
4628  {
4630  AH->public.n_errors++;
4631  }
4632  else if (status == WORKER_IGNORED_ERRORS)
4633  AH->public.n_errors++;
4634  else if (status != 0)
4635  pg_fatal("worker process failed: exit code %d",
4636  status);
4637 
4638  reduce_dependencies(AH, te, ready_heap);
4639 }
4640 
4641 
4642 /*
4643  * Process the dependency information into a form useful for parallel restore.
4644  *
4645  * This function takes care of fixing up some missing or badly designed
4646  * dependencies, and then prepares subsidiary data structures that will be
4647  * used in the main parallel-restore logic, including:
4648  * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4649  * 2. We set up depCount fields that are the number of as-yet-unprocessed
4650  * dependencies for each TOC entry.
4651  *
4652  * We also identify locking dependencies so that we can avoid trying to
4653  * schedule conflicting items at the same time.
4654  */
4655 static void
4657 {
4658  TocEntry *te;
4659  int i;
4660 
4661  /*
4662  * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4663  * items are marked as not being in any parallel-processing list.
4664  */
4665  for (te = AH->toc->next; te != AH->toc; te = te->next)
4666  {
4667  te->depCount = te->nDeps;
4668  te->revDeps = NULL;
4669  te->nRevDeps = 0;
4670  te->pending_prev = NULL;
4671  te->pending_next = NULL;
4672  }
4673 
4674  /*
4675  * POST_DATA items that are shown as depending on a table need to be
4676  * re-pointed to depend on that table's data, instead. This ensures they
4677  * won't get scheduled until the data has been loaded.
4678  */
4680 
4681  /*
4682  * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4683  * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4684  * one BLOB COMMENTS in such files.)
4685  */
4686  if (AH->version < K_VERS_1_11)
4687  {
4688  for (te = AH->toc->next; te != AH->toc; te = te->next)
4689  {
4690  if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4691  {
4692  TocEntry *te2;
4693 
4694  for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4695  {
4696  if (strcmp(te2->desc, "BLOBS") == 0)
4697  {
4698  te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4699  te->dependencies[0] = te2->dumpId;
4700  te->nDeps++;
4701  te->depCount++;
4702  break;
4703  }
4704  }
4705  break;
4706  }
4707  }
4708  }
4709 
4710  /*
4711  * At this point we start to build the revDeps reverse-dependency arrays,
4712  * so all changes of dependencies must be complete.
4713  */
4714 
4715  /*
4716  * Count the incoming dependencies for each item. Also, it is possible
4717  * that the dependencies list items that are not in the archive at all
4718  * (that should not happen in 9.2 and later, but is highly likely in older
4719  * archives). Subtract such items from the depCounts.
4720  */
4721  for (te = AH->toc->next; te != AH->toc; te = te->next)
4722  {
4723  for (i = 0; i < te->nDeps; i++)
4724  {
4725  DumpId depid = te->dependencies[i];
4726 
4727  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4728  AH->tocsByDumpId[depid]->nRevDeps++;
4729  else
4730  te->depCount--;
4731  }
4732  }
4733 
4734  /*
4735  * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4736  * it as a counter below.
4737  */
4738  for (te = AH->toc->next; te != AH->toc; te = te->next)
4739  {
4740  if (te->nRevDeps > 0)
4741  te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4742  te->nRevDeps = 0;
4743  }
4744 
4745  /*
4746  * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4747  * better agree with the loops above.
4748  */
4749  for (te = AH->toc->next; te != AH->toc; te = te->next)
4750  {
4751  for (i = 0; i < te->nDeps; i++)
4752  {
4753  DumpId depid = te->dependencies[i];
4754 
4755  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4756  {
4757  TocEntry *otherte = AH->tocsByDumpId[depid];
4758 
4759  otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4760  }
4761  }
4762  }
4763 
4764  /*
4765  * Lastly, work out the locking dependencies.
4766  */
4767  for (te = AH->toc->next; te != AH->toc; te = te->next)
4768  {
4769  te->lockDeps = NULL;
4770  te->nLockDeps = 0;
4772  }
4773 }
4774 
4775 /*
4776  * Change dependencies on table items to depend on table data items instead,
4777  * but only in POST_DATA items.
4778  *
4779  * Also, for any item having such dependency(s), set its dataLength to the
4780  * largest dataLength of the table data items it depends on. This ensures
4781  * that parallel restore will prioritize larger jobs (index builds, FK
4782  * constraint checks, etc) over smaller ones, avoiding situations where we
4783  * end a restore with only one active job working on a large table.
4784  */
4785 static void
4787 {
4788  TocEntry *te;
4789  int i;
4790  DumpId olddep;
4791 
4792  for (te = AH->toc->next; te != AH->toc; te = te->next)
4793  {
4794  if (te->section != SECTION_POST_DATA)
4795  continue;
4796  for (i = 0; i < te->nDeps; i++)
4797  {
4798  olddep = te->dependencies[i];
4799  if (olddep <= AH->maxDumpId &&
4800  AH->tableDataId[olddep] != 0)
4801  {
4802  DumpId tabledataid = AH->tableDataId[olddep];
4803  TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4804 
4805  te->dependencies[i] = tabledataid;
4806  te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4807  pg_log_debug("transferring dependency %d -> %d to %d",
4808  te->dumpId, olddep, tabledataid);
4809  }
4810  }
4811  }
4812 }
4813 
4814 /*
4815  * Identify which objects we'll need exclusive lock on in order to restore
4816  * the given TOC entry (*other* than the one identified by the TOC entry
4817  * itself). Record their dump IDs in the entry's lockDeps[] array.
4818  */
4819 static void
4821 {
4822  DumpId *lockids;
4823  int nlockids;
4824  int i;
4825 
4826  /*
4827  * We only care about this for POST_DATA items. PRE_DATA items are not
4828  * run in parallel, and DATA items are all independent by assumption.
4829  */
4830  if (te->section != SECTION_POST_DATA)
4831  return;
4832 
4833  /* Quick exit if no dependencies at all */
4834  if (te->nDeps == 0)
4835  return;
4836 
4837  /*
4838  * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4839  * and hence require exclusive lock. However, we know that CREATE INDEX
4840  * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4841  * category too ... but today is not that day.)
4842  */
4843  if (strcmp(te->desc, "INDEX") == 0)
4844  return;
4845 
4846  /*
4847  * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4848  * item listed among its dependencies. Originally all of these would have
4849  * been TABLE items, but repoint_table_dependencies would have repointed
4850  * them to the TABLE DATA items if those are present (which they might not
4851  * be, eg in a schema-only dump). Note that all of the entries we are
4852  * processing here are POST_DATA; otherwise there might be a significant
4853  * difference between a dependency on a table and a dependency on its
4854  * data, so that closer analysis would be needed here.
4855  */
4856  lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4857  nlockids = 0;
4858  for (i = 0; i < te->nDeps; i++)
4859  {
4860  DumpId depid = te->dependencies[i];
4861 
4862  if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4863  ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4864  strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4865  lockids[nlockids++] = depid;
4866  }
4867 
4868  if (nlockids == 0)
4869  {
4870  free(lockids);
4871  return;
4872  }
4873 
4874  te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4875  te->nLockDeps = nlockids;
4876 }
4877 
4878 /*
4879  * Remove the specified TOC entry from the depCounts of items that depend on
4880  * it, thereby possibly making them ready-to-run. Any pending item that
4881  * becomes ready should be moved to the ready_heap, if that's provided.
4882  */
4883 static void
4885  binaryheap *ready_heap)
4886 {
4887  int i;
4888 
4889  pg_log_debug("reducing dependencies for %d", te->dumpId);
4890 
4891  for (i = 0; i < te->nRevDeps; i++)
4892  {
4893  TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4894 
4895  Assert(otherte->depCount > 0);
4896  otherte->depCount--;
4897 
4898  /*
4899  * It's ready if it has no remaining dependencies, and it belongs in
4900  * the current restore pass, and it is currently a member of the
4901  * pending list (that check is needed to prevent double restore in
4902  * some cases where a list-file forces out-of-order restoring).
4903  * However, if ready_heap == NULL then caller doesn't want any list
4904  * memberships changed.
4905  */
4906  if (otherte->depCount == 0 &&
4907  _tocEntryRestorePass(otherte) == AH->restorePass &&
4908  otherte->pending_prev != NULL &&
4909  ready_heap != NULL)
4910  {
4911  /* Remove it from pending list ... */
4912  pending_list_remove(otherte);
4913  /* ... and add to ready_heap */
4914  binaryheap_add(ready_heap, otherte);
4915  }
4916  }
4917 }
4918 
4919 /*
4920  * Set the created flag on the DATA member corresponding to the given
4921  * TABLE member
4922  */
4923 static void
4925 {
4926  if (AH->tableDataId[te->dumpId] != 0)
4927  {
4928  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4929 
4930  ted->created = true;
4931  }
4932 }
4933 
4934 /*
4935  * Mark the DATA member corresponding to the given TABLE member
4936  * as not wanted
4937  */
4938 static void
4940 {
4941  pg_log_info("table \"%s\" could not be created, will not restore its data",
4942  te->tag);
4943 
4944  if (AH->tableDataId[te->dumpId] != 0)
4945  {
4946  TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4947 
4948  ted->reqs = 0;
4949  }
4950 }
4951 
4952 /*
4953  * Clone and de-clone routines used in parallel restoration.
4954  *
4955  * Enough of the structure is cloned to ensure that there is no
4956  * conflict between different threads each with their own clone.
4957  */
4958 ArchiveHandle *
4960 {
4961  ArchiveHandle *clone;
4962 
4963  /* Make a "flat" copy */
4964  clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4965  memcpy(clone, AH, sizeof(ArchiveHandle));
4966 
4967  /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
4968  clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
4969  memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
4970 
4971  /* Handle format-independent fields */
4972  memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4973 
4974  /* The clone will have its own connection, so disregard connection state */
4975  clone->connection = NULL;
4976  clone->connCancel = NULL;
4977  clone->currUser = NULL;
4978  clone->currSchema = NULL;
4979  clone->currTableAm = NULL;
4980  clone->currTablespace = NULL;
4981 
4982  /* savedPassword must be local in case we change it while connecting */
4983  if (clone->savedPassword)
4984  clone->savedPassword = pg_strdup(clone->savedPassword);
4985 
4986  /* clone has its own error count, too */
4987  clone->public.n_errors = 0;
4988 
4989  /* clones should not share lo_buf */
4990  clone->lo_buf = NULL;
4991 
4992  /*
4993  * Clone connections disregard --transaction-size; they must commit after
4994  * each command so that the results are immediately visible to other
4995  * workers.
4996  */
4997  clone->public.ropt->txn_size = 0;
4998 
4999  /*
5000  * Connect our new clone object to the database, using the same connection
5001  * parameters used for the original connection.
5002  */
5003  ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5004 
5005  /* re-establish fixed state */
5006  if (AH->mode == archModeRead)
5007  _doSetFixedOutputState(clone);
5008  /* in write case, setupDumpWorker will fix up connection state */
5009 
5010  /* Let the format-specific code have a chance too */
5011  clone->ClonePtr(clone);
5012 
5013  Assert(clone->connection != NULL);
5014  return clone;
5015 }
5016 
5017 /*
5018  * Release clone-local storage.
5019  *
5020  * Note: we assume any clone-local connection was already closed.
5021  */
5022 void
5024 {
5025  /* Should not have an open database connection */
5026  Assert(AH->connection == NULL);
5027 
5028  /* Clear format-specific state */
5029  AH->DeClonePtr(AH);
5030 
5031  /* Clear state allocated by CloneArchive */
5032  if (AH->sqlparse.curCmd)
5034 
5035  /* Clear any connection-local state */
5036  free(AH->currUser);
5037  free(AH->currSchema);
5038  free(AH->currTablespace);
5039  free(AH->currTableAm);
5040  free(AH->savedPassword);
5041 
5042  free(AH);
5043 }
int lo_write(int fd, const char *buf, int len)
Definition: be-fsstubs.c:182
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition: parallel.c:1059
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1451
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition: parallel.c:1205
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1268
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition: parallel.c:897
@ WFW_ALL_IDLE
Definition: parallel.h:35
@ WFW_GOT_STATUS
Definition: parallel.h:33
@ WFW_ONE_IDLE
Definition: parallel.h:34
void binaryheap_remove_node(binaryheap *heap, int n)
Definition: binaryheap.c:225
void binaryheap_add(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:154
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
#define binaryheap_size(h)
Definition: binaryheap.h:66
#define binaryheap_empty(h)
Definition: binaryheap.h:65
#define binaryheap_get_node(h, n)
Definition: binaryheap.h:67
#define PG_BINARY_R
Definition: c.h:1275
#define ngettext(s, p, n)
Definition: c.h:1181
#define PG_BINARY_A
Definition: c.h:1274
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY_W
Definition: c.h:1276
bool EndCompressFileHandle(CompressFileHandle *CFH)
Definition: compress_io.c:289
char * supports_compression(const pg_compress_specification compression_spec)
Definition: compress_io.c:88
CompressFileHandle * InitCompressFileHandle(const pg_compress_specification compression_spec)
Definition: compress_io.c:195
const char * get_compress_algorithm_name(pg_compress_algorithm algorithm)
Definition: compression.c:69
@ PG_COMPRESSION_GZIP
Definition: compression.h:24
@ PG_COMPRESSION_NONE
Definition: compression.h:23
#define PGDUMP_STRFTIME_FMT
Definition: dumputils.h:33
int errmsg(const char *fmt,...)
Definition: elog.c:1072
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:6993
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int lo_close(PGconn *conn, int fd)
Definition: fe-lobj.c:96
int lo_open(PGconn *conn, Oid lobjId, int mode)
Definition: fe-lobj.c:57
Oid lo_create(PGconn *conn, Oid lobjId)
Definition: fe-lobj.c:474
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
DataDirSyncMethod
Definition: file_utils.h:28
@ DATA_DIR_SYNC_METHOD_FSYNC
Definition: file_utils.h:29
const char * str
#define free(a)
Definition: header.h:65
int remaining
Definition: informix.c:673
char sign
Definition: informix.c:674
static DataDirSyncMethod sync_method
Definition: initdb.c:170
int b
Definition: isn.c:70
return true
Definition: isn.c:126
int j
Definition: isn.c:74
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
#define INV_WRITE
Definition: libpq-fs.h:21
static void const char * fmt
va_end(args)
va_start(args, fmt)
static struct pg_tm tm
Definition: localtime.c:104
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition: logging.c:216
#define pg_log_info(...)
Definition: logging.h:124
@ PG_LOG_PRIMARY
Definition: logging.h:67
@ PG_LOG_ERROR
Definition: logging.h:43
#define pg_log_debug(...)
Definition: logging.h:133
static AmcheckOptions opts
Definition: pg_amcheck.c:111
void ConnectDatabase(Archive *AHX, const ConnParams *cparams, bool isReconnect)
Definition: pg_backup_db.c:110
@ SECTION_NONE
Definition: pg_backup.h:57
@ SECTION_POST_DATA
Definition: pg_backup.h:60
@ SECTION_PRE_DATA
Definition: pg_backup.h:58
@ SECTION_DATA
Definition: pg_backup.h:59
int DumpId
Definition: pg_backup.h:270
void(* SetupWorkerPtrType)(Archive *AH)
Definition: pg_backup.h:280
enum _archiveFormat ArchiveFormat
@ archModeWrite
Definition: pg_backup.h:51
@ archModeAppend
Definition: pg_backup.h:50
@ archModeRead
Definition: pg_backup.h:52
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:225
enum _teSection teSection
@ archUnknown
Definition: pg_backup.h:41
@ archTar
Definition: pg_backup.h:43
@ archCustom
Definition: pg_backup.h:42
@ archDirectory
Definition: pg_backup.h:45
@ archNull
Definition: pg_backup.h:44
static void fix_dependencies(ArchiveHandle *AH)
static void repoint_table_dependencies(ArchiveHandle *AH)
void DeCloneArchive(ArchiveHandle *AH)
static int _discoverArchiveFormat(ArchiveHandle *AH)
#define TEXT_DUMPALL_HEADER
int TocIDRequired(ArchiveHandle *AH, DumpId id)
bool checkSeek(FILE *fp)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
void warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te)
void WriteHead(ArchiveHandle *AH)
int EndLO(Archive *AHX, Oid oid)
static CompressFileHandle * SaveOutput(ArchiveHandle *AH)
static void _becomeUser(ArchiveHandle *AH, const char *user)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
static void pending_list_append(TocEntry *l, TocEntry *te)
size_t WriteInt(ArchiveHandle *AH, int i)
void ProcessArchiveRestoreOptions(Archive *AHX)
static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
static void _moveBefore(TocEntry *pos, TocEntry *te)
static bool _tocEntryIsACL(TocEntry *te)
static void move_to_ready_heap(TocEntry *pending_list, binaryheap *ready_heap, RestorePass pass)
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
static void buildTocEntryArrays(ArchiveHandle *AH)
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
int archprintf(Archive *AH, const char *fmt,...)
static void StrictNamesCheck(RestoreOptions *ropt)
static void mark_restore_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
size_t WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
RestoreOptions * NewRestoreOptions(void)
static RestorePass _tocEntryRestorePass(TocEntry *te)
int StartLO(Archive *AHX, Oid oid)
static void setupRestoreWorker(Archive *AHX)
Archive * CreateArchive(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupDumpWorker, DataDirSyncMethod sync_method)
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname)
static int TocEntrySizeCompareQsort(const void *p1, const void *p2)
void StartRestoreLOs(ArchiveHandle *AH)
void CloseArchive(Archive *AHX)
static void pending_list_header_init(TocEntry *l)
static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list)
static void SetOutput(ArchiveHandle *AH, const char *filename, const pg_compress_specification compression_spec)
static void mark_create_done(ArchiveHandle *AH, TocEntry *te)
#define TEXT_DUMP_HEADER
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
void WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
int ahprintf(ArchiveHandle *AH, const char *fmt,...)
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
static bool is_load_via_partition_root(TocEntry *te)
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
void SortTocFromFile(Archive *AHX)
int ReadOffset(ArchiveHandle *AH, pgoff_t *o)
char * ReadStr(ArchiveHandle *AH)
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
int ReadInt(ArchiveHandle *AH)
static void restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
static void _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, ArchiveOpts *opts)
static void _doSetFixedOutputState(ArchiveHandle *AH)
void PrintTOCSummary(Archive *AHX)
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
static int RestoringToDB(ArchiveHandle *AH)
void ReadHead(ArchiveHandle *AH)
void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
static void pending_list_remove(TocEntry *te)
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2)
DumpOptions * NewDumpOptions(void)
void ReadToc(ArchiveHandle *AH)
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, binaryheap *ready_heap)
static char * sanitize_line(const char *str, bool want_hyphen)
void EndRestoreLO(ArchiveHandle *AH, Oid oid)
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace)
void RestoreArchive(Archive *AHX)
void WriteToc(ArchiveHandle *AH)
void archputs(const char *s, Archive *AH)
static bool _fileExistsInDirectory(const char *dir, const char *filename)
static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user)
void EndRestoreLOs(ArchiveHandle *AH)
void StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
Archive * OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
void InitDumpOptions(DumpOptions *opts)
static ArchiveHandle * _allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, bool dosync, ArchiveMode mode, SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
DumpOptions * dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
static void dump_lo_buf(ArchiveHandle *AH)
static TocEntry * pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate)
void WriteData(Archive *AHX, const void *data, size_t dLen)
int parallel_restore(ArchiveHandle *AH, TocEntry *te)
size_t WriteStr(ArchiveHandle *AH, const char *c)
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_1_15
void InitArchiveFmt_Null(ArchiveHandle *AH)
#define WORKER_CREATE_DONE
#define LOBBUFSIZE
void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
Definition: pg_backup_db.c:599
#define K_VERS_1_14
#define appendByteaLiteralAHX(buf, str, len, AH)
void struct _archiveOpts ArchiveOpts
void(* EndDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define K_VERS_SELF
#define K_VERS_1_10
void(* StartDataPtrType)(ArchiveHandle *AH, TocEntry *te)
#define ARCHIVE_MAJOR(version)
#define ARCHIVE_MINOR(version)
#define K_VERS_1_2
#define RESTORE_PASS_LAST
void InitArchiveFmt_Custom(ArchiveHandle *AH)
#define K_OFFSET_NO_DATA
void InitArchiveFmt_Tar(ArchiveHandle *AH)
#define REQ_SCHEMA
#define K_VERS_1_4
#define appendStringLiteralAHX(buf, str, AH)
void DropLOIfExists(ArchiveHandle *AH, Oid oid)
Definition: pg_backup_db.c:673
#define MAKE_ARCHIVE_VERSION(major, minor, rev)
#define K_VERS_1_5
void ReconnectToServer(ArchiveHandle *AH, const char *dbname)
Definition: pg_backup_db.c:74
#define ARCHIVE_REV(version)
#define WRITE_ERROR_EXIT
bool isValidTarHeader(char *header)
#define WORKER_IGNORED_ERRORS
#define REQ_SPECIAL
void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te, const char *cmdBegin, const char *cmdEnd)
Definition: pg_backup_db.c:552
#define K_VERS_1_6
@ STAGE_INITIALIZING
@ STAGE_PROCESSING
@ STAGE_NONE
@ STAGE_FINALIZING
@ ACT_RESTORE
@ ACT_DUMP
#define K_VERS_1_0
#define K_OFFSET_POS_NOT_SET
#define K_OFFSET_POS_SET
#define K_VERS_MAX
#define K_VERS_1_8
#define WORKER_OK
@ OUTPUT_COPYDATA
@ OUTPUT_SQLCMDS
@ OUTPUT_OTHERDATA
#define K_VERS_1_12
#define REQ_DATA
#define READ_ERROR_EXIT(fd)
void InitArchiveFmt_Directory(ArchiveHandle *AH)
#define K_VERS_1_11
#define K_VERS_1_9
#define WORKER_INHIBIT_DATA
#define K_VERS_1_16
@ RESTORE_PASS_POST_ACL
@ RESTORE_PASS_ACL
@ RESTORE_PASS_MAIN
#define K_VERS_1_3
#define K_VERS_1_7
void EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
Definition: pg_backup_db.c:500
int ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
Definition: pg_backup_db.c:445
void * arg
#define DUMP_PRE_DATA
#define DUMP_DATA
#define DUMP_UNSECTIONED
#define pg_fatal(...)
#define DUMP_POST_DATA
static PgChecksumMode mode
Definition: pg_checksums.c:56
#define MAXPGPATH
const void size_t len
const void * data
static int sig
Definition: pg_ctl.c:79
int32 encoding
Definition: pg_database.h:41
static bool dosync
Definition: pg_dump.c:103
static void setupDumpWorker(Archive *AH)
Definition: pg_dump.c:1351
#define exit_nicely(code)
Definition: pg_dumpall.c:124
static char * filename
Definition: pg_dumpall.c:119
bool pg_get_line_buf(FILE *stream, StringInfo buf)
Definition: pg_get_line.c:95
static char * user
Definition: pg_regress.c:120
static char * buf
Definition: pg_test_fsync.c:73
#define pg_encoding_to_char
Definition: pg_wchar.h:630
#define pg_char_to_encoding
Definition: pg_wchar.h:629
char * tablespace
Definition: pgbench.c:216
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
#define qsort(a, b, c, d)
Definition: port.h:449
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
char * c
size_t pvsnprintf(char *buf, size_t len, const char *fmt, va_list args)
Definition: psprintf.c:106
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
const char * simple_string_list_not_touched(SimpleStringList *list)
Definition: simple_list.c:144
static pg_noinline void Size size
Definition: slab.c:607
char * dbname
Definition: streamutil.c:52
void appendPsqlMetaConnect(PQExpBuffer buf, const char *dbname)
Definition: string_utils.c:590
const char * fmtId(const char *rawid)
Definition: string_utils.c:64
const char * fmtQualifiedId(const char *schema, const char *id)
Definition: string_utils.c:145
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
int minRemoteVersion
Definition: pg_backup.h:222
char * remoteVersionStr
Definition: pg_backup.h:218
DumpOptions * dopt
Definition: pg_backup.h:214
bool exit_on_error
Definition: pg_backup.h:237
char * searchpath
Definition: pg_backup.h:233
int maxRemoteVersion
Definition: pg_backup.h:223
int n_errors
Definition: pg_backup.h:238
bool std_strings
Definition: pg_backup.h:230
int numWorkers
Definition: pg_backup.h:225
int encoding
Definition: pg_backup.h:229
int verbose
Definition: pg_backup.h:217
RestoreOptions * ropt
Definition: pg_backup.h:215
Oid tableoid
Definition: pg_backup.h:266
bool(* write_func)(const void *ptr, size_t size, struct CompressFileHandle *CFH)
Definition: compress_io.h:139
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:111
TocEntry ** te
Definition: parallel.h:59
int numWorkers
Definition: parallel.h:57
SimpleStringListCell * head
Definition: simple_list.h:42
ArchiverStage stage
RestorePass restorePass
ArchiveFormat format
struct _tocEntry * toc
DeClonePtrType DeClonePtr
EndLOsPtrType EndLOsPtr
DataDirSyncMethod sync_method
struct _tocEntry * lastErrorTE
ReadExtraTocPtrType ReadExtraTocPtr
struct _tocEntry * currentTE
CustomOutPtrType CustomOutPtr
PGcancel *volatile connCancel
StartLOsPtrType StartLOsPtr
ArchiveEntryPtrType ArchiveEntryPtr
pg_compress_specification compression_spec
WriteDataPtrType WriteDataPtr
StartLOPtrType StartLOPtr
struct _tocEntry ** tocsByDumpId
ClonePtrType ClonePtr
WriteBufPtrType WriteBufPtr
PrepParallelRestorePtrType PrepParallelRestorePtr
EndLOPtrType EndLOPtr
WriteExtraTocPtrType WriteExtraTocPtr
ReadBytePtrType ReadBytePtr
PrintTocDataPtrType PrintTocDataPtr
struct _tocEntry * currToc
WriteBytePtrType WriteBytePtr
sqlparseInfo sqlparse
ReadBufPtrType ReadBufPtr
PrintExtraTocPtrType PrintExtraTocPtr
ArchiverStage lastErrorStage
StartDataPtrType StartDataPtr
ReopenPtrType ReopenPtr
ArchiverOutput outputKind
EndDataPtrType EndDataPtr
SetupWorkerPtrType SetupWorkerPtr
ClosePtrType ClosePtr
char * pgport
Definition: pg_backup.h:85
char * pghost
Definition: pg_backup.h:86
trivalue promptPassword
Definition: pg_backup.h:88
char * username
Definition: pg_backup.h:87
char * dbname
Definition: pg_backup.h:84
bool dataOnly
Definition: pg_backup.h:170
int dump_inserts
Definition: pg_backup.h:174
int column_inserts
Definition: pg_backup.h:178
int use_setsessauth
Definition: pg_backup.h:190
int outputCreateDB
Definition: pg_backup.h:198
bool include_everything
Definition: pg_backup.h:195
int sequence_data
Definition: pg_backup.h:204
int disable_dollar_quoting
Definition: pg_backup.h:177
int no_comments
Definition: pg_backup.h:180
int outputNoTableAm
Definition: pg_backup.h:188
int enable_row_security
Definition: pg_backup.h:191
char * outputSuperuser
Definition: pg_backup.h:202
int dumpSections
Definition: pg_backup.h:171
int no_security_labels
Definition: pg_backup.h:181
int no_publications
Definition: pg_backup.h:182
ConnParams cparams
Definition: pg_backup.h:164
const char * lockWaitTimeout
Definition: pg_backup.h:173
int no_subscriptions
Definition: pg_backup.h:183
bool aclsSkip
Definition: pg_backup.h:172
int outputClean
Definition: pg_backup.h:197
int outputNoTablespaces
Definition: pg_backup.h:189
int disable_triggers
Definition: pg_backup.h:187
int outputNoOwner
Definition: pg_backup.h:201
bool schemaOnly
Definition: pg_backup.h:169
SimpleStringList schemaExcludeNames
Definition: pg_backup.h:139
int include_everything
Definition: pg_backup.h:124
bool * idWanted
Definition: pg_backup.h:156
int suppressDumpWarnings
Definition: pg_backup.h:150
ConnParams cparams
Definition: pg_backup.h:144
SimpleStringList functionNames
Definition: pg_backup.h:137
char * use_role
Definition: pg_backup.h:105
SimpleStringList tableNames
Definition: pg_backup.h:141
SimpleStringList indexNames
Definition: pg_backup.h:136
pg_compress_specification compression_spec
Definition: pg_backup.h:148
int no_subscriptions
Definition: pg_backup.h:114
SimpleStringList triggerNames
Definition: pg_backup.h:140
int disable_dollar_quoting
Definition: pg_backup.h:107
SimpleStringList schemaNames
Definition: pg_backup.h:138
const char * filename
Definition: pg_backup.h:117
int no_security_labels
Definition: pg_backup.h:113
char * tocFile
Definition: pg_backup.h:127
char * superuser
Definition: pg_backup.h:104
const char * lockWaitTimeout
Definition: pg_backup.h:123
int enable_row_security
Definition: pg_backup.h:157
int disable_triggers
Definition: pg_backup.h:100
int noDataForFailedTables
Definition: pg_backup.h:146
struct _tocEntry * pending_next
struct _tocEntry * prev
teSection section
struct _tocEntry * pending_prev
DataDumperPtr dataDumper
pgoff_t dataLength
CatalogId catalogId
struct _tocEntry * next
const void * dataDumperArg
DumpId * revDeps
DumpId * dependencies
DumpId * lockDeps
pg_compress_algorithm algorithm
Definition: compression.h:34
int tm_sec
Definition: pgtime.h:36
PQExpBuffer curCmd
unsigned short st_mode
Definition: win32_port.h:268
static void * fn(void *arg)
Definition: thread-alloc.c:119
@ TRI_DEFAULT
Definition: vacuumlo.c:36
const char * type
#define stat
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:325
#define ftello(stream)
Definition: win32_port.h:219
#define S_ISREG(m)
Definition: win32_port.h:328
#define fseeko(stream, offset, origin)
Definition: win32_port.h:216
#define pgoff_t
Definition: win32_port.h:207
static void StartTransaction(void)
Definition: xact.c:2014
static void CommitTransaction(void)
Definition: xact.c:2178
ArchiveMode
Definition: xlog.h:62