/* pg_dump.c — PostgreSQL source (doxygen page navigation chrome removed) */
/*-------------------------------------------------------------------------
 *
 * pg_dump.c
 *	  pg_dump is a utility for dumping out a postgres database
 *	  into a script file.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * pg_dump will read the system catalogs in a database and dump out a
 * script that reproduces the schema in terms of SQL that is understood
 * by PostgreSQL
 *
 * Note that pg_dump runs in a transaction-snapshot mode transaction,
 * so it sees a consistent snapshot of the database including system
 * catalogs. However, it relies in part on various specialized backend
 * functions like pg_get_indexdef(), and those things tend to look at
 * the currently committed state.  So it is possible to get 'cache
 * lookup failed' error if someone performs DDL changes while a dump is
 * happening.  The window for this sort of thing is from the acquisition
 * of the transaction snapshot to getSchemaData() (when pg_dump acquires
 * AccessShareLock on every table it intends to dump).  It isn't very large,
 * but it can happen.
 *
 * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
 *
 * IDENTIFICATION
 *	  src/bin/pg_dump/pg_dump.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <unistd.h>
#include <ctype.h>
#include <limits.h>
#ifdef HAVE_TERMIOS_H
#include <termios.h>
#endif

#include "access/attnum.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "catalog/pg_aggregate_d.h"
#include "catalog/pg_am_d.h"
#include "catalog/pg_attribute_d.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_cast_d.h"
#include "catalog/pg_class_d.h"
#include "catalog/pg_default_acl_d.h"
#include "catalog/pg_largeobject_d.h"
#include "catalog/pg_largeobject_metadata_d.h"
#include "catalog/pg_proc_d.h"
/* NOTE(review): the extraction dropped one include here (original line 54);
 * restored from upstream — confirm against the pg_dump.c of this branch. */
#include "catalog/pg_subscription.h"
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "dumputils.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/string_utils.h"
#include "getopt_long.h"
#include "libpq/libpq-fs.h"
#include "parallel.h"
#include "pg_backup_db.h"
#include "pg_backup_utils.h"
#include "pg_dump.h"
#include "storage/block.h"
69 typedef struct
70 {
71  const char *descr; /* comment for an object */
72  Oid classoid; /* object class (catalog OID) */
73  Oid objoid; /* object OID */
74  int objsubid; /* subobject (table column #) */
75 } CommentItem;
76 
77 typedef struct
78 {
79  const char *provider; /* label provider of this security label */
80  const char *label; /* security label for an object */
81  Oid classoid; /* object class (catalog OID) */
82  Oid objoid; /* object OID */
83  int objsubid; /* subobject (table column #) */
84 } SecLabelItem;
85 
86 typedef enum OidOptions
87 {
91 } OidOptions;
92 
93 /* global decls */
94 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
95 
96 /* subquery used to convert user ID (eg, datdba) to user name */
97 static const char *username_subquery;
98 
99 /*
100  * For 8.0 and earlier servers, pulled from pg_database, for 8.1+ we use
101  * FirstNormalObjectId - 1.
102  */
103 static Oid g_last_builtin_oid; /* value of the last builtin oid */
104 
105 /* The specified names/patterns should to match at least one entity */
106 static int strict_names = 0;
107 
108 /*
109  * Object inclusion/exclusion lists
110  *
111  * The string lists record the patterns given by command-line switches,
112  * which we then convert to lists of OIDs of matching objects.
113  */
115 static SimpleOidList schema_include_oids = {NULL, NULL};
117 static SimpleOidList schema_exclude_oids = {NULL, NULL};
118 
120 static SimpleOidList table_include_oids = {NULL, NULL};
122 static SimpleOidList table_exclude_oids = {NULL, NULL};
124 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
127 
129 static SimpleOidList extension_include_oids = {NULL, NULL};
130 
131 static const CatalogId nilCatalogId = {0, 0};
132 
133 /* override for standard extra_float_digits setting */
134 static bool have_extra_float_digits = false;
136 
137 /*
138  * The default number of rows per INSERT when
139  * --inserts is specified without --rows-per-insert
140  */
141 #define DUMP_DEFAULT_ROWS_PER_INSERT 1
142 
143 /*
144  * Macro for producing quoted, schema-qualified name of a dumpable object.
145  */
146 #define fmtQualifiedDumpable(obj) \
147  fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
148  (obj)->dobj.name)
149 
150 static void help(const char *progname);
151 static void setup_connection(Archive *AH,
152  const char *dumpencoding, const char *dumpsnapshot,
153  char *use_role);
155 static void expand_schema_name_patterns(Archive *fout,
156  SimpleStringList *patterns,
157  SimpleOidList *oids,
158  bool strict_names);
159 static void expand_extension_name_patterns(Archive *fout,
160  SimpleStringList *patterns,
161  SimpleOidList *oids,
162  bool strict_names);
164  SimpleStringList *patterns,
165  SimpleOidList *oids);
166 static void expand_table_name_patterns(Archive *fout,
167  SimpleStringList *patterns,
168  SimpleOidList *oids,
169  bool strict_names);
170 static NamespaceInfo *findNamespace(Oid nsoid);
171 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
172 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
173 static void guessConstraintInheritance(TableInfo *tblinfo, int numTables);
174 static void dumpCommentExtended(Archive *fout, const char *type,
175  const char *name, const char *namespace,
176  const char *owner, CatalogId catalogId,
177  int subid, DumpId dumpId,
178  const char *initdb_comment);
179 static inline void dumpComment(Archive *fout, const char *type,
180  const char *name, const char *namespace,
181  const char *owner, CatalogId catalogId,
182  int subid, DumpId dumpId);
183 static int findComments(Archive *fout, Oid classoid, Oid objoid,
184  CommentItem **items);
185 static int collectComments(Archive *fout, CommentItem **items);
186 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
187  const char *namespace, const char *owner,
188  CatalogId catalogId, int subid, DumpId dumpId);
189 static int findSecLabels(Archive *fout, Oid classoid, Oid objoid,
190  SecLabelItem **items);
191 static int collectSecLabels(Archive *fout, SecLabelItem **items);
192 static void dumpDumpableObject(Archive *fout, const DumpableObject *dobj);
193 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
194 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
195 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
196 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
197 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
198 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
199 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
200 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
201 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
202 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo);
203 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
204 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
205 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
206 static void dumpCast(Archive *fout, const CastInfo *cast);
207 static void dumpTransform(Archive *fout, const TransformInfo *transform);
208 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
209 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *oprinfo);
210 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
211 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
212 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
213 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
214 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
215 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
216 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
217 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
218 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
219 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
220 static void dumpTableAttach(Archive *fout, const TableAttachInfo *tbinfo);
221 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
222 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
223 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
224 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
225 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
226 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
227 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
228 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
229 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
230 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
231 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
232 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
233 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
234 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
235 static void dumpUserMappings(Archive *fout,
236  const char *servername, const char *namespace,
237  const char *owner, CatalogId catalogId, DumpId dumpId);
238 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
239 
240 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
241  const char *type, const char *name, const char *subname,
242  const char *nspname, const char *owner,
243  const char *acls, const char *racls,
244  const char *initacls, const char *initracls);
245 
246 static void getDependencies(Archive *fout);
247 static void BuildArchiveDependencies(Archive *fout);
248 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
249  DumpId **dependencies, int *nDeps, int *allocDeps);
250 
252 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
253  DumpableObject *boundaryObjs);
254 
255 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
256 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
257 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
258 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
259 static void buildMatViewRefreshDependencies(Archive *fout);
260 static void getTableDataFKConstraints(void);
261 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
262  bool is_agg);
263 static char *format_function_arguments_old(Archive *fout,
264  const FuncInfo *finfo, int nallargs,
265  char **allargtypes,
266  char **argmodes,
267  char **argnames);
268 static char *format_function_signature(Archive *fout,
269  const FuncInfo *finfo, bool honor_quotes);
270 static char *convertRegProcReference(const char *proc);
271 static char *getFormattedOperatorName(const char *oproid);
272 static char *convertTSFunction(Archive *fout, Oid funcOid);
273 static Oid findLastBuiltinOid_V71(Archive *fout);
274 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
275 static void getBlobs(Archive *fout);
276 static void dumpBlob(Archive *fout, const BlobInfo *binfo);
277 static int dumpBlobs(Archive *fout, const void *arg);
278 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
279 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
280 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
281 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
282 static void dumpDatabase(Archive *AH);
283 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
284  const char *dbname, Oid dboid);
285 static void dumpEncoding(Archive *AH);
286 static void dumpStdStrings(Archive *AH);
287 static void dumpSearchPath(Archive *AH);
289  PQExpBuffer upgrade_buffer,
290  Oid pg_type_oid,
291  bool force_array_type,
292  bool include_multirange_type);
294  PQExpBuffer upgrade_buffer, Oid pg_rel_oid);
295 static void binary_upgrade_set_pg_class_oids(Archive *fout,
296  PQExpBuffer upgrade_buffer,
297  Oid pg_class_oid, bool is_index);
298 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
299  const DumpableObject *dobj,
300  const char *objtype,
301  const char *objname,
302  const char *objnamespace);
303 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
304 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
305 static bool nonemptyReloptions(const char *reloptions);
306 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
307  const char *prefix, Archive *fout);
308 static char *get_synchronized_snapshot(Archive *fout);
309 static void setupDumpWorker(Archive *AHX);
310 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
311 
312 
313 int
314 main(int argc, char **argv)
315 {
316  int c;
317  const char *filename = NULL;
318  const char *format = "p";
319  TableInfo *tblinfo;
320  int numTables;
321  DumpableObject **dobjs;
322  int numObjs;
323  DumpableObject *boundaryObjs;
324  int i;
325  int optindex;
326  RestoreOptions *ropt;
327  Archive *fout; /* the script file */
328  bool g_verbose = false;
329  const char *dumpencoding = NULL;
330  const char *dumpsnapshot = NULL;
331  char *use_role = NULL;
332  int numWorkers = 1;
333  int compressLevel = -1;
334  int plainText = 0;
335  ArchiveFormat archiveFormat = archUnknown;
336  ArchiveMode archiveMode;
337 
338  static DumpOptions dopt;
339 
340  static struct option long_options[] = {
341  {"data-only", no_argument, NULL, 'a'},
342  {"blobs", no_argument, NULL, 'b'},
343  {"no-blobs", no_argument, NULL, 'B'},
344  {"clean", no_argument, NULL, 'c'},
345  {"create", no_argument, NULL, 'C'},
346  {"dbname", required_argument, NULL, 'd'},
347  {"extension", required_argument, NULL, 'e'},
348  {"file", required_argument, NULL, 'f'},
349  {"format", required_argument, NULL, 'F'},
350  {"host", required_argument, NULL, 'h'},
351  {"jobs", 1, NULL, 'j'},
352  {"no-reconnect", no_argument, NULL, 'R'},
353  {"no-owner", no_argument, NULL, 'O'},
354  {"port", required_argument, NULL, 'p'},
355  {"schema", required_argument, NULL, 'n'},
356  {"exclude-schema", required_argument, NULL, 'N'},
357  {"schema-only", no_argument, NULL, 's'},
358  {"superuser", required_argument, NULL, 'S'},
359  {"table", required_argument, NULL, 't'},
360  {"exclude-table", required_argument, NULL, 'T'},
361  {"no-password", no_argument, NULL, 'w'},
362  {"password", no_argument, NULL, 'W'},
363  {"username", required_argument, NULL, 'U'},
364  {"verbose", no_argument, NULL, 'v'},
365  {"no-privileges", no_argument, NULL, 'x'},
366  {"no-acl", no_argument, NULL, 'x'},
367  {"compress", required_argument, NULL, 'Z'},
368  {"encoding", required_argument, NULL, 'E'},
369  {"help", no_argument, NULL, '?'},
370  {"version", no_argument, NULL, 'V'},
371 
372  /*
373  * the following options don't have an equivalent short option letter
374  */
375  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
376  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
377  {"column-inserts", no_argument, &dopt.column_inserts, 1},
378  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
379  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
380  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
381  {"exclude-table-data", required_argument, NULL, 4},
382  {"extra-float-digits", required_argument, NULL, 8},
383  {"if-exists", no_argument, &dopt.if_exists, 1},
384  {"inserts", no_argument, NULL, 9},
385  {"lock-wait-timeout", required_argument, NULL, 2},
386  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
387  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
388  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
389  {"role", required_argument, NULL, 3},
390  {"section", required_argument, NULL, 5},
391  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
392  {"snapshot", required_argument, NULL, 6},
393  {"strict-names", no_argument, &strict_names, 1},
394  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
395  {"no-comments", no_argument, &dopt.no_comments, 1},
396  {"no-publications", no_argument, &dopt.no_publications, 1},
397  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
398  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
399  {"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1},
400  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
401  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
402  {"no-sync", no_argument, NULL, 7},
403  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
404  {"rows-per-insert", required_argument, NULL, 10},
405  {"include-foreign-data", required_argument, NULL, 11},
406 
407  {NULL, 0, NULL, 0}
408  };
409 
410  pg_logging_init(argv[0]);
412  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
413 
414  /*
415  * Initialize what we need for parallel execution, especially for thread
416  * support on Windows.
417  */
419 
420  progname = get_progname(argv[0]);
421 
422  if (argc > 1)
423  {
424  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
425  {
426  help(progname);
427  exit_nicely(0);
428  }
429  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
430  {
431  puts("pg_dump (PostgreSQL) " PG_VERSION);
432  exit_nicely(0);
433  }
434  }
435 
436  InitDumpOptions(&dopt);
437 
438  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
439  long_options, &optindex)) != -1)
440  {
441  switch (c)
442  {
443  case 'a': /* Dump data only */
444  dopt.dataOnly = true;
445  break;
446 
447  case 'b': /* Dump blobs */
448  dopt.outputBlobs = true;
449  break;
450 
451  case 'B': /* Don't dump blobs */
452  dopt.dontOutputBlobs = true;
453  break;
454 
455  case 'c': /* clean (i.e., drop) schema prior to create */
456  dopt.outputClean = 1;
457  break;
458 
459  case 'C': /* Create DB */
460  dopt.outputCreateDB = 1;
461  break;
462 
463  case 'd': /* database name */
464  dopt.cparams.dbname = pg_strdup(optarg);
465  break;
466 
467  case 'e': /* include extension(s) */
468  simple_string_list_append(&extension_include_patterns, optarg);
469  dopt.include_everything = false;
470  break;
471 
472  case 'E': /* Dump encoding */
473  dumpencoding = pg_strdup(optarg);
474  break;
475 
476  case 'f':
477  filename = pg_strdup(optarg);
478  break;
479 
480  case 'F':
481  format = pg_strdup(optarg);
482  break;
483 
484  case 'h': /* server host */
485  dopt.cparams.pghost = pg_strdup(optarg);
486  break;
487 
488  case 'j': /* number of dump jobs */
489  if (!option_parse_int(optarg, "-j/--jobs", 1,
490  PG_MAX_JOBS,
491  &numWorkers))
492  exit_nicely(1);
493  break;
494 
495  case 'n': /* include schema(s) */
496  simple_string_list_append(&schema_include_patterns, optarg);
497  dopt.include_everything = false;
498  break;
499 
500  case 'N': /* exclude schema(s) */
501  simple_string_list_append(&schema_exclude_patterns, optarg);
502  break;
503 
504  case 'O': /* Don't reconnect to match owner */
505  dopt.outputNoOwner = 1;
506  break;
507 
508  case 'p': /* server port */
509  dopt.cparams.pgport = pg_strdup(optarg);
510  break;
511 
512  case 'R':
513  /* no-op, still accepted for backwards compatibility */
514  break;
515 
516  case 's': /* dump schema only */
517  dopt.schemaOnly = true;
518  break;
519 
520  case 'S': /* Username for superuser in plain text output */
522  break;
523 
524  case 't': /* include table(s) */
525  simple_string_list_append(&table_include_patterns, optarg);
526  dopt.include_everything = false;
527  break;
528 
529  case 'T': /* exclude table(s) */
530  simple_string_list_append(&table_exclude_patterns, optarg);
531  break;
532 
533  case 'U':
535  break;
536 
537  case 'v': /* verbose */
538  g_verbose = true;
540  break;
541 
542  case 'w':
544  break;
545 
546  case 'W':
548  break;
549 
550  case 'x': /* skip ACL dump */
551  dopt.aclsSkip = true;
552  break;
553 
554  case 'Z': /* Compression Level */
555  if (!option_parse_int(optarg, "-Z/--compress", 0, 9,
556  &compressLevel))
557  exit_nicely(1);
558  break;
559 
560  case 0:
561  /* This covers the long options. */
562  break;
563 
564  case 2: /* lock-wait-timeout */
566  break;
567 
568  case 3: /* SET ROLE */
569  use_role = pg_strdup(optarg);
570  break;
571 
572  case 4: /* exclude table(s) data */
573  simple_string_list_append(&tabledata_exclude_patterns, optarg);
574  break;
575 
576  case 5: /* section */
578  break;
579 
580  case 6: /* snapshot */
581  dumpsnapshot = pg_strdup(optarg);
582  break;
583 
584  case 7: /* no-sync */
585  dosync = false;
586  break;
587 
588  case 8:
590  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
592  exit_nicely(1);
593  break;
594 
595  case 9: /* inserts */
596 
597  /*
598  * dump_inserts also stores --rows-per-insert, careful not to
599  * overwrite that.
600  */
601  if (dopt.dump_inserts == 0)
603  break;
604 
605  case 10: /* rows per insert */
606  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
607  &dopt.dump_inserts))
608  exit_nicely(1);
609  break;
610 
611  case 11: /* include foreign data */
612  simple_string_list_append(&foreign_servers_include_patterns,
613  optarg);
614  break;
615 
616  default:
617  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
618  exit_nicely(1);
619  }
620  }
621 
622  /*
623  * Non-option argument specifies database name as long as it wasn't
624  * already specified with -d / --dbname
625  */
626  if (optind < argc && dopt.cparams.dbname == NULL)
627  dopt.cparams.dbname = argv[optind++];
628 
629  /* Complain if any arguments remain */
630  if (optind < argc)
631  {
632  pg_log_error("too many command-line arguments (first is \"%s\")",
633  argv[optind]);
634  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
635  progname);
636  exit_nicely(1);
637  }
638 
639  /* --column-inserts implies --inserts */
640  if (dopt.column_inserts && dopt.dump_inserts == 0)
642 
643  /*
644  * Binary upgrade mode implies dumping sequence data even in schema-only
645  * mode. This is not exposed as a separate option, but kept separate
646  * internally for clarity.
647  */
648  if (dopt.binary_upgrade)
649  dopt.sequence_data = 1;
650 
651  if (dopt.dataOnly && dopt.schemaOnly)
652  {
653  pg_log_error("options -s/--schema-only and -a/--data-only cannot be used together");
654  exit_nicely(1);
655  }
656 
657  if (dopt.schemaOnly && foreign_servers_include_patterns.head != NULL)
658  fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
659 
660  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
661  fatal("option --include-foreign-data is not supported with parallel backup");
662 
663  if (dopt.dataOnly && dopt.outputClean)
664  {
665  pg_log_error("options -c/--clean and -a/--data-only cannot be used together");
666  exit_nicely(1);
667  }
668 
669  if (dopt.if_exists && !dopt.outputClean)
670  fatal("option --if-exists requires option -c/--clean");
671 
672  /*
673  * --inserts are already implied above if --column-inserts or
674  * --rows-per-insert were specified.
675  */
676  if (dopt.do_nothing && dopt.dump_inserts == 0)
677  fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
678 
679  /* Identify archive format to emit */
680  archiveFormat = parseArchiveFormat(format, &archiveMode);
681 
682  /* archiveFormat specific setup */
683  if (archiveFormat == archNull)
684  plainText = 1;
685 
686  /* Custom and directory formats are compressed by default, others not */
687  if (compressLevel == -1)
688  {
689 #ifdef HAVE_LIBZ
690  if (archiveFormat == archCustom || archiveFormat == archDirectory)
691  compressLevel = Z_DEFAULT_COMPRESSION;
692  else
693 #endif
694  compressLevel = 0;
695  }
696 
697 #ifndef HAVE_LIBZ
698  if (compressLevel != 0)
699  pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
700  compressLevel = 0;
701 #endif
702 
703  /*
704  * If emitting an archive format, we always want to emit a DATABASE item,
705  * in case --create is specified at pg_restore time.
706  */
707  if (!plainText)
708  dopt.outputCreateDB = 1;
709 
710  /* Parallel backup only in the directory archive format so far */
711  if (archiveFormat != archDirectory && numWorkers > 1)
712  fatal("parallel backup only supported by the directory format");
713 
714  /* Open the output file */
715  fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
716  archiveMode, setupDumpWorker);
717 
718  /* Make dump options accessible right away */
719  SetArchiveOptions(fout, &dopt, NULL);
720 
721  /* Register the cleanup hook */
722  on_exit_close_archive(fout);
723 
724  /* Let the archiver know how noisy to be */
725  fout->verbose = g_verbose;
726 
727 
728  /*
729  * We allow the server to be back to 8.0, and up to any minor release of
730  * our own major version. (See also version check in pg_dumpall.c.)
731  */
732  fout->minRemoteVersion = 80000;
733  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
734 
735  fout->numWorkers = numWorkers;
736 
737  /*
738  * Open the database using the Archiver, so it knows about it. Errors mean
739  * death.
740  */
741  ConnectDatabase(fout, &dopt.cparams, false);
742  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
743 
744  /*
745  * Disable security label support if server version < v9.1.x (prevents
746  * access to nonexistent pg_seclabel catalog)
747  */
748  if (fout->remoteVersion < 90100)
749  dopt.no_security_labels = 1;
750 
751  /*
752  * On hot standbys, never try to dump unlogged table data, since it will
753  * just throw an error.
754  */
755  if (fout->isStandby)
756  dopt.no_unlogged_table_data = true;
757 
758  /* Select the appropriate subquery to convert user IDs to names */
759  if (fout->remoteVersion >= 80100)
760  username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
761  else
762  username_subquery = "SELECT usename FROM pg_catalog.pg_user WHERE usesysid =";
763 
764  /* check the version for the synchronized snapshots feature */
765  if (numWorkers > 1 && fout->remoteVersion < 90200
766  && !dopt.no_synchronized_snapshots)
767  fatal("Synchronized snapshots are not supported by this server version.\n"
768  "Run with --no-synchronized-snapshots instead if you do not need\n"
769  "synchronized snapshots.");
770 
771  /* check the version when a snapshot is explicitly specified by user */
772  if (dumpsnapshot && fout->remoteVersion < 90200)
773  fatal("Exported snapshots are not supported by this server version.");
774 
775  /*
776  * Find the last built-in OID, if needed (prior to 8.1)
777  *
778  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
779  */
780  if (fout->remoteVersion < 80100)
782  else
784 
785  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
786 
787  /* Expand schema selection patterns into OID lists */
788  if (schema_include_patterns.head != NULL)
789  {
790  expand_schema_name_patterns(fout, &schema_include_patterns,
791  &schema_include_oids,
792  strict_names);
793  if (schema_include_oids.head == NULL)
794  fatal("no matching schemas were found");
795  }
796  expand_schema_name_patterns(fout, &schema_exclude_patterns,
797  &schema_exclude_oids,
798  false);
799  /* non-matching exclusion patterns aren't an error */
800 
801  /* Expand table selection patterns into OID lists */
802  if (table_include_patterns.head != NULL)
803  {
804  expand_table_name_patterns(fout, &table_include_patterns,
805  &table_include_oids,
806  strict_names);
807  if (table_include_oids.head == NULL)
808  fatal("no matching tables were found");
809  }
810  expand_table_name_patterns(fout, &table_exclude_patterns,
811  &table_exclude_oids,
812  false);
813 
814  expand_table_name_patterns(fout, &tabledata_exclude_patterns,
815  &tabledata_exclude_oids,
816  false);
817 
818  expand_foreign_server_name_patterns(fout, &foreign_servers_include_patterns,
819  &foreign_servers_include_oids);
820 
821  /* non-matching exclusion patterns aren't an error */
822 
823  /* Expand extension selection patterns into OID lists */
824  if (extension_include_patterns.head != NULL)
825  {
826  expand_extension_name_patterns(fout, &extension_include_patterns,
827  &extension_include_oids,
828  strict_names);
829  if (extension_include_oids.head == NULL)
830  fatal("no matching extensions were found");
831  }
832 
833  /*
834  * Dumping blobs is the default for dumps where an inclusion switch is not
835  * used (an "include everything" dump). -B can be used to exclude blobs
836  * from those dumps. -b can be used to include blobs even when an
837  * inclusion switch is used.
838  *
839  * -s means "schema only" and blobs are data, not schema, so we never
840  * include blobs when -s is used.
841  */
842  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputBlobs)
843  dopt.outputBlobs = true;
844 
845  /*
846  * Now scan the database and create DumpableObject structs for all the
847  * objects we intend to dump.
848  */
849  tblinfo = getSchemaData(fout, &numTables);
850 
851  if (fout->remoteVersion < 80400)
852  guessConstraintInheritance(tblinfo, numTables);
853 
854  if (!dopt.schemaOnly)
855  {
856  getTableData(&dopt, tblinfo, numTables, 0);
858  if (dopt.dataOnly)
860  }
861 
862  if (dopt.schemaOnly && dopt.sequence_data)
863  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
864 
865  /*
866  * In binary-upgrade mode, we do not have to worry about the actual blob
867  * data or the associated metadata that resides in the pg_largeobject and
868  * pg_largeobject_metadata tables, respectively.
869  *
870  * However, we do need to collect blob information as there may be
871  * comments or other information on blobs that we do need to dump out.
872  */
873  if (dopt.outputBlobs || dopt.binary_upgrade)
874  getBlobs(fout);
875 
876  /*
877  * Collect dependency data to assist in ordering the objects.
878  */
879  getDependencies(fout);
880 
881  /* Lastly, create dummy objects to represent the section boundaries */
882  boundaryObjs = createBoundaryObjects();
883 
884  /* Get pointers to all the known DumpableObjects */
885  getDumpableObjects(&dobjs, &numObjs);
886 
887  /*
888  * Add dummy dependencies to enforce the dump section ordering.
889  */
890  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
891 
892  /*
893  * Sort the objects into a safe dump order (no forward references).
894  *
895  * We rely on dependency information to help us determine a safe order, so
896  * the initial sort is mostly for cosmetic purposes: we sort by name to
897  * ensure that logically identical schemas will dump identically.
898  */
899  sortDumpableObjectsByTypeName(dobjs, numObjs);
900 
901  sortDumpableObjects(dobjs, numObjs,
902  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
903 
904  /*
905  * Create archive TOC entries for all the objects to be dumped, in a safe
906  * order.
907  */
908 
909  /*
910  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
911  */
912  dumpEncoding(fout);
913  dumpStdStrings(fout);
914  dumpSearchPath(fout);
915 
916  /* The database items are always next, unless we don't want them at all */
917  if (dopt.outputCreateDB)
918  dumpDatabase(fout);
919 
920  /* Now the rearrangeable objects. */
921  for (i = 0; i < numObjs; i++)
922  dumpDumpableObject(fout, dobjs[i]);
923 
924  /*
925  * Set up options info to ensure we dump what we want.
926  */
927  ropt = NewRestoreOptions();
928  ropt->filename = filename;
929 
930  /* if you change this list, see dumpOptionsFromRestoreOptions */
931  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
932  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
933  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
934  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
936  ropt->dropSchema = dopt.outputClean;
937  ropt->dataOnly = dopt.dataOnly;
938  ropt->schemaOnly = dopt.schemaOnly;
939  ropt->if_exists = dopt.if_exists;
940  ropt->column_inserts = dopt.column_inserts;
941  ropt->dumpSections = dopt.dumpSections;
942  ropt->aclsSkip = dopt.aclsSkip;
943  ropt->superuser = dopt.outputSuperuser;
944  ropt->createDB = dopt.outputCreateDB;
945  ropt->noOwner = dopt.outputNoOwner;
946  ropt->noTablespace = dopt.outputNoTablespaces;
947  ropt->disable_triggers = dopt.disable_triggers;
948  ropt->use_setsessauth = dopt.use_setsessauth;
950  ropt->dump_inserts = dopt.dump_inserts;
951  ropt->no_comments = dopt.no_comments;
952  ropt->no_publications = dopt.no_publications;
954  ropt->no_subscriptions = dopt.no_subscriptions;
955  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
958  ropt->sequence_data = dopt.sequence_data;
959  ropt->binary_upgrade = dopt.binary_upgrade;
960 
961  if (compressLevel == -1)
962  ropt->compression = 0;
963  else
964  ropt->compression = compressLevel;
965 
966  ropt->suppressDumpWarnings = true; /* We've already shown them */
967 
968  SetArchiveOptions(fout, &dopt, ropt);
969 
970  /* Mark which entries should be output */
972 
973  /*
974  * The archive's TOC entries are now marked as to which ones will actually
975  * be output, so we can set up their dependency lists properly. This isn't
976  * necessary for plain-text output, though.
977  */
978  if (!plainText)
980 
981  /*
982  * And finally we can do the actual output.
983  *
984  * Note: for non-plain-text output formats, the output file is written
985  * inside CloseArchive(). This is, um, bizarre; but not worth changing
986  * right now.
987  */
988  if (plainText)
989  RestoreArchive(fout);
990 
991  CloseArchive(fout);
992 
993  exit_nicely(0);
994 }
995 
996 
/*
 * help: print pg_dump's command-line usage text to stdout.
 *
 * progname is interpolated into the usage lines.  All user-visible text
 * goes through _() for NLS translation; the wording and the order of the
 * lines are part of the tool's documented output, so do not reword or
 * reorder them casually (each string literal is a translation unit).
 */
997 static void
998 help(const char *progname)
999 {
1000  printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
1001  printf(_("Usage:\n"));
1002  printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
1003 
1004  printf(_("\nGeneral options:\n"));
1005  printf(_(" -f, --file=FILENAME output file or directory name\n"));
1006  printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
1007  " plain text (default))\n"));
1008  printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
1009  printf(_(" -v, --verbose verbose mode\n"));
1010  printf(_(" -V, --version output version information, then exit\n"));
1011  printf(_(" -Z, --compress=0-9 compression level for compressed formats\n"));
1012  printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
1013  printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
1014  printf(_(" -?, --help show this help, then exit\n"));
1015 
1016  printf(_("\nOptions controlling the output content:\n"));
1017  printf(_(" -a, --data-only dump only the data, not the schema\n"));
1018  printf(_(" -b, --blobs include large objects in dump\n"));
1019  printf(_(" -B, --no-blobs exclude large objects in dump\n"));
1020  printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
1021  printf(_(" -C, --create include commands to create database in dump\n"));
1022  printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
1023  printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
1024  printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
1025  printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
1026  printf(_(" -O, --no-owner skip restoration of object ownership in\n"
1027  " plain-text format\n"));
1028  printf(_(" -s, --schema-only dump only the schema, no data\n"));
1029  printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
1030  printf(_(" -t, --table=PATTERN dump the specified table(s) only\n"));
1031  printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
1032  printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
1033  printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
1034  printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
1035  printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
1036  printf(_(" --disable-triggers disable triggers during data-only restore\n"));
1037  printf(_(" --enable-row-security enable row security (dump only content user has\n"
1038  " access to)\n"));
1039  printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
1040  printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
1041  printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
1042  printf(_(" --include-foreign-data=PATTERN\n"
1043  " include data of foreign tables on foreign\n"
1044  " servers matching PATTERN\n"));
1045  printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
1046  printf(_(" --load-via-partition-root load partitions via the root table\n"));
1047  printf(_(" --no-comments do not dump comments\n"));
1048  printf(_(" --no-publications do not dump publications\n"));
1049  printf(_(" --no-security-labels do not dump security label assignments\n"));
1050  printf(_(" --no-subscriptions do not dump subscriptions\n"));
1051  printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n"));
1052  printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
1053  printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
1054  printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
1055  printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
1056  printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
1057  printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
1058  printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
1059  printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
1060  printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
1061  printf(_(" --strict-names require table and/or schema include patterns to\n"
1062  " match at least one entity each\n"));
1063  printf(_(" --use-set-session-authorization\n"
1064  " use SET SESSION AUTHORIZATION commands instead of\n"
1065  " ALTER OWNER commands to set ownership\n"));
1066 
1067  printf(_("\nConnection options:\n"));
1068  printf(_(" -d, --dbname=DBNAME database to dump\n"));
1069  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
1070  printf(_(" -p, --port=PORT database server port number\n"));
1071  printf(_(" -U, --username=NAME connect as specified database user\n"));
1072  printf(_(" -w, --no-password never prompt for password\n"));
1073  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1074  printf(_(" --role=ROLENAME do SET ROLE before dump\n"));
1075 
1076  printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
1077  "variable value is used.\n\n"));
1078  printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1079  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
1080 }
1081 
/*
 * setup_connection: configure a freshly opened connection for dumping.
 *
 * Sets the client encoding (if requested), records the encoding and
 * standard_conforming_strings on the archive handle, optionally does
 * SET ROLE, pins datestyle/intervalstyle/extra_float_digits so the dump
 * is portable and floats round-trip exactly, disables synchronized
 * seqscans and all timeouts, applies identifier-quoting and row-security
 * options, then starts the read-only snapshot transaction and (if given
 * or inherited) installs the synchronized snapshot.
 *
 * AH           - archive handle whose connection is being configured
 * dumpencoding - client encoding from the command line, or NULL
 * dumpsnapshot - snapshot ID from the command line, or NULL
 * use_role     - role for SET ROLE, or NULL (parallel workers pass NULL
 *                and fall back to AH->use_role, inherited from leader)
 */
1082 static void
1083 setup_connection(Archive *AH, const char *dumpencoding,
1084  const char *dumpsnapshot, char *use_role)
1085 {
1086  DumpOptions *dopt = AH->dopt;
1087  PGconn *conn = GetConnection(AH);
1088  const char *std_strings;
1089 
 /* NOTE(review): listing line 1090 was lost in extraction; a statement
  * appears to be missing here -- verify against upstream pg_dump.c. */
1091 
1092  /*
1093  * Set the client encoding if requested.
1094  */
1095  if (dumpencoding)
1096  {
1097  if (PQsetClientEncoding(conn, dumpencoding) < 0)
1098  fatal("invalid client encoding \"%s\" specified",
1099  dumpencoding);
1100  }
1101 
1102  /*
1103  * Get the active encoding and the standard_conforming_strings setting, so
1104  * we know how to escape strings.
1105  */
1106  AH->encoding = PQclientEncoding(conn);
1107 
1108  std_strings = PQparameterStatus(conn, "standard_conforming_strings");
1109  AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
1110 
1111  /*
1112  * Set the role if requested. In a parallel dump worker, we'll be passed
1113  * use_role == NULL, but AH->use_role is already set (if user specified it
1114  * originally) and we should use that.
1115  */
1116  if (!use_role && AH->use_role)
1117  use_role = AH->use_role;
1118 
1119  /* Set the role if requested */
1120  if (use_role && AH->remoteVersion >= 80100)
1121  {
1122  PQExpBuffer query = createPQExpBuffer();
1123 
1124  appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
1125  ExecuteSqlStatement(AH, query->data);
1126  destroyPQExpBuffer(query);
1127 
1128  /* save it for possible later use by parallel workers */
1129  if (!AH->use_role)
1130  AH->use_role = pg_strdup(use_role);
1131  }
1132 
1133  /* Set the datestyle to ISO to ensure the dump's portability */
1134  ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");
1135 
1136  /* Likewise, avoid using sql_standard intervalstyle */
1137  if (AH->remoteVersion >= 80400)
1138  ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");
1139 
1140  /*
1141  * Use an explicitly specified extra_float_digits if it has been provided.
1142  * Otherwise, set extra_float_digits so that we can dump float data
1143  * exactly (given correctly implemented float I/O code, anyway).
1144  */
 /* NOTE(review): listing lines 1145, 1147 and 1150 were lost in
  * extraction -- presumably the condition testing whether
  * --extra-float-digits was given, the PQExpBuffer declaration, and the
  * digits argument to appendPQExpBuffer.  Verify against upstream. */
1146  {
1148 
1149  appendPQExpBuffer(q, "SET extra_float_digits TO %d",
1151  ExecuteSqlStatement(AH, q->data);
1152  destroyPQExpBuffer(q);
1153  }
1154  else if (AH->remoteVersion >= 90000)
1155  ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
1156  else
1157  ExecuteSqlStatement(AH, "SET extra_float_digits TO 2");
1158 
1159  /*
1160  * If synchronized scanning is supported, disable it, to prevent
1161  * unpredictable changes in row ordering across a dump and reload.
1162  */
1163  if (AH->remoteVersion >= 80300)
1164  ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");
1165 
1166  /*
1167  * Disable timeouts if supported.
1168  */
1169  ExecuteSqlStatement(AH, "SET statement_timeout = 0");
1170  if (AH->remoteVersion >= 90300)
1171  ExecuteSqlStatement(AH, "SET lock_timeout = 0");
1172  if (AH->remoteVersion >= 90600)
1173  ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
1174 
1175  /*
1176  * Quote all identifiers, if requested.
1177  */
1178  if (quote_all_identifiers && AH->remoteVersion >= 90100)
1179  ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
1180 
1181  /*
1182  * Adjust row-security mode, if supported.
1183  */
1184  if (AH->remoteVersion >= 90500)
1185  {
1186  if (dopt->enable_row_security)
1187  ExecuteSqlStatement(AH, "SET row_security = on");
1188  else
1189  ExecuteSqlStatement(AH, "SET row_security = off");
1190  }
1191 
1192  /*
1193  * Start transaction-snapshot mode transaction to dump consistent data.
1194  */
1195  ExecuteSqlStatement(AH, "BEGIN");
1196  if (AH->remoteVersion >= 90100)
1197  {
1198  /*
1199  * To support the combination of serializable_deferrable with the jobs
1200  * option we use REPEATABLE READ for the worker connections that are
1201  * passed a snapshot. As long as the snapshot is acquired in a
1202  * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
1203  * REPEATABLE READ transaction provides the appropriate integrity
1204  * guarantees. This is a kluge, but safe for back-patching.
1205  */
1206  if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
 /* NOTE(review): listing lines 1207, 1211 and 1217 were lost in
  * extraction -- presumably the "ExecuteSqlStatement(AH," call lines
  * that issue the three SET TRANSACTION commands below.  Verify
  * against upstream. */
1208  "SET TRANSACTION ISOLATION LEVEL "
1209  "SERIALIZABLE, READ ONLY, DEFERRABLE");
1210  else
1212  "SET TRANSACTION ISOLATION LEVEL "
1213  "REPEATABLE READ, READ ONLY");
1214  }
1215  else
1216  {
1218  "SET TRANSACTION ISOLATION LEVEL "
1219  "SERIALIZABLE, READ ONLY");
1220  }
1221 
1222  /*
1223  * If user specified a snapshot to use, select that. In a parallel dump
1224  * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
1225  * is already set (if the server can handle it) and we should use that.
1226  */
1227  if (dumpsnapshot)
1228  AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
1229 
1230  if (AH->sync_snapshot_id)
1231  {
1232  PQExpBuffer query = createPQExpBuffer();
1233 
1234  appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
 /* The snapshot ID is user-supplied, so escape it as a string literal. */
1235  appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
1236  ExecuteSqlStatement(AH, query->data);
1237  destroyPQExpBuffer(query);
1238  }
1239  else if (AH->numWorkers > 1 &&
1240  AH->remoteVersion >= 90200 &&
 /* NOTE(review): listing line 1241 was lost in extraction --
  * presumably the check that --no-synchronized-snapshots was not
  * given.  Verify against upstream. */
1242  {
1243  if (AH->isStandby && AH->remoteVersion < 100000)
1244  fatal("Synchronized snapshots on standby servers are not supported by this server version.\n"
1245  "Run with --no-synchronized-snapshots instead if you do not need\n"
1246  "synchronized snapshots.");
1247 
1248 
 /* NOTE(review): listing line 1249 was lost in extraction --
  * presumably the assignment that exports a synchronized snapshot
  * into AH->sync_snapshot_id for the workers.  Verify upstream. */
1250  }
1251 }
1252 
1253 /* Set up connection for a parallel worker process */
1254 static void
 /* NOTE(review): the function-name line (listing line 1255) was lost in
  * extraction; in upstream pg_dump.c this is setupDumpWorker(Archive *AH).
  * Verify against upstream. */
1256 {
1257  /*
1258  * We want to re-select all the same values the leader connection is
1259  * using. We'll have inherited directly-usable values in
1260  * AH->sync_snapshot_id and AH->use_role, but we need to translate the
1261  * inherited encoding value back to a string to pass to setup_connection.
1262  */
1263  setup_connection(AH,
 /* NOTE(review): listing line 1264 was lost in extraction -- presumably
  * the encoding-name argument derived from AH->encoding.  Verify. */
1265  NULL,
1266  NULL);
1267 }
1268 
/*
 * Export the current transaction's snapshot via pg_export_snapshot() and
 * return its identifier as a malloc'd string (caller owns the memory;
 * it is later installed in worker connections with SET TRANSACTION
 * SNAPSHOT).
 */
1269 static char *
 /* NOTE(review): the function-name line (listing line 1270) was lost in
  * extraction; in upstream this is get_synchronized_snapshot(Archive *fout).
  * Verify against upstream. */
1271 {
1272  char *query = "SELECT pg_catalog.pg_export_snapshot()";
1273  char *result;
1274  PGresult *res;
1275 
1276  res = ExecuteSqlQueryForSingleRow(fout, query);
1277  result = pg_strdup(PQgetvalue(res, 0, 0));
1278  PQclear(res);
1279 
1280  return result;
1281 }
1282 
/*
 * Translate the -F/--format option string into an ArchiveFormat code,
 * also setting *mode (write mode for all documented formats; the
 * undocumented "a"/"append" form used by pg_dumpall selects append
 * mode).  Matching is case-insensitive; both the single-letter and
 * long spellings are accepted.  Exits via fatal() on an unknown format.
 */
1283 static ArchiveFormat
 /* NOTE(review): the function-name line (listing line 1284) was lost in
  * extraction; in upstream this is
  * parseArchiveFormat(const char *format, ArchiveMode *mode).  Verify. */
1285 {
1286  ArchiveFormat archiveFormat;
1287 
1288  *mode = archModeWrite;
1289 
1290  if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
1291  {
1292  /* This is used by pg_dumpall, and is not documented */
1293  archiveFormat = archNull;
1294  *mode = archModeAppend;
1295  }
1296  else if (pg_strcasecmp(format, "c") == 0)
1297  archiveFormat = archCustom;
1298  else if (pg_strcasecmp(format, "custom") == 0)
1299  archiveFormat = archCustom;
1300  else if (pg_strcasecmp(format, "d") == 0)
1301  archiveFormat = archDirectory;
1302  else if (pg_strcasecmp(format, "directory") == 0)
1303  archiveFormat = archDirectory;
1304  else if (pg_strcasecmp(format, "p") == 0)
1305  archiveFormat = archNull;
1306  else if (pg_strcasecmp(format, "plain") == 0)
1307  archiveFormat = archNull;
1308  else if (pg_strcasecmp(format, "t") == 0)
1309  archiveFormat = archTar;
1310  else if (pg_strcasecmp(format, "tar") == 0)
1311  archiveFormat = archTar;
1312  else
1313  fatal("invalid output format \"%s\" specified", format);
1314  return archiveFormat;
1315 }
1316 
1317 /*
1318  * Find the OIDs of all schemas matching the given list of patterns,
1319  * and append them to the given OID list.
1320  */
1321 static void
 /* NOTE(review): the function-name line (listing line 1322) was lost in
  * extraction; in upstream this is
  * expand_schema_name_patterns(Archive *fout, ...).  Verify upstream. */
1323  SimpleStringList *patterns,
1324  SimpleOidList *oids,
1325  bool strict_names)
1326 {
1327  PQExpBuffer query;
1328  PGresult *res;
1329  SimpleStringListCell *cell;
1330  int i;
1331 
1332  if (patterns->head == NULL)
1333  return; /* nothing to do */
1334 
1335  query = createPQExpBuffer();
1336 
1337  /*
1338  * The loop below runs multiple SELECTs, which might sometimes result in
1339  * duplicate entries in the OID list, but we don't care.
1340  */
1341 
1342  for (cell = patterns->head; cell; cell = cell->next)
1343  {
1344  appendPQExpBufferStr(query,
1345  "SELECT oid FROM pg_catalog.pg_namespace n\n");
 /* processSQLNamePattern appends the WHERE clause matching the pattern
  * against n.nspname, safely quoted for the connection's encoding. */
1346  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1347  false, NULL, "n.nspname", NULL, NULL);
1348 
1349  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 /* With --strict-names, every pattern must match at least one schema. */
1350  if (strict_names && PQntuples(res) == 0)
1351  fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1352 
1353  for (i = 0; i < PQntuples(res); i++)
1354  {
1355  simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
1356  }
1357 
1358  PQclear(res);
1359  resetPQExpBuffer(query);
1360  }
1361 
1362  destroyPQExpBuffer(query);
1363 }
1364 
1365 /*
1366  * Find the OIDs of all extensions matching the given list of patterns,
1367  * and append them to the given OID list.
1368  */
1369 static void
 /* NOTE(review): the function-name line (listing line 1370) was lost in
  * extraction; in upstream this is
  * expand_extension_name_patterns(Archive *fout, ...).  Verify upstream. */
1371  SimpleStringList *patterns,
1372  SimpleOidList *oids,
1373  bool strict_names)
1374 {
1375  PQExpBuffer query;
1376  PGresult *res;
1377  SimpleStringListCell *cell;
1378  int i;
1379 
1380  if (patterns->head == NULL)
1381  return; /* nothing to do */
1382 
1383  query = createPQExpBuffer();
1384 
1385  /*
1386  * The loop below runs multiple SELECTs, which might sometimes result in
1387  * duplicate entries in the OID list, but we don't care.
1388  */
1389  for (cell = patterns->head; cell; cell = cell->next)
1390  {
1391  appendPQExpBufferStr(query,
1392  "SELECT oid FROM pg_catalog.pg_extension e\n");
 /* Match the pattern against e.extname, quoted for this connection. */
1393  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1394  false, NULL, "e.extname", NULL, NULL);
1395 
1396  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 /* With --strict-names, every pattern must match at least one extension. */
1397  if (strict_names && PQntuples(res) == 0)
1398  fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1399 
1400  for (i = 0; i < PQntuples(res); i++)
1401  {
1402  simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
1403  }
1404 
1405  PQclear(res);
1406  resetPQExpBuffer(query);
1407  }
1408 
1409  destroyPQExpBuffer(query);
1410 }
1411 
1412 /*
1413  * Find the OIDs of all foreign servers matching the given list of patterns,
1414  * and append them to the given OID list.
1415  */
1416 static void
 /* NOTE(review): the function-name line (listing line 1417) was lost in
  * extraction; in upstream this is
  * expand_foreign_server_name_patterns(Archive *fout, ...).  Verify. */
1418  SimpleStringList *patterns,
1419  SimpleOidList *oids)
1420 {
1421  PQExpBuffer query;
1422  PGresult *res;
1423  SimpleStringListCell *cell;
1424  int i;
1425 
1426  if (patterns->head == NULL)
1427  return; /* nothing to do */
1428 
1429  query = createPQExpBuffer();
1430 
1431  /*
1432  * The loop below runs multiple SELECTs, which might sometimes result in
1433  * duplicate entries in the OID list, but we don't care.
1434  */
1435 
1436  for (cell = patterns->head; cell; cell = cell->next)
1437  {
1438  appendPQExpBufferStr(query,
1439  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1440  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1441  false, NULL, "s.srvname", NULL, NULL);
1442 
1443  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 /* Unlike the schema/table variants, there is no strict_names flag:
  * a --include-foreign-data pattern that matches nothing is always
  * an error. */
1444  if (PQntuples(res) == 0)
1445  fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1446 
1447  for (i = 0; i < PQntuples(res); i++)
1448  simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
1449 
1450  PQclear(res);
1451  resetPQExpBuffer(query);
1452  }
1453 
1454  destroyPQExpBuffer(query);
1455 }
1456 
1457 /*
1458  * Find the OIDs of all tables matching the given list of patterns,
1459  * and append them to the given OID list. See also expand_dbname_patterns()
1460  * in pg_dumpall.c
1461  */
1462 static void
 /* NOTE(review): the function-name line (listing line 1463) was lost in
  * extraction; in upstream this is
  * expand_table_name_patterns(Archive *fout, ...).  Verify upstream. */
1464  SimpleStringList *patterns, SimpleOidList *oids,
1465  bool strict_names)
1466 {
1467  PQExpBuffer query;
1468  PGresult *res;
1469  SimpleStringListCell *cell;
1470  int i;
1471 
1472  if (patterns->head == NULL)
1473  return; /* nothing to do */
1474 
1475  query = createPQExpBuffer();
1476 
1477  /*
1478  * this might sometimes result in duplicate entries in the OID list, but
1479  * we don't care.
1480  */
1481 
1482  for (cell = patterns->head; cell; cell = cell->next)
1483  {
1484  /*
1485  * Query must remain ABSOLUTELY devoid of unqualified names. This
1486  * would be unnecessary given a pg_table_is_visible() variant taking a
1487  * search_path argument.
1488  */
 /* Accept every relkind that can carry dumpable data or definitions:
  * plain tables, sequences, views, matviews, foreign tables, and
  * partitioned tables. */
1489  appendPQExpBuffer(query,
1490  "SELECT c.oid"
1491  "\nFROM pg_catalog.pg_class c"
1492  "\n LEFT JOIN pg_catalog.pg_namespace n"
1493  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1494  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1495  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1496  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1497  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1498  RELKIND_PARTITIONED_TABLE);
1499  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1500  false, "n.nspname", "c.relname", NULL,
1501  "pg_catalog.pg_table_is_visible(c.oid)");
1502 
 /* Clear search_path so pattern visibility checks can't be hijacked
  * by objects in untrusted schemas. */
1503  ExecuteSqlStatement(fout, "RESET search_path");
1504  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 /* NOTE(review): listing lines 1505-1506 were lost in extraction --
  * in upstream, the secure search_path is re-established here after
  * the query.  Verify against upstream pg_dump.c. */
1507  if (strict_names && PQntuples(res) == 0)
1508  fatal("no matching tables were found for pattern \"%s\"", cell->val);
1509 
1510  for (i = 0; i < PQntuples(res); i++)
1511  {
1512  simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
1513  }
1514 
1515  PQclear(res);
1516  resetPQExpBuffer(query);
1517  }
1518 
1519  destroyPQExpBuffer(query);
1520 }
1521 
1522 /*
1523  * checkExtensionMembership
1524  * Determine whether object is an extension member, and if so,
1525  * record an appropriate dependency and set the object's dump flag.
1526  *
1527  * It's important to call this for each object that could be an extension
1528  * member. Generally, we integrate this with determining the object's
1529  * to-be-dumped-ness, since extension membership overrides other rules for that.
1530  *
1531  * Returns true if object is an extension member, else false.
1532  */
1533 static bool
 /* NOTE(review): the function-name line (listing line 1534) was lost in
  * extraction; in upstream this is
  * checkExtensionMembership(DumpableObject *dobj, Archive *fout).  Verify. */
1535 {
1536  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1538  if (ext == NULL)
1539  return false;
1540 
1541  dobj->ext_member = true;
1542 
1543  /* Record dependency so that getDependencies needn't deal with that */
1544  addObjectDependency(dobj, ext->dobj.dumpId);
1545 
1546  /*
1547  * In 9.6 and above, mark the member object to have any non-initial ACL,
1548  * policies, and security labels dumped.
1549  *
1550  * Note that any initial ACLs (see pg_init_privs) will be removed when we
1551  * extract the information about the object. We don't provide support for
1552  * initial policies and security labels and it seems unlikely for those to
1553  * ever exist, but we may have to revisit this later.
1554  *
1555  * Prior to 9.6, we do not include any extension member components.
1556  *
1557  * In binary upgrades, we still dump all components of the members
1558  * individually, since the idea is to exactly reproduce the database
1559  * contents rather than replace the extension contents with something
1560  * different.
1561  */
1562  if (fout->dopt->binary_upgrade)
1563  dobj->dump = ext->dobj.dump;
1564  else
1565  {
1566  if (fout->remoteVersion < 90600)
1567  dobj->dump = DUMP_COMPONENT_NONE;
1568  else
1569  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
 /* NOTE(review): listing lines 1570-1571 were lost in extraction --
  * presumably the remaining mask components (security label and
  * policy).  Verify against upstream pg_dump.c. */
1572  }
1573 
1574  return true;
1575 }
1576 
1577 /*
1578  * selectDumpableNamespace: policy-setting subroutine
1579  * Mark a namespace as to be dumped or not
1580  */
1581 static void
 /* NOTE(review): the function-name line (listing line 1582) was lost in
  * extraction; in upstream this is
  * selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout).  Verify. */
1583 {
1584  /*
1585  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1586  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1587  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1588  */
1589  nsinfo->create = true;
1590 
1591  /*
1592  * If specific tables are being dumped, do not dump any complete
1593  * namespaces. If specific namespaces are being dumped, dump just those
1594  * namespaces. Otherwise, dump all non-system namespaces.
1595  */
1596  if (table_include_oids.head != NULL)
1597  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1598  else if (schema_include_oids.head != NULL)
1599  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1600  simple_oid_list_member(&schema_include_oids,
1601  nsinfo->dobj.catId.oid) ?
 /* NOTE(review): listing line 1602 was lost in extraction --
  * presumably the "DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;" arms of
  * this conditional.  Verify against upstream pg_dump.c. */
1603  else if (fout->remoteVersion >= 90600 &&
1604  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1605  {
1606  /*
1607  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1608  * they are interesting (and not the original ACLs which were set at
1609  * initdb time, see pg_init_privs).
1610  */
1611  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1612  }
1613  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1614  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1615  {
1616  /* Other system schemas don't get dumped */
1617  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1618  }
1619  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1620  {
1621  /*
1622  * The public schema is a strange beast that sits in a sort of
1623  * no-mans-land between being a system object and a user object.
1624  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1625  * a comment and an indication of ownership. If the owner is the
1626  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1627  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1628  */
1629  nsinfo->create = false;
1630  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1631  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1632  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
 /* NOTE(review): listing line 1633 was lost in extraction --
  * presumably the dump_contains assignment for the public schema.
  * Verify against upstream pg_dump.c. */
1634  }
1635  else
1636  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1637 
1638  /*
1639  * In any case, a namespace can be excluded by an exclusion switch
1640  */
1641  if (nsinfo->dobj.dump_contains &&
1642  simple_oid_list_member(&schema_exclude_oids,
1643  nsinfo->dobj.catId.oid))
1644  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1645 
1646  /*
1647  * If the schema belongs to an extension, allow extension membership to
1648  * override the dump decision for the schema itself. However, this does
1649  * not change dump_contains, so this won't change what we do with objects
1650  * within the schema. (If they belong to the extension, they'll get
1651  * suppressed by it, otherwise not.)
1652  */
1653  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1654 }
1655 
1656 /*
1657  * selectDumpableTable: policy-setting subroutine
1658  * Mark a table as to be dumped or not
1659  */
1660 static void
 /* NOTE(review): the function-name line (listing line 1661) was lost in
  * extraction; in upstream this is
  * selectDumpableTable(TableInfo *tbinfo, Archive *fout).  Verify. */
1662 {
1663  if (checkExtensionMembership(&tbinfo->dobj, fout))
1664  return; /* extension membership overrides all else */
1665 
1666  /*
1667  * If specific tables are being dumped, dump just those tables; else, dump
1668  * according to the parent namespace's dump flag.
1669  */
1670  if (table_include_oids.head != NULL)
1671  tbinfo->dobj.dump = simple_oid_list_member(&table_include_oids,
1672  tbinfo->dobj.catId.oid) ?
 /* NOTE(review): listing line 1673 was lost in extraction --
  * presumably "DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;".  Verify
  * against upstream pg_dump.c. */
1674  else
1675  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1676 
1677  /*
1678  * In any case, a table can be excluded by an exclusion switch
1679  */
1680  if (tbinfo->dobj.dump &&
1681  simple_oid_list_member(&table_exclude_oids,
1682  tbinfo->dobj.catId.oid))
1683  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1684 }
1685 
1686 /*
1687  * selectDumpableType: policy-setting subroutine
1688  * Mark a type as to be dumped or not
1689  *
1690  * If it's a table's rowtype or an autogenerated array type, we also apply a
1691  * special type code to facilitate sorting into the desired order. (We don't
1692  * want to consider those to be ordinary types because that would bring tables
1693  * up into the datatype part of the dump order.) We still set the object's
1694  * dump flag; that's not going to cause the dummy type to be dumped, but we
1695  * need it so that casts involving such types will be dumped correctly -- see
1696  * dumpCast. This means the flag should be set the same as for the underlying
1697  * object (the table or base type).
1698  */
1699 static void
 /* NOTE(review): the function-name line (listing line 1700) was lost in
  * extraction; in upstream this is
  * selectDumpableType(TypeInfo *tyinfo, Archive *fout).  Verify. */
1701 {
1702  /* skip complex types, except for standalone composite types */
1703  if (OidIsValid(tyinfo->typrelid) &&
1704  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1705  {
1706  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1707 
 /* A table rowtype is dumped (or not) with its table, never on its
  * own; mark it as a dummy for sorting purposes. */
1708  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1709  if (tytable != NULL)
1710  tyinfo->dobj.dump = tytable->dobj.dump;
1711  else
1712  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1713  return;
1714  }
1715 
1716  /* skip auto-generated array types */
1717  if (tyinfo->isArray || tyinfo->isMultirange)
1718  {
1719  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1720 
1721  /*
1722  * Fall through to set the dump flag; we assume that the subsequent
1723  * rules will do the same thing as they would for the array's base
1724  * type. (We cannot reliably look up the base type here, since
1725  * getTypes may not have processed it yet.)
1726  */
1727  }
1728 
1729  if (checkExtensionMembership(&tyinfo->dobj, fout))
1730  return; /* extension membership overrides all else */
1731 
1732  /* Dump based on if the contents of the namespace are being dumped */
1733  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1734 }
1735 
1736 /*
1737  * selectDumpableDefaultACL: policy-setting subroutine
1738  * Mark a default ACL as to be dumped or not
1739  *
1740  * For per-schema default ACLs, dump if the schema is to be dumped.
1741  * Otherwise dump if we are dumping "everything". Note that dataOnly
1742  * and aclsSkip are checked separately.
1743  */
1744 static void
 /* NOTE(review): the function-name line (listing line 1745) was lost in
  * extraction; in upstream this is
  * selectDumpableDefaultACL(DefaultACLInfo *dinfo, DumpOptions *dopt).
  * Verify against upstream. */
1746 {
1747  /* Default ACLs can't be extension members */
1748 
1749  if (dinfo->dobj.namespace)
1750  /* default ACLs are considered part of the namespace */
1751  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1752  else
1753  dinfo->dobj.dump = dopt->include_everything ?
 /* NOTE(review): listing line 1754 was lost in extraction --
  * presumably "DUMP_COMPONENT_ACL : DUMP_COMPONENT_NONE;".  Verify
  * against upstream pg_dump.c. */
1755 }
1756 
1757 /*
1758  * selectDumpableCast: policy-setting subroutine
1759  * Mark a cast as to be dumped or not
1760  *
1761  * Casts do not belong to any particular namespace (since they haven't got
1762  * names), nor do they have identifiable owners. To distinguish user-defined
1763  * casts from built-in ones, we must resort to checking whether the cast's
1764  * OID is in the range reserved for initdb.
1765  */
1766 static void
 /* NOTE(review): the function-name line (listing line 1767) was lost in
  * extraction; in upstream this is
  * selectDumpableCast(CastInfo *cast, Archive *fout).  Verify. */
1768 {
1769  if (checkExtensionMembership(&cast->dobj, fout))
1770  return; /* extension membership overrides all else */
1771 
1772  /*
1773  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1774  * support ACLs currently.
1775  */
 /* OIDs at or below g_last_builtin_oid were assigned by initdb, so the
  * cast is built-in and not dumped. */
1776  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1777  cast->dobj.dump = DUMP_COMPONENT_NONE;
1778  else
1779  cast->dobj.dump = fout->dopt->include_everything ?
 /* NOTE(review): listing line 1780 was lost in extraction --
  * presumably "DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;".  Verify
  * against upstream pg_dump.c. */
1781 }
1782 
1783 /*
1784  * selectDumpableProcLang: policy-setting subroutine
1785  * Mark a procedural language as to be dumped or not
1786  *
1787  * Procedural languages do not belong to any particular namespace. To
1788  * identify built-in languages, we must resort to checking whether the
1789  * language's OID is in the range reserved for initdb.
1790  */
1791 static void
 /* NOTE(review): the function-name line (listing line 1792) was lost in
  * extraction; in upstream this is
  * selectDumpableProcLang(ProcLangInfo *plang, Archive *fout).  Verify. */
1793 {
1794  if (checkExtensionMembership(&plang->dobj, fout))
1795  return; /* extension membership overrides all else */
1796 
1797  /*
1798  * Only include procedural languages when we are dumping everything.
1799  *
1800  * For from-initdb procedural languages, only include ACLs, as we do for
1801  * the pg_catalog namespace. We need this because procedural languages do
1802  * not live in any namespace.
1803  */
1804  if (!fout->dopt->include_everything)
1805  plang->dobj.dump = DUMP_COMPONENT_NONE;
1806  else
1807  {
 /* Built-in language (initdb-assigned OID): pre-9.6 servers don't
  * track interesting ACLs, so dump nothing there. */
1808  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1809  plang->dobj.dump = fout->remoteVersion < 90600 ?
 /* NOTE(review): listing line 1810 was lost in extraction --
  * presumably "DUMP_COMPONENT_NONE : DUMP_COMPONENT_ACL;".  Verify
  * against upstream pg_dump.c. */
1811  else
1812  plang->dobj.dump = DUMP_COMPONENT_ALL;
1813  }
1814 }
1815 
1816 /*
1817  * selectDumpableAccessMethod: policy-setting subroutine
1818  * Mark an access method as to be dumped or not
1819  *
1820  * Access methods do not belong to any particular namespace. To identify
1821  * built-in access methods, we must resort to checking whether the
1822  * method's OID is in the range reserved for initdb.
1823  */
1824 static void
1826 {
1827  if (checkExtensionMembership(&method->dobj, fout))
1828  return; /* extension membership overrides all else */
1829 
1830  /*
1831  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1832  * they do not support ACLs currently.
1833  */
1834  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1835  method->dobj.dump = DUMP_COMPONENT_NONE;
1836  else
1837  method->dobj.dump = fout->dopt->include_everything ?
1839 }
1840 
1841 /*
1842  * selectDumpableExtension: policy-setting subroutine
1843  * Mark an extension as to be dumped or not
1844  *
1845  * Built-in extensions should be skipped except for checking ACLs, since we
1846  * assume those will already be installed in the target database. We identify
1847  * such extensions by their having OIDs in the range reserved for initdb.
1848  * We dump all user-added extensions by default. No extensions are dumped
1849  * if include_everything is false (i.e., a --schema or --table switch was
1850  * given), except if --extension specifies a list of extensions to dump.
1851  */
1852 static void
1854 {
1855  /*
1856  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
1857  * change permissions on their member objects, if they wish to, and have
1858  * those changes preserved.
1859  */
1860  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1861  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
1862  else
1863  {
1864  /* check if there is a list of extensions to dump */
1865  if (extension_include_oids.head != NULL)
1866  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1867  simple_oid_list_member(&extension_include_oids,
1868  extinfo->dobj.catId.oid) ?
1870  else
1871  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1872  dopt->include_everything ?
1874  }
1875 }
1876 
1877 /*
1878  * selectDumpablePublicationTable: policy-setting subroutine
1879  * Mark a publication table as to be dumped or not
1880  *
1881  * Publication tables have schemas, but those are ignored in decision making,
1882  * because publications are only dumped when we are dumping everything.
1883  */
1884 static void
1886 {
1887  if (checkExtensionMembership(dobj, fout))
1888  return; /* extension membership overrides all else */
1889 
1890  dobj->dump = fout->dopt->include_everything ?
1892 }
1893 
1894 /*
1895  * selectDumpableObject: policy-setting subroutine
1896  * Mark a generic dumpable object as to be dumped or not
1897  *
1898  * Use this only for object types without a special-case routine above.
1899  */
1900 static void
1902 {
1903  if (checkExtensionMembership(dobj, fout))
1904  return; /* extension membership overrides all else */
1905 
1906  /*
1907  * Default policy is to dump if parent namespace is dumpable, or for
1908  * non-namespace-associated items, dump if we're dumping "everything".
1909  */
1910  if (dobj->namespace)
1911  dobj->dump = dobj->namespace->dobj.dump_contains;
1912  else
1913  dobj->dump = fout->dopt->include_everything ?
1915 }
1916 
1917 /*
1918  * Dump a table's contents for loading using the COPY command
1919  * - this routine is called by the Archiver when it wants the table
1920  * to be dumped.
1921  */
1922 static int
1923 dumpTableData_copy(Archive *fout, const void *dcontext)
1924 {
1925  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
1926  TableInfo *tbinfo = tdinfo->tdtable;
1927  const char *classname = tbinfo->dobj.name;
1929 
1930  /*
1931  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
1932  * which uses it already.
1933  */
1934  PQExpBuffer clistBuf = createPQExpBuffer();
1935  PGconn *conn = GetConnection(fout);
1936  PGresult *res;
1937  int ret;
1938  char *copybuf;
1939  const char *column_list;
1940 
1941  pg_log_info("dumping contents of table \"%s.%s\"",
1942  tbinfo->dobj.namespace->dobj.name, classname);
1943 
1944  /*
1945  * Specify the column list explicitly so that we have no possibility of
1946  * retrieving data in the wrong column order. (The default column
1947  * ordering of COPY will not be what we want in certain corner cases
1948  * involving ADD COLUMN and inheritance.)
1949  */
1950  column_list = fmtCopyColumnList(tbinfo, clistBuf);
1951 
1952  /*
1953  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
1954  * a filter condition was specified. For other cases a simple COPY
1955  * suffices.
1956  */
1957  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
1958  {
1959  /* Note: this syntax is only supported in 8.2 and up */
1960  appendPQExpBufferStr(q, "COPY (SELECT ");
1961  /* klugery to get rid of parens in column list */
1962  if (strlen(column_list) > 2)
1963  {
1964  appendPQExpBufferStr(q, column_list + 1);
1965  q->data[q->len - 1] = ' ';
1966  }
1967  else
1968  appendPQExpBufferStr(q, "* ");
1969 
1970  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
1971  fmtQualifiedDumpable(tbinfo),
1972  tdinfo->filtercond ? tdinfo->filtercond : "");
1973  }
1974  else
1975  {
1976  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
1977  fmtQualifiedDumpable(tbinfo),
1978  column_list);
1979  }
1980  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
1981  PQclear(res);
1982  destroyPQExpBuffer(clistBuf);
1983 
1984  for (;;)
1985  {
1986  ret = PQgetCopyData(conn, &copybuf, 0);
1987 
1988  if (ret < 0)
1989  break; /* done or error */
1990 
1991  if (copybuf)
1992  {
1993  WriteData(fout, copybuf, ret);
1994  PQfreemem(copybuf);
1995  }
1996 
1997  /* ----------
1998  * THROTTLE:
1999  *
2000  * There was considerable discussion in late July, 2000 regarding
2001  * slowing down pg_dump when backing up large tables. Users with both
2002  * slow & fast (multi-processor) machines experienced performance
2003  * degradation when doing a backup.
2004  *
2005  * Initial attempts based on sleeping for a number of ms for each ms
2006  * of work were deemed too complex, then a simple 'sleep in each loop'
2007  * implementation was suggested. The latter failed because the loop
2008  * was too tight. Finally, the following was implemented:
2009  *
2010  * If throttle is non-zero, then
2011  * See how long since the last sleep.
2012  * Work out how long to sleep (based on ratio).
2013  * If sleep is more than 100ms, then
2014  * sleep
2015  * reset timer
2016  * EndIf
2017  * EndIf
2018  *
2019  * where the throttle value was the number of ms to sleep per ms of
2020  * work. The calculation was done in each loop.
2021  *
2022  * Most of the hard work is done in the backend, and this solution
2023  * still did not work particularly well: on slow machines, the ratio
2024  * was 50:1, and on medium paced machines, 1:1, and on fast
2025  * multi-processor machines, it had little or no effect, for reasons
2026  * that were unclear.
2027  *
2028  * Further discussion ensued, and the proposal was dropped.
2029  *
2030  * For those people who want this feature, it can be implemented using
2031  * gettimeofday in each loop, calculating the time since last sleep,
2032  * multiplying that by the sleep ratio, then if the result is more
2033  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2034  * function to sleep for a subsecond period ie.
2035  *
2036  * select(0, NULL, NULL, NULL, &tvi);
2037  *
2038  * This will return after the interval specified in the structure tvi.
2039  * Finally, call gettimeofday again to save the 'last sleep time'.
2040  * ----------
2041  */
2042  }
2043  archprintf(fout, "\\.\n\n\n");
2044 
2045  if (ret == -2)
2046  {
2047  /* copy data transfer failed */
2048  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2049  pg_log_error("Error message from server: %s", PQerrorMessage(conn));
2050  pg_log_error("The command was: %s", q->data);
2051  exit_nicely(1);
2052  }
2053 
2054  /* Check command status and return to normal libpq state */
2055  res = PQgetResult(conn);
2056  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2057  {
2058  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2059  pg_log_error("Error message from server: %s", PQerrorMessage(conn));
2060  pg_log_error("The command was: %s", q->data);
2061  exit_nicely(1);
2062  }
2063  PQclear(res);
2064 
2065  /* Do this to ensure we've pumped libpq back to idle state */
2066  if (PQgetResult(conn) != NULL)
2067  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2068  classname);
2069 
2070  destroyPQExpBuffer(q);
2071  return 1;
2072 }
2073 
2074 /*
2075  * Dump table data using INSERT commands.
2076  *
2077  * Caution: when we restore from an archive file direct to database, the
2078  * INSERT commands emitted by this function have to be parsed by
2079  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2080  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2081  */
2082 static int
2083 dumpTableData_insert(Archive *fout, const void *dcontext)
2084 {
2085  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2086  TableInfo *tbinfo = tdinfo->tdtable;
2087  DumpOptions *dopt = fout->dopt;
2089  PQExpBuffer insertStmt = NULL;
2090  PGresult *res;
2091  int nfields;
2092  int rows_per_statement = dopt->dump_inserts;
2093  int rows_this_statement = 0;
2094 
2095  appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
2096  "SELECT * FROM ONLY %s",
2097  fmtQualifiedDumpable(tbinfo));
2098  if (tdinfo->filtercond)
2099  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2100 
2101  ExecuteSqlStatement(fout, q->data);
2102 
2103  while (1)
2104  {
2105  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2106  PGRES_TUPLES_OK);
2107  nfields = PQnfields(res);
2108 
2109  /*
2110  * First time through, we build as much of the INSERT statement as
2111  * possible in "insertStmt", which we can then just print for each
2112  * statement. If the table happens to have zero columns then this will
2113  * be a complete statement, otherwise it will end in "VALUES" and be
2114  * ready to have the row's column values printed.
2115  */
2116  if (insertStmt == NULL)
2117  {
2118  TableInfo *targettab;
2119 
2120  insertStmt = createPQExpBuffer();
2121 
2122  /*
2123  * When load-via-partition-root is set, get the root table name
2124  * for the partition table, so that we can reload data through the
2125  * root table.
2126  */
2127  if (dopt->load_via_partition_root && tbinfo->ispartition)
2128  targettab = getRootTableInfo(tbinfo);
2129  else
2130  targettab = tbinfo;
2131 
2132  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2133  fmtQualifiedDumpable(targettab));
2134 
2135  /* corner case for zero-column table */
2136  if (nfields == 0)
2137  {
2138  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2139  }
2140  else
2141  {
2142  /* append the list of column names if required */
2143  if (dopt->column_inserts)
2144  {
2145  appendPQExpBufferChar(insertStmt, '(');
2146  for (int field = 0; field < nfields; field++)
2147  {
2148  if (field > 0)
2149  appendPQExpBufferStr(insertStmt, ", ");
2150  appendPQExpBufferStr(insertStmt,
2151  fmtId(PQfname(res, field)));
2152  }
2153  appendPQExpBufferStr(insertStmt, ") ");
2154  }
2155 
2156  if (tbinfo->needs_override)
2157  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2158 
2159  appendPQExpBufferStr(insertStmt, "VALUES");
2160  }
2161  }
2162 
2163  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2164  {
2165  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2166  if (rows_this_statement == 0)
2167  archputs(insertStmt->data, fout);
2168 
2169  /*
2170  * If it is zero-column table then we've already written the
2171  * complete statement, which will mean we've disobeyed
2172  * --rows-per-insert when it's set greater than 1. We do support
2173  * a way to make this multi-row with: SELECT UNION ALL SELECT
2174  * UNION ALL ... but that's non-standard so we should avoid it
2175  * given that using INSERTs is mostly only ever needed for
2176  * cross-database exports.
2177  */
2178  if (nfields == 0)
2179  continue;
2180 
2181  /* Emit a row heading */
2182  if (rows_per_statement == 1)
2183  archputs(" (", fout);
2184  else if (rows_this_statement > 0)
2185  archputs(",\n\t(", fout);
2186  else
2187  archputs("\n\t(", fout);
2188 
2189  for (int field = 0; field < nfields; field++)
2190  {
2191  if (field > 0)
2192  archputs(", ", fout);
2193  if (tbinfo->attgenerated[field])
2194  {
2195  archputs("DEFAULT", fout);
2196  continue;
2197  }
2198  if (PQgetisnull(res, tuple, field))
2199  {
2200  archputs("NULL", fout);
2201  continue;
2202  }
2203 
2204  /* XXX This code is partially duplicated in ruleutils.c */
2205  switch (PQftype(res, field))
2206  {
2207  case INT2OID:
2208  case INT4OID:
2209  case INT8OID:
2210  case OIDOID:
2211  case FLOAT4OID:
2212  case FLOAT8OID:
2213  case NUMERICOID:
2214  {
2215  /*
2216  * These types are printed without quotes unless
2217  * they contain values that aren't accepted by the
2218  * scanner unquoted (e.g., 'NaN'). Note that
2219  * strtod() and friends might accept NaN, so we
2220  * can't use that to test.
2221  *
2222  * In reality we only need to defend against
2223  * infinity and NaN, so we need not get too crazy
2224  * about pattern matching here.
2225  */
2226  const char *s = PQgetvalue(res, tuple, field);
2227 
2228  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2229  archputs(s, fout);
2230  else
2231  archprintf(fout, "'%s'", s);
2232  }
2233  break;
2234 
2235  case BITOID:
2236  case VARBITOID:
2237  archprintf(fout, "B'%s'",
2238  PQgetvalue(res, tuple, field));
2239  break;
2240 
2241  case BOOLOID:
2242  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2243  archputs("true", fout);
2244  else
2245  archputs("false", fout);
2246  break;
2247 
2248  default:
2249  /* All other types are printed as string literals. */
2250  resetPQExpBuffer(q);
2252  PQgetvalue(res, tuple, field),
2253  fout);
2254  archputs(q->data, fout);
2255  break;
2256  }
2257  }
2258 
2259  /* Terminate the row ... */
2260  archputs(")", fout);
2261 
2262  /* ... and the statement, if the target no. of rows is reached */
2263  if (++rows_this_statement >= rows_per_statement)
2264  {
2265  if (dopt->do_nothing)
2266  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2267  else
2268  archputs(";\n", fout);
2269  /* Reset the row counter */
2270  rows_this_statement = 0;
2271  }
2272  }
2273 
2274  if (PQntuples(res) <= 0)
2275  {
2276  PQclear(res);
2277  break;
2278  }
2279  PQclear(res);
2280  }
2281 
2282  /* Terminate any statements that didn't make the row count. */
2283  if (rows_this_statement > 0)
2284  {
2285  if (dopt->do_nothing)
2286  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2287  else
2288  archputs(";\n", fout);
2289  }
2290 
2291  archputs("\n\n", fout);
2292 
2293  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2294 
2295  destroyPQExpBuffer(q);
2296  if (insertStmt != NULL)
2297  destroyPQExpBuffer(insertStmt);
2298 
2299  return 1;
2300 }
2301 
2302 /*
2303  * getRootTableInfo:
2304  * get the root TableInfo for the given partition table.
2305  */
2306 static TableInfo *
2308 {
2309  TableInfo *parentTbinfo;
2310 
2311  Assert(tbinfo->ispartition);
2312  Assert(tbinfo->numParents == 1);
2313 
2314  parentTbinfo = tbinfo->parents[0];
2315  while (parentTbinfo->ispartition)
2316  {
2317  Assert(parentTbinfo->numParents == 1);
2318  parentTbinfo = parentTbinfo->parents[0];
2319  }
2320 
2321  return parentTbinfo;
2322 }
2323 
2324 /*
2325  * dumpTableData -
2326  * dump the contents of a single table
2327  *
2328  * Actually, this just makes an ArchiveEntry for the table contents.
2329  */
2330 static void
2331 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2332 {
2333  DumpOptions *dopt = fout->dopt;
2334  TableInfo *tbinfo = tdinfo->tdtable;
2335  PQExpBuffer copyBuf = createPQExpBuffer();
2336  PQExpBuffer clistBuf = createPQExpBuffer();
2337  DataDumperPtr dumpFn;
2338  char *copyStmt;
2339  const char *copyFrom;
2340 
2341  /* We had better have loaded per-column details about this table */
2342  Assert(tbinfo->interesting);
2343 
2344  if (dopt->dump_inserts == 0)
2345  {
2346  /* Dump/restore using COPY */
2347  dumpFn = dumpTableData_copy;
2348 
2349  /*
2350  * When load-via-partition-root is set, get the root table name for
2351  * the partition table, so that we can reload data through the root
2352  * table.
2353  */
2354  if (dopt->load_via_partition_root && tbinfo->ispartition)
2355  {
2356  TableInfo *parentTbinfo;
2357 
2358  parentTbinfo = getRootTableInfo(tbinfo);
2359  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2360  }
2361  else
2362  copyFrom = fmtQualifiedDumpable(tbinfo);
2363 
2364  /* must use 2 steps here 'cause fmtId is nonreentrant */
2365  appendPQExpBuffer(copyBuf, "COPY %s ",
2366  copyFrom);
2367  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2368  fmtCopyColumnList(tbinfo, clistBuf));
2369  copyStmt = copyBuf->data;
2370  }
2371  else
2372  {
2373  /* Restore using INSERT */
2374  dumpFn = dumpTableData_insert;
2375  copyStmt = NULL;
2376  }
2377 
2378  /*
2379  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2380  * dependency on its table as "special" and pass it to ArchiveEntry now.
2381  * See comments for BuildArchiveDependencies.
2382  */
2383  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2384  {
2385  TocEntry *te;
2386 
2387  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2388  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2389  .namespace = tbinfo->dobj.namespace->dobj.name,
2390  .owner = tbinfo->rolname,
2391  .description = "TABLE DATA",
2392  .section = SECTION_DATA,
2393  .copyStmt = copyStmt,
2394  .deps = &(tbinfo->dobj.dumpId),
2395  .nDeps = 1,
2396  .dumpFn = dumpFn,
2397  .dumpArg = tdinfo));
2398 
2399  /*
2400  * Set the TocEntry's dataLength in case we are doing a parallel dump
2401  * and want to order dump jobs by table size. We choose to measure
2402  * dataLength in table pages during dump, so no scaling is needed.
2403  * However, relpages is declared as "integer" in pg_class, and hence
2404  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2405  * Cast so that we get the right interpretation of table sizes
2406  * exceeding INT_MAX pages.
2407  */
2408  te->dataLength = (BlockNumber) tbinfo->relpages;
2409  }
2410 
2411  destroyPQExpBuffer(copyBuf);
2412  destroyPQExpBuffer(clistBuf);
2413 }
2414 
2415 /*
2416  * refreshMatViewData -
2417  * load or refresh the contents of a single materialized view
2418  *
2419  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2420  * statement.
2421  */
2422 static void
2424 {
2425  TableInfo *tbinfo = tdinfo->tdtable;
2426  PQExpBuffer q;
2427 
2428  /* If the materialized view is not flagged as populated, skip this. */
2429  if (!tbinfo->relispopulated)
2430  return;
2431 
2432  q = createPQExpBuffer();
2433 
2434  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2435  fmtQualifiedDumpable(tbinfo));
2436 
2437  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2438  ArchiveEntry(fout,
2439  tdinfo->dobj.catId, /* catalog ID */
2440  tdinfo->dobj.dumpId, /* dump ID */
2441  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2442  .namespace = tbinfo->dobj.namespace->dobj.name,
2443  .owner = tbinfo->rolname,
2444  .description = "MATERIALIZED VIEW DATA",
2445  .section = SECTION_POST_DATA,
2446  .createStmt = q->data,
2447  .deps = tdinfo->dobj.dependencies,
2448  .nDeps = tdinfo->dobj.nDeps));
2449 
2450  destroyPQExpBuffer(q);
2451 }
2452 
2453 /*
2454  * getTableData -
2455  * set up dumpable objects representing the contents of tables
2456  */
2457 static void
2458 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2459 {
2460  int i;
2461 
2462  for (i = 0; i < numTables; i++)
2463  {
2464  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2465  (!relkind || tblinfo[i].relkind == relkind))
2466  makeTableDataInfo(dopt, &(tblinfo[i]));
2467  }
2468 }
2469 
2470 /*
2471  * Make a dumpable object for the data of this specific table
2472  *
2473  * Note: we make a TableDataInfo if and only if we are going to dump the
2474  * table data; the "dump" flag in such objects isn't used.
2475  */
2476 static void
2478 {
2479  TableDataInfo *tdinfo;
2480 
2481  /*
2482  * Nothing to do if we already decided to dump the table. This will
2483  * happen for "config" tables.
2484  */
2485  if (tbinfo->dataObj != NULL)
2486  return;
2487 
2488  /* Skip VIEWs (no data to dump) */
2489  if (tbinfo->relkind == RELKIND_VIEW)
2490  return;
2491  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2492  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2493  (foreign_servers_include_oids.head == NULL ||
2494  !simple_oid_list_member(&foreign_servers_include_oids,
2495  tbinfo->foreign_server)))
2496  return;
2497  /* Skip partitioned tables (data in partitions) */
2498  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2499  return;
2500 
2501  /* Don't dump data in unlogged tables, if so requested */
2502  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2503  dopt->no_unlogged_table_data)
2504  return;
2505 
2506  /* Check that the data is not explicitly excluded */
2507  if (simple_oid_list_member(&tabledata_exclude_oids,
2508  tbinfo->dobj.catId.oid))
2509  return;
2510 
2511  /* OK, let's dump it */
2512  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2513 
2514  if (tbinfo->relkind == RELKIND_MATVIEW)
2515  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2516  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2517  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2518  else
2519  tdinfo->dobj.objType = DO_TABLE_DATA;
2520 
2521  /*
2522  * Note: use tableoid 0 so that this object won't be mistaken for
2523  * something that pg_depend entries apply to.
2524  */
2525  tdinfo->dobj.catId.tableoid = 0;
2526  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2527  AssignDumpId(&tdinfo->dobj);
2528  tdinfo->dobj.name = tbinfo->dobj.name;
2529  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2530  tdinfo->tdtable = tbinfo;
2531  tdinfo->filtercond = NULL; /* might get set later */
2532  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2533 
2534  tbinfo->dataObj = tdinfo;
2535 
2536  /* Make sure that we'll collect per-column info for this table. */
2537  tbinfo->interesting = true;
2538 }
2539 
2540 /*
2541  * The refresh for a materialized view must be dependent on the refresh for
2542  * any materialized view that this one is dependent on.
2543  *
2544  * This must be called after all the objects are created, but before they are
2545  * sorted.
2546  */
2547 static void
2549 {
2550  PQExpBuffer query;
2551  PGresult *res;
2552  int ntups,
2553  i;
2554  int i_classid,
2555  i_objid,
2556  i_refobjid;
2557 
2558  /* No Mat Views before 9.3. */
2559  if (fout->remoteVersion < 90300)
2560  return;
2561 
2562  query = createPQExpBuffer();
2563 
2564  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2565  "( "
2566  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2567  "FROM pg_depend d1 "
2568  "JOIN pg_class c1 ON c1.oid = d1.objid "
2569  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2570  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2571  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2572  "AND d2.objid = r1.oid "
2573  "AND d2.refobjid <> d1.objid "
2574  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2575  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2576  CppAsString2(RELKIND_VIEW) ") "
2577  "WHERE d1.classid = 'pg_class'::regclass "
2578  "UNION "
2579  "SELECT w.objid, d3.refobjid, c3.relkind "
2580  "FROM w "
2581  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2582  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2583  "AND d3.objid = r3.oid "
2584  "AND d3.refobjid <> w.refobjid "
2585  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2586  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2587  CppAsString2(RELKIND_VIEW) ") "
2588  ") "
2589  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2590  "FROM w "
2591  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2592 
2593  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2594 
2595  ntups = PQntuples(res);
2596 
2597  i_classid = PQfnumber(res, "classid");
2598  i_objid = PQfnumber(res, "objid");
2599  i_refobjid = PQfnumber(res, "refobjid");
2600 
2601  for (i = 0; i < ntups; i++)
2602  {
2603  CatalogId objId;
2604  CatalogId refobjId;
2605  DumpableObject *dobj;
2606  DumpableObject *refdobj;
2607  TableInfo *tbinfo;
2608  TableInfo *reftbinfo;
2609 
2610  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2611  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2612  refobjId.tableoid = objId.tableoid;
2613  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2614 
2615  dobj = findObjectByCatalogId(objId);
2616  if (dobj == NULL)
2617  continue;
2618 
2619  Assert(dobj->objType == DO_TABLE);
2620  tbinfo = (TableInfo *) dobj;
2621  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2622  dobj = (DumpableObject *) tbinfo->dataObj;
2623  if (dobj == NULL)
2624  continue;
2625  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2626 
2627  refdobj = findObjectByCatalogId(refobjId);
2628  if (refdobj == NULL)
2629  continue;
2630 
2631  Assert(refdobj->objType == DO_TABLE);
2632  reftbinfo = (TableInfo *) refdobj;
2633  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2634  refdobj = (DumpableObject *) reftbinfo->dataObj;
2635  if (refdobj == NULL)
2636  continue;
2637  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2638 
2639  addObjectDependency(dobj, refdobj->dumpId);
2640 
2641  if (!reftbinfo->relispopulated)
2642  tbinfo->relispopulated = false;
2643  }
2644 
2645  PQclear(res);
2646 
2647  destroyPQExpBuffer(query);
2648 }
2649 
2650 /*
2651  * getTableDataFKConstraints -
2652  * add dump-order dependencies reflecting foreign key constraints
2653  *
2654  * This code is executed only in a data-only dump --- in schema+data dumps
2655  * we handle foreign key issues by not creating the FK constraints until
2656  * after the data is loaded. In a data-only dump, however, we want to
2657  * order the table data objects in such a way that a table's referenced
2658  * tables are restored first. (In the presence of circular references or
2659  * self-references this may be impossible; we'll detect and complain about
2660  * that during the dependency sorting step.)
2661  */
2662 static void
2664 {
2665  DumpableObject **dobjs;
2666  int numObjs;
2667  int i;
2668 
2669  /* Search through all the dumpable objects for FK constraints */
2670  getDumpableObjects(&dobjs, &numObjs);
2671  for (i = 0; i < numObjs; i++)
2672  {
2673  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2674  {
2675  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2676  TableInfo *ftable;
2677 
2678  /* Not interesting unless both tables are to be dumped */
2679  if (cinfo->contable == NULL ||
2680  cinfo->contable->dataObj == NULL)
2681  continue;
2682  ftable = findTableByOid(cinfo->confrelid);
2683  if (ftable == NULL ||
2684  ftable->dataObj == NULL)
2685  continue;
2686 
2687  /*
2688  * Okay, make referencing table's TABLE_DATA object depend on the
2689  * referenced table's TABLE_DATA object.
2690  */
2692  ftable->dataObj->dobj.dumpId);
2693  }
2694  }
2695  free(dobjs);
2696 }
2697 
2698 
2699 /*
2700  * guessConstraintInheritance:
2701  * In pre-8.4 databases, we can't tell for certain which constraints
2702  * are inherited. We assume a CHECK constraint is inherited if its name
2703  * matches the name of any constraint in the parent. Originally this code
2704  * tried to compare the expression texts, but that can fail for various
2705  * reasons --- for example, if the parent and child tables are in different
2706  * schemas, reverse-listing of function calls may produce different text
2707  * (schema-qualified or not) depending on search path.
2708  *
2709  * In 8.4 and up we can rely on the conislocal field to decide which
2710  * constraints must be dumped; much safer.
2711  *
2712  * This function assumes all conislocal flags were initialized to true.
2713  * It clears the flag on anything that seems to be inherited.
2714  */
2715 static void
2717 {
2718  int i,
2719  j,
2720  k;
2721 
2722  for (i = 0; i < numTables; i++)
2723  {
2724  TableInfo *tbinfo = &(tblinfo[i]);
2725  int numParents;
2726  TableInfo **parents;
2727  TableInfo *parent;
2728 
2729  /* Sequences and views never have parents */
2730  if (tbinfo->relkind == RELKIND_SEQUENCE ||
2731  tbinfo->relkind == RELKIND_VIEW)
2732  continue;
2733 
2734  /* Don't bother computing anything for non-target tables, either */
2735  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
2736  continue;
2737 
2738  numParents = tbinfo->numParents;
2739  parents = tbinfo->parents;
2740 
2741  if (numParents == 0)
2742  continue; /* nothing to see here, move along */
2743 
2744  /* scan for inherited CHECK constraints */
2745  for (j = 0; j < tbinfo->ncheck; j++)
2746  {
2747  ConstraintInfo *constr;
2748 
2749  constr = &(tbinfo->checkexprs[j]);
2750 
2751  for (k = 0; k < numParents; k++)
2752  {
2753  int l;
2754 
2755  parent = parents[k];
2756  for (l = 0; l < parent->ncheck; l++)
2757  {
2758  ConstraintInfo *pconstr = &(parent->checkexprs[l]);
2759 
2760  if (strcmp(pconstr->dobj.name, constr->dobj.name) == 0)
2761  {
2762  constr->conislocal = false;
2763  break;
2764  }
2765  }
2766  if (!constr->conislocal)
2767  break;
2768  }
2769  }
2770  }
2771 }
2772 
2773 
2774 /*
2775  * dumpDatabase:
2776  * dump the database definition
2777  */
2778 static void
2780 {
2781  DumpOptions *dopt = fout->dopt;
2782  PQExpBuffer dbQry = createPQExpBuffer();
2783  PQExpBuffer delQry = createPQExpBuffer();
2784  PQExpBuffer creaQry = createPQExpBuffer();
2785  PQExpBuffer labelq = createPQExpBuffer();
2786  PGconn *conn = GetConnection(fout);
2787  PGresult *res;
2788  int i_tableoid,
2789  i_oid,
2790  i_datname,
2791  i_dba,
2792  i_encoding,
2793  i_collate,
2794  i_ctype,
2795  i_frozenxid,
2796  i_minmxid,
2797  i_datacl,
2798  i_rdatacl,
2799  i_datistemplate,
2800  i_datconnlimit,
2801  i_tablespace;
2802  CatalogId dbCatId;
2803  DumpId dbDumpId;
2804  const char *datname,
2805  *dba,
2806  *encoding,
2807  *collate,
2808  *ctype,
2809  *datacl,
2810  *rdatacl,
2811  *datistemplate,
2812  *datconnlimit,
2813  *tablespace;
2814  uint32 frozenxid,
2815  minmxid;
2816  char *qdatname;
2817 
2818  pg_log_info("saving database definition");
2819 
2820  /*
2821  * Fetch the database-level properties for this database.
2822  *
2823  * The order in which privileges are in the ACL string (the order they
2824  * have been GRANT'd in, which the backend maintains) must be preserved to
2825  * ensure that GRANTs WITH GRANT OPTION and subsequent GRANTs based on
2826  * those are dumped in the correct order. Note that initial privileges
2827  * (pg_init_privs) are not supported on databases, so this logic cannot
2828  * make use of buildACLQueries().
2829  */
2830  if (fout->remoteVersion >= 90600)
2831  {
2832  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2833  "(%s datdba) AS dba, "
2834  "pg_encoding_to_char(encoding) AS encoding, "
2835  "datcollate, datctype, datfrozenxid, datminmxid, "
2836  "(SELECT array_agg(acl ORDER BY row_n) FROM "
2837  " (SELECT acl, row_n FROM "
2838  " unnest(coalesce(datacl,acldefault('d',datdba))) "
2839  " WITH ORDINALITY AS perm(acl,row_n) "
2840  " WHERE NOT EXISTS ( "
2841  " SELECT 1 "
2842  " FROM unnest(acldefault('d',datdba)) "
2843  " AS init(init_acl) "
2844  " WHERE acl = init_acl)) AS datacls) "
2845  " AS datacl, "
2846  "(SELECT array_agg(acl ORDER BY row_n) FROM "
2847  " (SELECT acl, row_n FROM "
2848  " unnest(acldefault('d',datdba)) "
2849  " WITH ORDINALITY AS initp(acl,row_n) "
2850  " WHERE NOT EXISTS ( "
2851  " SELECT 1 "
2852  " FROM unnest(coalesce(datacl,acldefault('d',datdba))) "
2853  " AS permp(orig_acl) "
2854  " WHERE acl = orig_acl)) AS rdatacls) "
2855  " AS rdatacl, "
2856  "datistemplate, datconnlimit, "
2857  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2858  "shobj_description(oid, 'pg_database') AS description "
2859 
2860  "FROM pg_database "
2861  "WHERE datname = current_database()",
2863  }
2864  else if (fout->remoteVersion >= 90300)
2865  {
2866  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2867  "(%s datdba) AS dba, "
2868  "pg_encoding_to_char(encoding) AS encoding, "
2869  "datcollate, datctype, datfrozenxid, datminmxid, "
2870  "datacl, '' as rdatacl, datistemplate, datconnlimit, "
2871  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2872  "shobj_description(oid, 'pg_database') AS description "
2873 
2874  "FROM pg_database "
2875  "WHERE datname = current_database()",
2877  }
2878  else if (fout->remoteVersion >= 80400)
2879  {
2880  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2881  "(%s datdba) AS dba, "
2882  "pg_encoding_to_char(encoding) AS encoding, "
2883  "datcollate, datctype, datfrozenxid, 0 AS datminmxid, "
2884  "datacl, '' as rdatacl, datistemplate, datconnlimit, "
2885  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2886  "shobj_description(oid, 'pg_database') AS description "
2887 
2888  "FROM pg_database "
2889  "WHERE datname = current_database()",
2891  }
2892  else if (fout->remoteVersion >= 80200)
2893  {
2894  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2895  "(%s datdba) AS dba, "
2896  "pg_encoding_to_char(encoding) AS encoding, "
2897  "NULL AS datcollate, NULL AS datctype, datfrozenxid, 0 AS datminmxid, "
2898  "datacl, '' as rdatacl, datistemplate, datconnlimit, "
2899  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2900  "shobj_description(oid, 'pg_database') AS description "
2901 
2902  "FROM pg_database "
2903  "WHERE datname = current_database()",
2905  }
2906  else
2907  {
2908  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2909  "(%s datdba) AS dba, "
2910  "pg_encoding_to_char(encoding) AS encoding, "
2911  "NULL AS datcollate, NULL AS datctype, datfrozenxid, 0 AS datminmxid, "
2912  "datacl, '' as rdatacl, datistemplate, "
2913  "-1 as datconnlimit, "
2914  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace "
2915  "FROM pg_database "
2916  "WHERE datname = current_database()",
2918  }
2919 
2920  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
2921 
2922  i_tableoid = PQfnumber(res, "tableoid");
2923  i_oid = PQfnumber(res, "oid");
2924  i_datname = PQfnumber(res, "datname");
2925  i_dba = PQfnumber(res, "dba");
2926  i_encoding = PQfnumber(res, "encoding");
2927  i_collate = PQfnumber(res, "datcollate");
2928  i_ctype = PQfnumber(res, "datctype");
2929  i_frozenxid = PQfnumber(res, "datfrozenxid");
2930  i_minmxid = PQfnumber(res, "datminmxid");
2931  i_datacl = PQfnumber(res, "datacl");
2932  i_rdatacl = PQfnumber(res, "rdatacl");
2933  i_datistemplate = PQfnumber(res, "datistemplate");
2934  i_datconnlimit = PQfnumber(res, "datconnlimit");
2935  i_tablespace = PQfnumber(res, "tablespace");
2936 
2937  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
2938  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
2939  datname = PQgetvalue(res, 0, i_datname);
2940  dba = PQgetvalue(res, 0, i_dba);
2941  encoding = PQgetvalue(res, 0, i_encoding);
2942  collate = PQgetvalue(res, 0, i_collate);
2943  ctype = PQgetvalue(res, 0, i_ctype);
2944  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
2945  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
2946  datacl = PQgetvalue(res, 0, i_datacl);
2947  rdatacl = PQgetvalue(res, 0, i_rdatacl);
2948  datistemplate = PQgetvalue(res, 0, i_datistemplate);
2949  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
2950  tablespace = PQgetvalue(res, 0, i_tablespace);
2951 
2952  qdatname = pg_strdup(fmtId(datname));
2953 
2954  /*
2955  * Prepare the CREATE DATABASE command. We must specify encoding, locale,
2956  * and tablespace since those can't be altered later. Other DB properties
2957  * are left to the DATABASE PROPERTIES entry, so that they can be applied
2958  * after reconnecting to the target DB.
2959  */
2960  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
2961  qdatname);
2962  if (strlen(encoding) > 0)
2963  {
2964  appendPQExpBufferStr(creaQry, " ENCODING = ");
2965  appendStringLiteralAH(creaQry, encoding, fout);
2966  }
2967  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
2968  {
2969  appendPQExpBufferStr(creaQry, " LOCALE = ");
2970  appendStringLiteralAH(creaQry, collate, fout);
2971  }
2972  else
2973  {
2974  if (strlen(collate) > 0)
2975  {
2976  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
2977  appendStringLiteralAH(creaQry, collate, fout);
2978  }
2979  if (strlen(ctype) > 0)
2980  {
2981  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
2982  appendStringLiteralAH(creaQry, ctype, fout);
2983  }
2984  }
2985 
2986  /*
2987  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
2988  * thing; the decision whether to specify a tablespace should be left till
2989  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
2990  * label the DATABASE entry with the tablespace and let the normal
2991  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
2992  * attention to default_tablespace, so that won't work.
2993  */
2994  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
2995  !dopt->outputNoTablespaces)
2996  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
2997  fmtId(tablespace));
2998  appendPQExpBufferStr(creaQry, ";\n");
2999 
3000  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3001  qdatname);
3002 
3003  dbDumpId = createDumpId();
3004 
3005  ArchiveEntry(fout,
3006  dbCatId, /* catalog ID */
3007  dbDumpId, /* dump ID */
3008  ARCHIVE_OPTS(.tag = datname,
3009  .owner = dba,
3010  .description = "DATABASE",
3011  .section = SECTION_PRE_DATA,
3012  .createStmt = creaQry->data,
3013  .dropStmt = delQry->data));
3014 
3015  /* Compute correct tag for archive entry */
3016  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3017 
3018  /* Dump DB comment if any */
3019  if (fout->remoteVersion >= 80200)
3020  {
3021  /*
3022  * 8.2 and up keep comments on shared objects in a shared table, so we
3023  * cannot use the dumpComment() code used for other database objects.
3024  * Be careful that the ArchiveEntry parameters match that function.
3025  */
3026  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3027 
3028  if (comment && *comment && !dopt->no_comments)
3029  {
3030  resetPQExpBuffer(dbQry);
3031 
3032  /*
3033  * Generates warning when loaded into a differently-named
3034  * database.
3035  */
3036  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3037  appendStringLiteralAH(dbQry, comment, fout);
3038  appendPQExpBufferStr(dbQry, ";\n");
3039 
3040  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3041  ARCHIVE_OPTS(.tag = labelq->data,
3042  .owner = dba,
3043  .description = "COMMENT",
3044  .section = SECTION_NONE,
3045  .createStmt = dbQry->data,
3046  .deps = &dbDumpId,
3047  .nDeps = 1));
3048  }
3049  }
3050  else
3051  {
3052  dumpComment(fout, "DATABASE", qdatname, NULL, dba,
3053  dbCatId, 0, dbDumpId);
3054  }
3055 
3056  /* Dump DB security label, if enabled */
3057  if (!dopt->no_security_labels && fout->remoteVersion >= 90200)
3058  {
3059  PGresult *shres;
3060  PQExpBuffer seclabelQry;
3061 
3062  seclabelQry = createPQExpBuffer();
3063 
3064  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3065  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3066  resetPQExpBuffer(seclabelQry);
3067  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3068  if (seclabelQry->len > 0)
3069  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3070  ARCHIVE_OPTS(.tag = labelq->data,
3071  .owner = dba,
3072  .description = "SECURITY LABEL",
3073  .section = SECTION_NONE,
3074  .createStmt = seclabelQry->data,
3075  .deps = &dbDumpId,
3076  .nDeps = 1));
3077  destroyPQExpBuffer(seclabelQry);
3078  PQclear(shres);
3079  }
3080 
3081  /*
3082  * Dump ACL if any. Note that we do not support initial privileges
3083  * (pg_init_privs) on databases.
3084  */
3085  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3086  qdatname, NULL, NULL,
3087  dba, datacl, rdatacl, "", "");
3088 
3089  /*
3090  * Now construct a DATABASE PROPERTIES archive entry to restore any
3091  * non-default database-level properties. (The reason this must be
3092  * separate is that we cannot put any additional commands into the TOC
3093  * entry that has CREATE DATABASE. pg_restore would execute such a group
3094  * in an implicit transaction block, and the backend won't allow CREATE
3095  * DATABASE in that context.)
3096  */
3097  resetPQExpBuffer(creaQry);
3098  resetPQExpBuffer(delQry);
3099 
3100  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3101  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3102  qdatname, datconnlimit);
3103 
3104  if (strcmp(datistemplate, "t") == 0)
3105  {
3106  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3107  qdatname);
3108 
3109  /*
3110  * The backend won't accept DROP DATABASE on a template database. We
3111  * can deal with that by removing the template marking before the DROP
3112  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3113  * since no such command is currently supported, fake it with a direct
3114  * UPDATE on pg_database.
3115  */
3116  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3117  "SET datistemplate = false WHERE datname = ");
3118  appendStringLiteralAH(delQry, datname, fout);
3119  appendPQExpBufferStr(delQry, ";\n");
3120  }
3121 
3122  /* Add database-specific SET options */
3123  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3124 
3125  /*
3126  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3127  * entry, too, for lack of a better place.
3128  */
3129  if (dopt->binary_upgrade)
3130  {
3131  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3132  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3133  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3134  "WHERE datname = ",
3135  frozenxid, minmxid);
3136  appendStringLiteralAH(creaQry, datname, fout);
3137  appendPQExpBufferStr(creaQry, ";\n");
3138  }
3139 
3140  if (creaQry->len > 0)
3141  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3142  ARCHIVE_OPTS(.tag = datname,
3143  .owner = dba,
3144  .description = "DATABASE PROPERTIES",
3145  .section = SECTION_PRE_DATA,
3146  .createStmt = creaQry->data,
3147  .dropStmt = delQry->data,
3148  .deps = &dbDumpId));
3149 
3150  /*
3151  * pg_largeobject comes from the old system intact, so set its
3152  * relfrozenxids and relminmxids.
3153  */
3154  if (dopt->binary_upgrade)
3155  {
3156  PGresult *lo_res;
3157  PQExpBuffer loFrozenQry = createPQExpBuffer();
3158  PQExpBuffer loOutQry = createPQExpBuffer();
3159  int i_relfrozenxid,
3160  i_relminmxid;
3161 
3162  /*
3163  * pg_largeobject
3164  */
3165  if (fout->remoteVersion >= 90300)
3166  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid\n"
3167  "FROM pg_catalog.pg_class\n"
3168  "WHERE oid = %u;\n",
3169  LargeObjectRelationId);
3170  else
3171  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid\n"
3172  "FROM pg_catalog.pg_class\n"
3173  "WHERE oid = %u;\n",
3174  LargeObjectRelationId);
3175 
3176  lo_res = ExecuteSqlQueryForSingleRow(fout, loFrozenQry->data);
3177 
3178  i_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3179  i_relminmxid = PQfnumber(lo_res, "relminmxid");
3180 
3181  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3182  appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
3183  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3184  "WHERE oid = %u;\n",
3185  atooid(PQgetvalue(lo_res, 0, i_relfrozenxid)),
3186  atooid(PQgetvalue(lo_res, 0, i_relminmxid)),
3187  LargeObjectRelationId);
3188  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3189  ARCHIVE_OPTS(.tag = "pg_largeobject",
3190  .description = "pg_largeobject",
3191  .section = SECTION_PRE_DATA,
3192  .createStmt = loOutQry->data));
3193 
3194  PQclear(lo_res);
3195 
3196  destroyPQExpBuffer(loFrozenQry);
3197  destroyPQExpBuffer(loOutQry);
3198  }
3199 
3200  PQclear(res);
3201 
3202  free(qdatname);
3203  destroyPQExpBuffer(dbQry);
3204  destroyPQExpBuffer(delQry);
3205  destroyPQExpBuffer(creaQry);
3206  destroyPQExpBuffer(labelq);
3207 }
3208 
3209 /*
3210  * Collect any database-specific or role-and-database-specific SET options
3211  * for this database, and append them to outbuf.
3212  */
3213 static void
3215  const char *dbname, Oid dboid)
3216 {
3217  PGconn *conn = GetConnection(AH);
3219  PGresult *res;
3220  int count = 1;
3221 
3222  /*
3223  * First collect database-specific options. Pre-8.4 server versions lack
3224  * unnest(), so we do this the hard way by querying once per subscript.
3225  */
3226  for (;;)
3227  {
3228  if (AH->remoteVersion >= 90000)
3229  printfPQExpBuffer(buf, "SELECT setconfig[%d] FROM pg_db_role_setting "
3230  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3231  count, dboid);
3232  else
3233  printfPQExpBuffer(buf, "SELECT datconfig[%d] FROM pg_database WHERE oid = '%u'::oid", count, dboid);
3234 
3235  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3236 
3237  if (PQntuples(res) == 1 &&
3238  !PQgetisnull(res, 0, 0))
3239  {
3240  makeAlterConfigCommand(conn, PQgetvalue(res, 0, 0),
3241  "DATABASE", dbname, NULL, NULL,
3242  outbuf);
3243  PQclear(res);
3244  count++;
3245  }
3246  else
3247  {
3248  PQclear(res);
3249  break;
3250  }
3251  }
3252 
3253  /* Now look for role-and-database-specific options */
3254  if (AH->remoteVersion >= 90000)
3255  {
3256  /* Here we can assume we have unnest() */
3257  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
3258  "FROM pg_db_role_setting s, pg_roles r "
3259  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3260  dboid);
3261 
3262  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3263 
3264  if (PQntuples(res) > 0)
3265  {
3266  int i;
3267 
3268  for (i = 0; i < PQntuples(res); i++)
3269  makeAlterConfigCommand(conn, PQgetvalue(res, i, 1),
3270  "ROLE", PQgetvalue(res, i, 0),
3271  "DATABASE", dbname,
3272  outbuf);
3273  }
3274 
3275  PQclear(res);
3276  }
3277 
3278  destroyPQExpBuffer(buf);
3279 }
3280 
3281 /*
3282  * dumpEncoding: put the correct encoding into the archive
3283  */
3284 static void
3286 {
3287  const char *encname = pg_encoding_to_char(AH->encoding);
3289 
3290  pg_log_info("saving encoding = %s", encname);
3291 
3292  appendPQExpBufferStr(qry, "SET client_encoding = ");
3293  appendStringLiteralAH(qry, encname, AH);
3294  appendPQExpBufferStr(qry, ";\n");
3295 
3296  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3297  ARCHIVE_OPTS(.tag = "ENCODING",
3298  .description = "ENCODING",
3299  .section = SECTION_PRE_DATA,
3300  .createStmt = qry->data));
3301 
3302  destroyPQExpBuffer(qry);
3303 }
3304 
3305 
3306 /*
3307  * dumpStdStrings: put the correct escape string behavior into the archive
3308  */
3309 static void
3311 {
3312  const char *stdstrings = AH->std_strings ? "on" : "off";
3314 
3315  pg_log_info("saving standard_conforming_strings = %s",
3316  stdstrings);
3317 
3318  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3319  stdstrings);
3320 
3321  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3322  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3323  .description = "STDSTRINGS",
3324  .section = SECTION_PRE_DATA,
3325  .createStmt = qry->data));
3326 
3327  destroyPQExpBuffer(qry);
3328 }
3329 
3330 /*
3331  * dumpSearchPath: record the active search_path in the archive
3332  */
3333 static void
3335 {
3337  PQExpBuffer path = createPQExpBuffer();
3338  PGresult *res;
3339  char **schemanames = NULL;
3340  int nschemanames = 0;
3341  int i;
3342 
3343  /*
3344  * We use the result of current_schemas(), not the search_path GUC,
3345  * because that might contain wildcards such as "$user", which won't
3346  * necessarily have the same value during restore. Also, this way avoids
3347  * listing schemas that may appear in search_path but not actually exist,
3348  * which seems like a prudent exclusion.
3349  */
3350  res = ExecuteSqlQueryForSingleRow(AH,
3351  "SELECT pg_catalog.current_schemas(false)");
3352 
3353  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3354  fatal("could not parse result of current_schemas()");
3355 
3356  /*
3357  * We use set_config(), not a simple "SET search_path" command, because
3358  * the latter has less-clean behavior if the search path is empty. While
3359  * that's likely to get fixed at some point, it seems like a good idea to
3360  * be as backwards-compatible as possible in what we put into archives.
3361  */
3362  for (i = 0; i < nschemanames; i++)
3363  {
3364  if (i > 0)
3365  appendPQExpBufferStr(path, ", ");
3366  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3367  }
3368 
3369  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3370  appendStringLiteralAH(qry, path->data, AH);
3371  appendPQExpBufferStr(qry, ", false);\n");
3372 
3373  pg_log_info("saving search_path = %s", path->data);
3374 
3375  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3376  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3377  .description = "SEARCHPATH",
3378  .section = SECTION_PRE_DATA,
3379  .createStmt = qry->data));
3380 
3381  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3382  AH->searchpath = pg_strdup(qry->data);
3383 
3384  if (schemanames)
3385  free(schemanames);
3386  PQclear(res);
3387  destroyPQExpBuffer(qry);
3388  destroyPQExpBuffer(path);
3389 }
3390 
3391 
3392 /*
3393  * getBlobs:
3394  * Collect schema-level data about large objects
3395  */
3396 static void
3398 {
3399  DumpOptions *dopt = fout->dopt;
3400  PQExpBuffer blobQry = createPQExpBuffer();
3401  BlobInfo *binfo;
3402  DumpableObject *bdata;
3403  PGresult *res;
3404  int ntups;
3405  int i;
3406  int i_oid;
3407  int i_lomowner;
3408  int i_lomacl;
3409  int i_rlomacl;
3410  int i_initlomacl;
3411  int i_initrlomacl;
3412 
3413  pg_log_info("reading large objects");
3414 
3415  /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
3416  if (fout->remoteVersion >= 90600)
3417  {
3418  PQExpBuffer acl_subquery = createPQExpBuffer();
3419  PQExpBuffer racl_subquery = createPQExpBuffer();
3420  PQExpBuffer init_acl_subquery = createPQExpBuffer();
3421  PQExpBuffer init_racl_subquery = createPQExpBuffer();
3422 
3423  buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
3424  init_racl_subquery, "l.lomacl", "l.lomowner",
3425  "pip.initprivs", "'L'", dopt->binary_upgrade);
3426 
3427  appendPQExpBuffer(blobQry,
3428  "SELECT l.oid, (%s l.lomowner) AS rolname, "
3429  "%s AS lomacl, "
3430  "%s AS rlomacl, "
3431  "%s AS initlomacl, "
3432  "%s AS initrlomacl "
3433  "FROM pg_largeobject_metadata l "
3434  "LEFT JOIN pg_init_privs pip ON "
3435  "(l.oid = pip.objoid "
3436  "AND pip.classoid = 'pg_largeobject'::regclass "
3437  "AND pip.objsubid = 0) ",
3439  acl_subquery->data,
3440  racl_subquery->data,
3441  init_acl_subquery->data,
3442  init_racl_subquery->data);
3443 
3444  destroyPQExpBuffer(acl_subquery);
3445  destroyPQExpBuffer(racl_subquery);
3446  destroyPQExpBuffer(init_acl_subquery);
3447  destroyPQExpBuffer(init_racl_subquery);
3448  }
3449  else if (fout->remoteVersion >= 90000)
3450  appendPQExpBuffer(blobQry,
3451  "SELECT oid, (%s lomowner) AS rolname, lomacl, "
3452  "NULL AS rlomacl, NULL AS initlomacl, "
3453  "NULL AS initrlomacl "
3454  " FROM pg_largeobject_metadata",
3456  else
3457  appendPQExpBufferStr(blobQry,
3458  "SELECT DISTINCT loid AS oid, "
3459  "NULL::name AS rolname, NULL::oid AS lomacl, "
3460  "NULL::oid AS rlomacl, NULL::oid AS initlomacl, "
3461  "NULL::oid AS initrlomacl "
3462  " FROM pg_largeobject");
3463 
3464  res = ExecuteSqlQuery(fout, blobQry->data, PGRES_TUPLES_OK);
3465 
3466  i_oid = PQfnumber(res, "oid");
3467  i_lomowner = PQfnumber(res, "rolname");
3468  i_lomacl = PQfnumber(res, "lomacl");
3469  i_rlomacl = PQfnumber(res, "rlomacl");
3470  i_initlomacl = PQfnumber(res, "initlomacl");
3471  i_initrlomacl = PQfnumber(res, "initrlomacl");
3472 
3473  ntups = PQntuples(res);
3474 
3475  /*
3476  * Each large object has its own BLOB archive entry.
3477  */
3478  binfo = (BlobInfo *) pg_malloc(ntups * sizeof(BlobInfo));
3479 
3480  for (i = 0; i < ntups; i++)
3481  {
3482  binfo[i].dobj.objType = DO_BLOB;
3483  binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
3484  binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3485  AssignDumpId(&binfo[i].dobj);
3486 
3487  binfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
3488  binfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_lomowner));
3489  binfo[i].blobacl = pg_strdup(PQgetvalue(res, i, i_lomacl));
3490  binfo[i].rblobacl = pg_strdup(PQgetvalue(res, i, i_rlomacl));
3491  binfo[i].initblobacl = pg_strdup(PQgetvalue(res, i, i_initlomacl));
3492  binfo[i].initrblobacl = pg_strdup(PQgetvalue(res, i, i_initrlomacl));
3493 
3494  if (PQgetisnull(res, i, i_lomacl) &&
3495  PQgetisnull(res, i, i_rlomacl) &&
3496  PQgetisnull(res, i, i_initlomacl) &&
3497  PQgetisnull(res, i, i_initrlomacl))
3498  binfo[i].dobj.dump &= ~DUMP_COMPONENT_ACL;
3499 
3500  /*
3501  * In binary-upgrade mode for blobs, we do *not* dump out the blob
3502  * data, as it will be copied by pg_upgrade, which simply copies the
3503  * pg_largeobject table. We *do* however dump out anything but the
3504  * data, as pg_upgrade copies just pg_largeobject, but not
3505  * pg_largeobject_metadata, after the dump is restored.
3506  */
3507  if (dopt->binary_upgrade)
3508  binfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
3509  }
3510 
3511  /*
3512  * If we have any large objects, a "BLOBS" archive entry is needed. This
3513  * is just a placeholder for sorting; it carries no data now.
3514  */
3515  if (ntups > 0)
3516  {
3517  bdata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3518  bdata->objType = DO_BLOB_DATA;
3519  bdata->catId = nilCatalogId;
3520  AssignDumpId(bdata);
3521  bdata->name = pg_strdup("BLOBS");
3522  }
3523 
3524  PQclear(res);
3525  destroyPQExpBuffer(blobQry);
3526 }
3527 
3528 /*
3529  * dumpBlob
3530  *
3531  * dump the definition (metadata) of the given large object
3532  */
3533 static void
3534 dumpBlob(Archive *fout, const BlobInfo *binfo)
3535 {
3536  PQExpBuffer cquery = createPQExpBuffer();
3537  PQExpBuffer dquery = createPQExpBuffer();
3538 
3539  appendPQExpBuffer(cquery,
3540  "SELECT pg_catalog.lo_create('%s');\n",
3541  binfo->dobj.name);
3542 
3543  appendPQExpBuffer(dquery,
3544  "SELECT pg_catalog.lo_unlink('%s');\n",
3545  binfo->dobj.name);
3546 
3547  if (binfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3548  ArchiveEntry(fout, binfo->dobj.catId, binfo->dobj.dumpId,
3549  ARCHIVE_OPTS(.tag = binfo->dobj.name,
3550  .owner = binfo->rolname,
3551  .description = "BLOB",
3552  .section = SECTION_PRE_DATA,
3553  .createStmt = cquery->data,
3554  .dropStmt = dquery->data));
3555 
3556  /* Dump comment if any */
3557  if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3558  dumpComment(fout, "LARGE OBJECT", binfo->dobj.name,
3559  NULL, binfo->rolname,
3560  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3561 
3562  /* Dump security label if any */
3563  if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3564  dumpSecLabel(fout, "LARGE OBJECT", binfo->dobj.name,
3565  NULL, binfo->rolname,
3566  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3567 
3568  /* Dump ACL if any */
3569  if (binfo->blobacl && (binfo->dobj.dump & DUMP_COMPONENT_ACL))
3570  dumpACL(fout, binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3571  binfo->dobj.name, NULL,
3572  NULL, binfo->rolname, binfo->blobacl, binfo->rblobacl,
3573  binfo->initblobacl, binfo->initrblobacl);
3574 
3575  destroyPQExpBuffer(cquery);
3576  destroyPQExpBuffer(dquery);
3577 }
3578 
3579 /*
3580  * dumpBlobs:
3581  * dump the data contents of all large objects
3582  */
3583 static int
3584 dumpBlobs(Archive *fout, const void *arg)
3585 {
3586  const char *blobQry;
3587  const char *blobFetchQry;
3588  PGconn *conn = GetConnection(fout);
3589  PGresult *res;
3590  char buf[LOBBUFSIZE];
3591  int ntups;
3592  int i;
3593  int cnt;
3594 
3595  pg_log_info("saving large objects");
3596 
3597  /*
3598  * Currently, we re-fetch all BLOB OIDs using a cursor. Consider scanning
3599  * the already-in-memory dumpable objects instead...
3600  */
3601  if (fout->remoteVersion >= 90000)
3602  blobQry =
3603  "DECLARE bloboid CURSOR FOR "
3604  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3605  else
3606  blobQry =
3607  "DECLARE bloboid CURSOR FOR "
3608  "SELECT DISTINCT loid FROM pg_largeobject ORDER BY 1";
3609 
3610  ExecuteSqlStatement(fout, blobQry);
3611 
3612  /* Command to fetch from cursor */
3613  blobFetchQry = "FETCH 1000 IN bloboid";
3614 
3615  do
3616  {
3617  /* Do a fetch */
3618  res = ExecuteSqlQuery(fout, blobFetchQry, PGRES_TUPLES_OK);
3619 
3620  /* Process the tuples, if any */
3621  ntups = PQntuples(res);
3622  for (i = 0; i < ntups; i++)
3623  {
3624  Oid blobOid;
3625  int loFd;
3626 
3627  blobOid = atooid(PQgetvalue(res, i, 0));
3628  /* Open the BLOB */
3629  loFd = lo_open(conn, blobOid, INV_READ);
3630  if (loFd == -1)
3631  fatal("could not open large object %u: %s",
3632  blobOid, PQerrorMessage(conn));
3633 
3634  StartBlob(fout, blobOid);
3635 
3636  /* Now read it in chunks, sending data to archive */
3637  do
3638  {
3639  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3640  if (cnt < 0)
3641  fatal("error reading large object %u: %s",
3642  blobOid, PQerrorMessage(conn));
3643 
3644  WriteData(fout, buf, cnt);
3645  } while (cnt > 0);
3646 
3647  lo_close(conn, loFd);
3648 
3649  EndBlob(fout, blobOid);
3650  }
3651 
3652  PQclear(res);
3653  } while (ntups > 0);
3654 
3655  return 1;
3656 }
3657 
3658 /*
3659  * getPolicies
3660  * get information about all RLS policies on dumpable tables.
3661  */
3662 void
3663 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3664 {
3665  PQExpBuffer query;
3666  PGresult *res;
3667  PolicyInfo *polinfo;
3668  int i_oid;
3669  int i_tableoid;
3670  int i_polrelid;
3671  int i_polname;
3672  int i_polcmd;
3673  int i_polpermissive;
3674  int i_polroles;
3675  int i_polqual;
3676  int i_polwithcheck;
3677  int i,
3678  j,
3679  ntups;
3680 
3681  if (fout->remoteVersion < 90500)
3682  return;
3683 
3684  query = createPQExpBuffer();
3685 
3686  /*
3687  * First, check which tables have RLS enabled. We represent RLS being
3688  * enabled on a table by creating a PolicyInfo object with null polname.
3689  */
3690  for (i = 0; i < numTables; i++)
3691  {
3692  TableInfo *tbinfo = &tblinfo[i];
3693 
3694  /* Ignore row security on tables not to be dumped */
3695  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3696  continue;
3697 
3698  if (tbinfo->rowsec)
3699  {
3700  /*
3701  * Note: use tableoid 0 so that this object won't be mistaken for
3702  * something that pg_depend entries apply to.
3703  */
3704  polinfo = pg_malloc(sizeof(PolicyInfo));
3705  polinfo->dobj.objType = DO_POLICY;
3706  polinfo->dobj.catId.tableoid = 0;
3707  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3708  AssignDumpId(&polinfo->dobj);
3709  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3710  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3711  polinfo->poltable = tbinfo;
3712  polinfo->polname = NULL;
3713  polinfo->polcmd = '\0';
3714  polinfo->polpermissive = 0;
3715  polinfo->polroles = NULL;
3716  polinfo->polqual = NULL;
3717  polinfo->polwithcheck = NULL;
3718  }
3719  }
3720 
3721  /*
3722  * Now, read all RLS policies, and create PolicyInfo objects for all those
3723  * that are of interest.
3724  */
3725  pg_log_info("reading row-level security policies");
3726 
3727  printfPQExpBuffer(query,
3728  "SELECT oid, tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3729  if (fout->remoteVersion >= 100000)
3730  appendPQExpBuffer(query, "pol.polpermissive, ");
3731  else
3732  appendPQExpBuffer(query, "'t' as polpermissive, ");
3733  appendPQExpBuffer(query,
3734  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3735  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3736  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3737  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3738  "FROM pg_catalog.pg_policy pol");
3739 
3740  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3741 
3742  ntups = PQntuples(res);
3743  if (ntups > 0)
3744  {
3745  i_oid = PQfnumber(res, "oid");
3746  i_tableoid = PQfnumber(res, "tableoid");
3747  i_polrelid = PQfnumber(res, "polrelid");
3748  i_polname = PQfnumber(res, "polname");
3749  i_polcmd = PQfnumber(res, "polcmd");
3750  i_polpermissive = PQfnumber(res, "polpermissive");
3751  i_polroles = PQfnumber(res, "polroles");
3752  i_polqual = PQfnumber(res, "polqual");
3753  i_polwithcheck = PQfnumber(res, "polwithcheck");
3754 
3755  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
3756 
3757  for (j = 0; j < ntups; j++)
3758  {
3759  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
3760  TableInfo *tbinfo = findTableByOid(polrelid);
3761 
3762  /*
3763  * Ignore row security on tables not to be dumped. (This will
3764  * result in some harmless wasted slots in polinfo[].)
3765  */
3766  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3767  continue;
3768 
3769  polinfo[j].dobj.objType = DO_POLICY;
3770  polinfo[j].dobj.catId.tableoid =
3771  atooid(PQgetvalue(res, j, i_tableoid));
3772  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
3773  AssignDumpId(&polinfo[j].dobj);
3774  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
3775  polinfo[j].poltable = tbinfo;
3776  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
3777  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
3778 
3779  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
3780  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
3781 
3782  if (PQgetisnull(res, j, i_polroles))
3783  polinfo[j].polroles = NULL;
3784  else
3785  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
3786 
3787  if (PQgetisnull(res, j, i_polqual))
3788  polinfo[j].polqual = NULL;
3789  else
3790  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
3791 
3792  if (PQgetisnull(res, j, i_polwithcheck))
3793  polinfo[j].polwithcheck = NULL;
3794  else
3795  polinfo[j].polwithcheck
3796  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
3797  }
3798  }
3799 
3800  PQclear(res);
3801 
3802  destroyPQExpBuffer(query);
3803 }
3804 
3805 /*
3806  * dumpPolicy
3807  * dump the definition of the given policy
3808  */
3809 static void
3810 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
3811 {
3812  DumpOptions *dopt = fout->dopt;
3813  TableInfo *tbinfo = polinfo->poltable;
3814  PQExpBuffer query;
3815  PQExpBuffer delqry;
3816  PQExpBuffer polprefix;
3817  char *qtabname;
3818  const char *cmd;
3819  char *tag;
3820 
3821  if (dopt->dataOnly)
3822  return;
3823 
3824  /*
3825  * If polname is NULL, then this record is just indicating that ROW LEVEL
3826  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
3827  * ROW LEVEL SECURITY.
3828  */
3829  if (polinfo->polname == NULL)
3830  {
3831  query = createPQExpBuffer();
3832 
3833  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
3834  fmtQualifiedDumpable(tbinfo));
3835 
3836  /*
3837  * We must emit the ROW SECURITY object's dependency on its table
3838  * explicitly, because it will not match anything in pg_depend (unlike
3839  * the case for other PolicyInfo objects).
3840  */
3841  if (polinfo->dobj.dump & DUMP_COMPONENT_POLICY)
3842  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3843  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
3844  .namespace = polinfo->dobj.namespace->dobj.name,
3845  .owner = tbinfo->rolname,
3846  .description = "ROW SECURITY",
3847  .section = SECTION_POST_DATA,
3848  .createStmt = query->data,
3849  .deps = &(tbinfo->dobj.dumpId),
3850  .nDeps = 1));
3851 
3852  destroyPQExpBuffer(query);
3853  return;
3854  }
3855 
3856  if (polinfo->polcmd == '*')
3857  cmd = "";
3858  else if (polinfo->polcmd == 'r')
3859  cmd = " FOR SELECT";
3860  else if (polinfo->polcmd == 'a')
3861  cmd = " FOR INSERT";
3862  else if (polinfo->polcmd == 'w')
3863  cmd = " FOR UPDATE";
3864  else if (polinfo->polcmd == 'd')
3865  cmd = " FOR DELETE";
3866  else
3867  {
3868  pg_log_error("unexpected policy command type: %c",
3869  polinfo->polcmd);
3870  exit_nicely(1);
3871  }
3872 
3873  query = createPQExpBuffer();
3874  delqry = createPQExpBuffer();
3875  polprefix = createPQExpBuffer();
3876 
3877  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
3878 
3879  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
3880 
3881  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
3882  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
3883 
3884  if (polinfo->polroles != NULL)
3885  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
3886 
3887  if (polinfo->polqual != NULL)
3888  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
3889 
3890  if (polinfo->polwithcheck != NULL)
3891  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
3892 
3893  appendPQExpBufferStr(query, ";\n");
3894 
3895  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
3896  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
3897 
3898  appendPQExpBuffer(polprefix, "POLICY %s ON",
3899  fmtId(polinfo->polname));
3900 
3901  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
3902 
3903  if (polinfo->dobj.dump & DUMP_COMPONENT_POLICY)
3904  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3905  ARCHIVE_OPTS(.tag = tag,
3906  .namespace = polinfo->dobj.namespace->dobj.name,
3907  .owner = tbinfo->rolname,
3908  .description = "POLICY",
3909  .section = SECTION_POST_DATA,
3910  .createStmt = query->data,
3911  .dropStmt = delqry->data));
3912 
3913  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3914  dumpComment(fout, polprefix->data, qtabname,
3915  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
3916  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
3917 
3918  free(tag);
3919  destroyPQExpBuffer(query);
3920  destroyPQExpBuffer(delqry);
3921  destroyPQExpBuffer(polprefix);
3922  free(qtabname);
3923 }
3924 
3925 /*
3926  * getPublications
3927  * get information about publications
3928  */
3931 {
3932  DumpOptions *dopt = fout->dopt;
3933  PQExpBuffer query;
3934  PGresult *res;
3935  PublicationInfo *pubinfo;
3936  int i_tableoid;
3937  int i_oid;
3938  int i_pubname;
3939  int i_rolname;
3940  int i_puballtables;
3941  int i_pubinsert;
3942  int i_pubupdate;
3943  int i_pubdelete;
3944  int i_pubtruncate;
3945  int i_pubviaroot;
3946  int i,
3947  ntups;
3948 
3949  if (dopt->no_publications || fout->remoteVersion < 100000)
3950  {
3951  *numPublications = 0;
3952  return NULL;
3953  }
3954 
3955  query = createPQExpBuffer();
3956 
3957  resetPQExpBuffer(query);
3958 
3959  /* Get the publications. */
3960  if (fout->remoteVersion >= 130000)
3961  appendPQExpBuffer(query,
3962  "SELECT p.tableoid, p.oid, p.pubname, "
3963  "(%s p.pubowner) AS rolname, "
3964  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
3965  "FROM pg_publication p",
3967  else if (fout->remoteVersion >= 110000)
3968  appendPQExpBuffer(query,
3969  "SELECT p.tableoid, p.oid, p.pubname, "
3970  "(%s p.pubowner) AS rolname, "
3971  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
3972  "FROM pg_publication p",
3974  else
3975  appendPQExpBuffer(query,
3976  "SELECT p.tableoid, p.oid, p.pubname, "
3977  "(%s p.pubowner) AS rolname, "
3978  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
3979  "FROM pg_publication p",
3981 
3982  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3983 
3984  ntups = PQntuples(res);
3985 
3986  i_tableoid = PQfnumber(res, "tableoid");
3987  i_oid = PQfnumber(res, "oid");
3988  i_pubname = PQfnumber(res, "pubname");
3989  i_rolname = PQfnumber(res, "rolname");
3990  i_puballtables = PQfnumber(res, "puballtables");
3991  i_pubinsert = PQfnumber(res, "pubinsert");
3992  i_pubupdate = PQfnumber(res, "pubupdate");
3993  i_pubdelete = PQfnumber(res, "pubdelete");
3994  i_pubtruncate = PQfnumber(res, "pubtruncate");
3995  i_pubviaroot = PQfnumber(res, "pubviaroot");
3996 
3997  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
3998 
3999  for (i = 0; i < ntups; i++)
4000  {
4001  pubinfo[i].dobj.objType = DO_PUBLICATION;
4002  pubinfo[i].dobj.catId.tableoid =
4003  atooid(PQgetvalue(res, i, i_tableoid));
4004  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4005  AssignDumpId(&pubinfo[i].dobj);
4006  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4007  pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
4008  pubinfo[i].puballtables =
4009  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4010  pubinfo[i].pubinsert =
4011  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4012  pubinfo[i].pubupdate =
4013  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4014  pubinfo[i].pubdelete =
4015  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4016  pubinfo[i].pubtruncate =
4017  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4018  pubinfo[i].pubviaroot =
4019  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4020 
4021  if (strlen(pubinfo[i].rolname) == 0)
4022  pg_log_warning("owner of publication \"%s\" appears to be invalid",
4023  pubinfo[i].dobj.name);
4024 
4025  /* Decide whether we want to dump it */
4026  selectDumpableObject(&(pubinfo[i].dobj), fout);
4027  }
4028  PQclear(res);
4029 
4030  destroyPQExpBuffer(query);
4031 
4032  *numPublications = ntups;
4033  return pubinfo;
4034 }
4035 
4036 /*
4037  * dumpPublication
4038  * dump the definition of the given publication
4039  */
4040 static void
4042 {
4043  PQExpBuffer delq;
4044  PQExpBuffer query;
4045  char *qpubname;
4046  bool first = true;
4047 
4048  if (!(pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4049  return;
4050 
4051  delq = createPQExpBuffer();
4052  query = createPQExpBuffer();
4053 
4054  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4055 
4056  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4057  qpubname);
4058 
4059  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4060  qpubname);
4061 
4062  if (pubinfo->puballtables)
4063  appendPQExpBufferStr(query, " FOR ALL TABLES");
4064 
4065  appendPQExpBufferStr(query, " WITH (publish = '");
4066  if (pubinfo->pubinsert)
4067  {
4068  appendPQExpBufferStr(query, "insert");
4069  first = false;
4070  }
4071 
4072  if (pubinfo->pubupdate)
4073  {
4074  if (!first)
4075  appendPQExpBufferStr(query, ", ");
4076 
4077  appendPQExpBufferStr(query, "update");
4078  first = false;
4079  }
4080 
4081  if (pubinfo->pubdelete)
4082  {
4083  if (!first)
4084  appendPQExpBufferStr(query, ", ");
4085 
4086  appendPQExpBufferStr(query, "delete");
4087  first = false;
4088  }
4089 
4090  if (pubinfo->pubtruncate)
4091  {
4092  if (!first)
4093  appendPQExpBufferStr(query, ", ");
4094 
4095  appendPQExpBufferStr(query, "truncate");
4096  first = false;
4097  }
4098 
4099  appendPQExpBufferStr(query, "'");
4100 
4101  if (pubinfo->pubviaroot)
4102  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4103 
4104  appendPQExpBufferStr(query, ");\n");
4105 
4106  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4107  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4108  .owner = pubinfo->rolname,
4109  .description = "PUBLICATION",
4110  .section = SECTION_POST_DATA,
4111  .createStmt = query->data,
4112  .dropStmt = delq->data));
4113 
4114  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4115  dumpComment(fout, "PUBLICATION", qpubname,
4116  NULL, pubinfo->rolname,
4117  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4118 
4119  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4120  dumpSecLabel(fout, "PUBLICATION", qpubname,
4121  NULL, pubinfo->rolname,
4122  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4123 
4124  destroyPQExpBuffer(delq);
4125  destroyPQExpBuffer(query);
4126  free(qpubname);
4127 }
4128 
4129 /*
4130  * getPublicationTables
4131  * get information about publication membership for dumpable tables.
4132  */
4133 void
4135 {
4136  PQExpBuffer query;
4137  PGresult *res;
4138  PublicationRelInfo *pubrinfo;
4139  DumpOptions *dopt = fout->dopt;
4140  int i_tableoid;
4141  int i_oid;
4142  int i_prpubid;
4143  int i_prrelid;
4144  int i,
4145  j,
4146  ntups;
4147 
4148  if (dopt->no_publications || fout->remoteVersion < 100000)
4149  return;
4150 
4151  query = createPQExpBuffer();
4152 
4153  /* Collect all publication membership info. */
4154  appendPQExpBufferStr(query,
4155  "SELECT tableoid, oid, prpubid, prrelid "
4156  "FROM pg_catalog.pg_publication_rel");
4157  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4158 
4159  ntups = PQntuples(res);
4160 
4161  i_tableoid = PQfnumber(res, "tableoid");
4162  i_oid = PQfnumber(res, "oid");
4163  i_prpubid = PQfnumber(res, "prpubid");
4164  i_prrelid = PQfnumber(res, "prrelid");
4165 
4166  /* this allocation may be more than we need */
4167  pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
4168  j = 0;
4169 
4170  for (i = 0; i < ntups; i++)
4171  {
4172  Oid prpubid = atooid(PQgetvalue(res, i, i_prpubid));
4173  Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid));
4174  PublicationInfo *pubinfo;
4175  TableInfo *tbinfo;
4176 
4177  /*
4178  * Ignore any entries for which we aren't interested in either the
4179  * publication or the rel.
4180  */
4181  pubinfo = findPublicationByOid(prpubid);
4182  if (pubinfo == NULL)
4183  continue;
4184  tbinfo = findTableByOid(prrelid);
4185  if (tbinfo == NULL)
4186  continue;
4187 
4188  /*
4189  * Ignore publication membership of tables whose definitions are not
4190  * to be dumped.
4191  */
4192  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4193  continue;
4194 
4195  /* OK, make a DumpableObject for this relationship */
4196  pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
4197  pubrinfo[j].dobj.catId.tableoid =
4198  atooid(PQgetvalue(res, i, i_tableoid));
4199  pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4200  AssignDumpId(&pubrinfo[j].dobj);
4201  pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4202  pubrinfo[j].dobj.name = tbinfo->dobj.name;
4203  pubrinfo[j].publication = pubinfo;
4204  pubrinfo[j].pubtable = tbinfo;
4205 
4206  /* Decide whether we want to dump it */
4207  selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
4208 
4209  j++;
4210  }
4211 
4212  PQclear(res);
4213  destroyPQExpBuffer(query);
4214 }
4215 
4216 /*
4217  * dumpPublicationTable
4218  * dump the definition of the given publication table mapping
4219  */
4220 static void
4222 {
4223  PublicationInfo *pubinfo = pubrinfo->publication;
4224  TableInfo *tbinfo = pubrinfo->pubtable;
4225  PQExpBuffer query;
4226  char *tag;
4227 
4228  if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4229  return;
4230 
4231  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4232 
4233  query = createPQExpBuffer();
4234 
4235  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4236  fmtId(pubinfo->dobj.name));
4237  appendPQExpBuffer(query, " %s;\n",
4238  fmtQualifiedDumpable(tbinfo));
4239 
4240  /*
4241  * There is no point in creating a drop query as the drop is done by table
4242  * drop. (If you think to change this, see also _printTocEntry().)
4243  * Although this object doesn't really have ownership as such, set the
4244  * owner field anyway to ensure that the command is run by the correct
4245  * role at restore time.
4246  */
4247  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4248  ARCHIVE_OPTS(.tag = tag,
4249  .namespace = tbinfo->dobj.namespace->dobj.name,
4250  .owner = pubinfo->rolname,
4251  .description = "PUBLICATION TABLE",
4252  .section = SECTION_POST_DATA,
4253  .createStmt = query->data));
4254 
4255  free(tag);
4256  destroyPQExpBuffer(query);
4257 }
4258 
4259 /*
4260  * Is the currently connected user a superuser?
4261  */
4262 static bool
4264 {
4265  ArchiveHandle *AH = (ArchiveHandle *) fout;
4266  const char *val;
4267 
4268  val = PQparameterStatus(AH->connection, "is_superuser");
4269 
4270  if (val && strcmp(val, "on") == 0)
4271  return true;
4272 
4273  return false;
4274 }
4275 
4276 /*
4277  * getSubscriptions
4278  * get information about subscriptions
4279  */
4280 void
4282 {
4283  DumpOptions *dopt = fout->dopt;
4284  PQExpBuffer query;
4285  PGresult *res;
4286  SubscriptionInfo *subinfo;
4287  int i_tableoid;
4288  int i_oid;
4289  int i_subname;
4290  int i_rolname;
4291  int i_substream;
4292  int i_subtwophasestate;
4293  int i_subconninfo;
4294  int i_subslotname;
4295  int i_subsynccommit;
4296  int i_subpublications;
4297  int i_subbinary;
4298  int i,
4299  ntups;
4300 
4301  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4302  return;
4303 
4304  if (!is_superuser(fout))
4305  {
4306  int n;
4307 
4308  res = ExecuteSqlQuery(fout,
4309  "SELECT count(*) FROM pg_subscription "
4310  "WHERE subdbid = (SELECT oid FROM pg_database"
4311  " WHERE datname = current_database())",
4312  PGRES_TUPLES_OK);
4313  n = atoi(PQgetvalue(res, 0, 0));
4314  if (n > 0)
4315  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4316  PQclear(res);
4317  return;
4318  }
4319 
4320  query = createPQExpBuffer();
4321 
4322  /* Get the subscriptions in current database. */
4323  appendPQExpBuffer(query,
4324  "SELECT s.tableoid, s.oid, s.subname,\n"
4325  " (%s s.subowner) AS rolname,\n"
4326  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4327  " s.subpublications,\n",
4329 
4330  if (fout->remoteVersion >= 140000)
4331  appendPQExpBufferStr(query, " s.subbinary,\n");
4332  else
4333  appendPQExpBufferStr(query, " false AS subbinary,\n");
4334 
4335  if (fout->remoteVersion >= 140000)
4336  appendPQExpBufferStr(query, " s.substream,\n");
4337  else
4338  appendPQExpBufferStr(query, " false AS substream,\n");
4339 
4340  if (fout->remoteVersion >= 150000)
4341  appendPQExpBufferStr(query, " s.subtwophasestate\n");
4342  else
4343  appendPQExpBuffer(query,
4344  " '%c' AS subtwophasestate\n",
4346 
4347  appendPQExpBufferStr(query,
4348  "FROM pg_subscription s\n"
4349  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4350  " WHERE datname = current_database())");
4351 
4352  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4353 
4354  ntups = PQntuples(res);
4355 
4356  i_tableoid = PQfnumber(res, "tableoid");
4357  i_oid = PQfnumber(res, "oid");
4358  i_subname = PQfnumber(res, "subname");
4359  i_rolname = PQfnumber(res, "rolname");
4360  i_subconninfo = PQfnumber(res, "subconninfo");
4361  i_subslotname = PQfnumber(res, "subslotname");
4362  i_subsynccommit = PQfnumber(res, "subsynccommit");
4363  i_subpublications = PQfnumber(res, "subpublications");
4364  i_subbinary = PQfnumber(res, "subbinary");
4365  i_substream = PQfnumber(res, "substream");
4366  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4367 
4368  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4369 
4370  for (i = 0; i < ntups; i++)
4371  {
4372  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4373  subinfo[i].dobj.catId.tableoid =
4374  atooid(PQgetvalue(res, i, i_tableoid));
4375  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4376  AssignDumpId(&subinfo[i].dobj);
4377  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4378  subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
4379  subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
4380  if (PQgetisnull(res, i, i_subslotname))
4381  subinfo[i].subslotname = NULL;
4382  else
4383  subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
4384  subinfo[i].subsynccommit =
4385  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4386  subinfo[i].subpublications =
4387  pg_strdup(PQgetvalue(res, i, i_subpublications));
4388  subinfo[i].subbinary =
4389  pg_strdup(PQgetvalue(res, i, i_subbinary));
4390  subinfo[i].substream =
4391  pg_strdup(PQgetvalue(res, i, i_substream));
4392  subinfo[i].subtwophasestate =
4393  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4394 
4395  if (strlen(subinfo[i].rolname) == 0)
4396  pg_log_warning("owner of subscription \"%s\" appears to be invalid",
4397  subinfo[i].dobj.name);
4398 
4399  /* Decide whether we want to dump it */
4400  selectDumpableObject(&(subinfo[i].dobj), fout);
4401  }
4402  PQclear(res);
4403 
4404  destroyPQExpBuffer(query);
4405 }
4406 
4407 /*
4408  * dumpSubscription
4409  * dump the definition of the given subscription
4410  */
4411 static void
4413 {
4414  PQExpBuffer delq;
4415  PQExpBuffer query;
4416  PQExpBuffer publications;
4417  char *qsubname;
4418  char **pubnames = NULL;
4419  int npubnames = 0;
4420  int i;
4421  char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
4422 
4423  if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4424  return;
4425 
4426  delq = createPQExpBuffer();
4427  query = createPQExpBuffer();
4428 
4429  qsubname = pg_strdup(fmtId(subinfo->dobj.name));
4430 
4431  appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
4432  qsubname);
4433 
4434  appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
4435  qsubname);
4436  appendStringLiteralAH(query, subinfo->subconninfo, fout);
4437 
4438  /* Build list of quoted publications and append them to query. */
4439  if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
4440  fatal("could not parse subpublications array");
4441 
4442  publications = createPQExpBuffer();
4443  for (i = 0; i < npubnames; i++)
4444  {
4445  if (i > 0)
4446  appendPQExpBufferStr(publications, ", ");
4447 
4448  appendPQExpBufferStr(publications, fmtId(pubnames[i]));
4449  }
4450 
4451  appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
4452  if (subinfo->subslotname)
4453  appendStringLiteralAH(query, subinfo->subslotname, fout);
4454  else
4455  appendPQExpBufferStr(query, "NONE");
4456 
4457  if (strcmp(subinfo->subbinary, "t") == 0)
4458  appendPQExpBufferStr(query, ", binary = true");
4459 
4460  if (strcmp(subinfo->substream, "f") != 0)
4461  appendPQExpBufferStr(query, ", streaming = on");
4462 
4463  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
4464  appendPQExpBufferStr(query, ", two_phase = on");
4465 
4466  if (strcmp(subinfo->subsynccommit, "off") != 0)
4467  appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
4468 
4469  appendPQExpBufferStr(query, ");\n");
4470 
4471  ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
4472  ARCHIVE_OPTS(.tag = subinfo->dobj.name,
4473  .owner = subinfo->rolname,
4474  .description = "SUBSCRIPTION",
4475  .section = SECTION_POST_DATA,
4476  .createStmt = query->data,
4477  .dropStmt = delq->data));
4478 
4479  if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4480  dumpComment(fout, "SUBSCRIPTION", qsubname,
4481  NULL, subinfo->rolname,
4482  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4483 
4484  if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4485  dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
4486  NULL, subinfo->rolname,
4487  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4488 
4489  destroyPQExpBuffer(publications);
4490  if (pubnames)
4491  free(pubnames);
4492 
4493  destroyPQExpBuffer(delq);
4494  destroyPQExpBuffer(query);
4495  free(qsubname);
4496 }
4497 
4498 /*
4499  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4500  * the object needs.
4501  */
4502 static void
4504  PQExpBuffer create,
4505  const DumpableObject *dobj,
4506  const char *catalog,
4507  const char *keyword,
4508  const char *objname)
4509 {
4510  if (dobj->depends_on_ext)
4511  {
4512  char *nm;
4513  PGresult *res;
4514  PQExpBuffer query;
4515  int ntups;
4516  int i_extname;
4517  int i;
4518 
4519  /* dodge fmtId() non-reentrancy */
4520  nm = pg_strdup(objname);
4521 
4522  query = createPQExpBuffer();
4523  appendPQExpBuffer(query,
4524  "SELECT e.extname "
4525  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
4526  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
4527  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
4528  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
4529  catalog,
4530  dobj->catId.oid);
4531  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4532  ntups = PQntuples(res);
4533  i_extname = PQfnumber(res, "extname");
4534  for (i = 0; i < ntups; i++)
4535  {
4536  appendPQExpBuffer(create, "ALTER %s %s DEPENDS ON EXTENSION %s;\n",
4537  keyword, nm,
4538  fmtId(PQgetvalue(res, i, i_extname)));
4539  }
4540 
4541  PQclear(res);
4542  destroyPQExpBuffer(query);
4543  pg_free(nm);
4544  }
4545 }
4546 
4547 static Oid
4549 {
4550  /*
4551  * If the old version didn't assign an array type, but the new version
4552  * does, we must select an unused type OID to assign. This currently only
4553  * happens for domains, when upgrading pre-v11 to v11 and up.
4554  *
4555  * Note: local state here is kind of ugly, but we must have some, since we
4556  * mustn't choose the same unused OID more than once.
4557  */
4558  static Oid next_possible_free_oid = FirstNormalObjectId;
4559  PGresult *res;
4560  bool is_dup;
4561 
4562  do
4563  {
4564  ++next_possible_free_oid;
4565  printfPQExpBuffer(upgrade_query,
4566  "SELECT EXISTS(SELECT 1 "
4567  "FROM pg_catalog.pg_type "
4568  "WHERE oid = '%u'::pg_catalog.oid);",
4569  next_possible_free_oid);
4570  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4571  is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
4572  PQclear(res);
4573  } while (is_dup);
4574 
4575  return next_possible_free_oid;
4576 }
4577 
4578 static void
4580  PQExpBuffer upgrade_buffer,
4581  Oid pg_type_oid,
4582  bool force_array_type,
4583  bool include_multirange_type)
4584 {
4585  PQExpBuffer upgrade_query = createPQExpBuffer();
4586  PGresult *res;
4587  Oid pg_type_array_oid;
4588  Oid pg_type_multirange_oid;
4589  Oid pg_type_multirange_array_oid;
4590 
4591  appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
4592  appendPQExpBuffer(upgrade_buffer,
4593  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4594  pg_type_oid);
4595 
4596  /* we only support old >= 8.3 for binary upgrades */
4597  appendPQExpBuffer(upgrade_query,
4598  "SELECT typarray "
4599  "FROM pg_catalog.pg_type "
4600  "WHERE oid = '%u'::pg_catalog.oid;",
4601  pg_type_oid);
4602 
4603  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4604 
4605  pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4606 
4607  PQclear(res);
4608 
4609  if (!OidIsValid(pg_type_array_oid) && force_array_type)
4610  pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4611 
4612  if (OidIsValid(pg_type_array_oid))
4613  {
4614  appendPQExpBufferStr(upgrade_buffer,
4615  "\n-- For binary upgrade, must preserve pg_type array oid\n");
4616  appendPQExpBuffer(upgrade_buffer,
4617  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4618  pg_type_array_oid);
4619  }
4620 
4621  /*
4622  * Pre-set the multirange type oid and its own array type oid.
4623  */
4624  if (include_multirange_type)
4625  {
4626  if (fout->remoteVersion >= 140000)
4627  {
4628  appendPQExpBuffer(upgrade_query,
4629  "SELECT t.oid, t.typarray "
4630  "FROM pg_catalog.pg_type t "
4631  "JOIN pg_catalog.pg_range r "
4632  "ON t.oid = r.rngmultitypid "
4633  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
4634  pg_type_oid);
4635 
4636  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4637 
4638  pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
4639  pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4640 
4641  PQclear(res);
4642  }
4643  else
4644  {
4645  pg_type_multirange_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4646  pg_type_multirange_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4647  }
4648 
4649  appendPQExpBufferStr(upgrade_buffer,
4650  "\n-- For binary upgrade, must preserve multirange pg_type oid\n");
4651  appendPQExpBuffer(upgrade_buffer,
4652  "SELECT pg_catalog.binary_upgrade_set_next_multirange_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4653  pg_type_multirange_oid);
4654  appendPQExpBufferStr(upgrade_buffer,
4655  "\n-- For binary upgrade, must preserve multirange pg_type array oid\n");
4656  appendPQExpBuffer(upgrade_buffer,
4657  "SELECT pg_catalog.binary_upgrade_set_next_multirange_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4658  pg_type_multirange_array_oid);
4659  }
4660 
4661  destroyPQExpBuffer(upgrade_query);
4662 }
4663 
4664 static void
4666  PQExpBuffer upgrade_buffer,
4667  Oid pg_rel_oid)
4668 {
4669  PQExpBuffer upgrade_query = createPQExpBuffer();
4670  PGresult *upgrade_res;
4671  Oid pg_type_oid;
4672 
4673  appendPQExpBuffer(upgrade_query,
4674  "SELECT c.reltype AS crel "
4675  "FROM pg_catalog.pg_class c "
4676  "WHERE c.oid = '%u'::pg_catalog.oid;",
4677  pg_rel_oid);
4678 
4679  upgrade_res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4680 
4681  pg_type_oid = atooid(PQgetvalue(upgrade_res, 0, PQfnumber(upgrade_res, "crel")));
4682 
4683  if (OidIsValid(pg_type_oid))
4684  binary_upgrade_set_type_oids_by_type_oid(fout, upgrade_buffer,
4685  pg_type_oid, false, false);
4686 
4687  PQclear(upgrade_res);
4688  destroyPQExpBuffer(upgrade_query);
4689 }
4690 
4691 static void
4693  PQExpBuffer upgrade_buffer, Oid pg_class_oid,
4694  bool is_index)
4695 {
4696  appendPQExpBufferStr(upgrade_buffer,
4697  "\n-- For binary upgrade, must preserve pg_class oids\n");
4698 
4699  if (!is_index)
4700  {
4701  PQExpBuffer upgrade_query = createPQExpBuffer();
4702  PGresult *upgrade_res;
4703  Oid pg_class_reltoastrelid;
4704  char pg_class_relkind;
4705  Oid pg_index_indexrelid;
4706 
4707  appendPQExpBuffer(upgrade_buffer,
4708  "SELECT pg_catalog.binary_upgrade_set_next_heap_pg_class_oid('%u'::pg_catalog.oid);\n",
4709  pg_class_oid);
4710 
4711  /*
4712  * Preserve the OIDs of the table's toast table and index, if any.
4713  * Indexes cannot have toast tables, so we need not make this probe in
4714  * the index code path.
4715  *
4716  * One complexity is that the current table definition might not
4717  * require the creation of a TOAST table, but the old database might
4718  * have a TOAST table that was created earlier, before some wide
4719  * columns were dropped. By setting the TOAST oid we force creation
4720  * of the TOAST heap and index by the new backend, so we can copy the
4721  * files during binary upgrade without worrying about this case.
4722  */
4723  appendPQExpBuffer(upgrade_query,
4724  "SELECT c.reltoastrelid, c.relkind, i.indexrelid "
4725  "FROM pg_catalog.pg_class c LEFT JOIN "
4726  "pg_catalog.pg_index i ON (c.reltoastrelid = i.indrelid AND i.indisvalid) "
4727  "WHERE c.oid = '%u'::pg_catalog.oid;",
4728  pg_class_oid);
4729 
4730  upgrade_res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4731 
4732  pg_class_reltoastrelid = atooid(PQgetvalue(upgrade_res, 0,
4733  PQfnumber(upgrade_res, "reltoastrelid")));
4734  pg_class_relkind = *PQgetvalue(upgrade_res, 0,
4735  PQfnumber(upgrade_res, "relkind"));
4736  pg_index_indexrelid = atooid(PQgetvalue(upgrade_res, 0,
4737  PQfnumber(upgrade_res, "indexrelid")));
4738 
4739  /*
4740  * In a pre-v12 database, partitioned tables might be marked as having
4741  * toast tables, but we should ignore them if so.
4742  */
4743  if (OidIsValid(pg_class_reltoastrelid) &&
4744  pg_class_relkind != RELKIND_PARTITIONED_TABLE)
4745  {
4746  appendPQExpBuffer(upgrade_buffer,
4747  "SELECT pg_catalog.binary_upgrade_set_next_toast_pg_class_oid('%u'::pg_catalog.oid);\n",
4748  pg_class_reltoastrelid);
4749 
4750  /* every toast table has an index */
4751  appendPQExpBuffer(upgrade_buffer,
4752  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4753  pg_index_indexrelid);
4754  }
4755 
4756  PQclear(upgrade_res);
4757  destroyPQExpBuffer(upgrade_query);
4758  }
4759  else
4760  appendPQExpBuffer(upgrade_buffer,
4761  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4762  pg_class_oid);
4763 
4764  appendPQExpBufferChar(upgrade_buffer, '\n');
4765 }
4766 
4767 /*
4768  * If the DumpableObject is a member of an extension, add a suitable
4769  * ALTER EXTENSION ADD command to the creation commands in upgrade_buffer.
4770  *
4771  * For somewhat historical reasons, objname should already be quoted,
4772  * but not objnamespace (if any).
4773  */
4774 static void
4776  const DumpableObject *dobj,
4777  const char *objtype,
4778  const char *objname,
4779  const char *objnamespace)
4780 {
4781  DumpableObject *extobj = NULL;
4782  int i;
4783 
4784  if (!dobj->ext_member)
4785  return;
4786 
4787  /*
4788  * Find the parent extension. We could avoid this search if we wanted to
4789  * add a link field to DumpableObject, but the space costs of that would
4790  * be considerable. We assume that member objects could only have a
4791  * direct dependency on their own extension, not any others.
4792  */
4793  for (i = 0; i < dobj->nDeps; i++)
4794  {
4795  extobj = findObjectByDumpId(dobj->dependencies[i]);
4796  if (extobj && extobj->objType == DO_EXTENSION)
4797  break;
4798  extobj = NULL;
4799  }
4800  if (extobj == NULL)
4801  fatal("could not find parent extension for %s %s",
4802  objtype, objname);
4803 
4804  appendPQExpBufferStr(upgrade_buffer,
4805  "\n-- For binary upgrade, handle extension membership the hard way\n");
4806  appendPQExpBuffer(upgrade_buffer, "ALTER EXTENSION %s ADD %s ",
4807  fmtId(extobj->name),
4808  objtype);
4809  if (objnamespace && *objnamespace)
4810  appendPQExpBuffer(upgrade_buffer, "%s.", fmtId(objnamespace));
4811  appendPQExpBuffer(upgrade_buffer, "%s;\n", objname);
4812 }
4813 
4814 /*
4815  * getNamespaces:
4816  * read all namespaces in the system catalogs and return them in the
4817  * NamespaceInfo* structure
4818  *
4819  * numNamespaces is set to the number of namespaces read in
4820  */
4821 NamespaceInfo *
4823 {
4824  DumpOptions *dopt = fout->dopt;
4825  PGresult *res;
4826  int ntups;
4827  int i;
4828  PQExpBuffer query;
4829  NamespaceInfo *nsinfo;
4830  int i_tableoid;
4831  int i_oid;
4832  int i_nspname;
4833  int i_nspowner;
4834  int i_rolname;
4835  int i_nspacl;
4836  int i_rnspacl;
4837  int i_initnspacl;
4838  int i_initrnspacl;
4839 
4840  query = createPQExpBuffer();
4841 
4842  /*
4843  * we fetch all namespaces including system ones, so that every object we
4844  * read in can be linked to a containing namespace.
4845  */
4846  if (fout->remoteVersion >= 90600)
4847  {
4848  PQExpBuffer acl_subquery = createPQExpBuffer();
4849  PQExpBuffer racl_subquery = createPQExpBuffer();
4850  PQExpBuffer init_acl_subquery = createPQExpBuffer();
4851  PQExpBuffer init_racl_subquery = createPQExpBuffer();
4852 
4853  /*
4854  * Bypass pg_init_privs.initprivs for the public schema, for several
4855  * reasons. First, dropping and recreating the schema detaches it
4856  * from its pg_init_privs row, but an empty destination database
4857  * starts with this ACL nonetheless. Second, we support dump/reload
4858  * of public schema ownership changes. ALTER SCHEMA OWNER filters
4859  * nspacl through aclnewowner(), but initprivs continues to reflect
4860  * the initial owner. Hence, synthesize the value that nspacl will
4861  * have after the restore's ALTER SCHEMA OWNER. Third, this makes the
4862  * destination database match the source's ACL, even if the latter was
4863  * an initdb-default ACL, which changed in v15. An upgrade pulls in
4864  * changes to most system object ACLs that the DBA had not customized.
4865  * We've made the public schema depart from that, because changing its
4866  * ACL so easily breaks applications.
4867  */
4868  buildACLQueries(acl_subquery, racl_subquery, init_acl_subquery,
4869  init_racl_subquery, "n.nspacl", "n.nspowner",
4870  "CASE WHEN n.nspname = 'public' THEN array["
4871  " format('%s=UC/%s', "
4872  " n.nspowner::regrole, n.nspowner::regrole),"
4873  " format('=U/%s', n.nspowner::regrole)]::aclitem[] "
4874  "ELSE pip.initprivs END",
4875  "'n'", dopt->binary_upgrade);
4876 
4877  appendPQExpBuffer(query, "SELECT n.tableoid, n.oid, n.nspname, "
4878  "n.nspowner, "
4879  "(%s nspowner) AS rolname, "
4880  "%s as nspacl, "
4881  "%s as rnspacl, "
4882  "%s as initnspacl, "
4883  "%s as initrnspacl "
4884  "FROM pg_namespace n "
4885  "LEFT JOIN pg_init_privs pip "
4886  "ON (n.oid = pip.objoid "
4887  "AND pip.classoid = 'pg_namespace'::regclass "
4888  "AND pip.objsubid = 0",
4890  acl_subquery->data,
4891  racl_subquery->data,
4892  init_acl_subquery->data,
4893  init_racl_subquery->data);
4894 
4895  appendPQExpBufferStr(query, ") ");
4896 
4897  destroyPQExpBuffer(acl_subquery);
4898  destroyPQExpBuffer(racl_subquery);
4899  destroyPQExpBuffer(init_acl_subquery);
4900  destroyPQExpBuffer(init_racl_subquery);
4901  }
4902  else
4903  appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, nspowner, "
4904  "(%s nspowner) AS rolname, "
4905  "nspacl, NULL as rnspacl, "
4906  "NULL AS initnspacl, NULL as initrnspacl "
4907  "FROM pg_namespace",
4909 
4910  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4911 
4912  ntups = PQntuples(res);
4913 
4914  nsinfo = (NamespaceInfo *) pg_malloc(ntups * sizeof(NamespaceInfo));
4915 
4916  i_tableoid = PQfnumber(res, "tableoid");
4917  i_oid = PQfnumber(res, "oid");
4918  i_nspname = PQfnumber(res, "nspname");
4919  i_nspowner = PQfnumber(res, "nspowner");
4920  i_rolname = PQfnumber(res, "rolname");
4921  i_nspacl = PQfnumber(res, "nspacl");
4922  i_rnspacl = PQfnumber(res, "rnspacl");
4923  i_initnspacl = PQfnumber(res, "initnspacl");
4924  i_initrnspacl = PQfnumber(res, "initrnspacl");
4925 
4926  for (i = 0; i < ntups; i++)
4927  {
4928  nsinfo[i].dobj.objType = DO_NAMESPACE;
4929  nsinfo[i].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid));
4930  nsinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4931  AssignDumpId(&nsinfo[i].dobj);
4932  nsinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_nspname));
4933  nsinfo[i].nspowner = atooid(PQgetvalue(res, i, i_nspowner));
4934  nsinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
4935  nsinfo[i].nspacl = pg_strdup(PQgetvalue(res, i, i_nspacl));
4936  nsinfo[i].rnspacl = pg_strdup(PQgetvalue(res, i, i_rnspacl));
4937  nsinfo[i].initnspacl = pg_strdup(PQgetvalue(res, i, i_initnspacl));
4938  nsinfo[i].initrnspacl = pg_strdup(PQgetvalue(res, i, i_initrnspacl));
4939 
4940  /* Decide whether to dump this namespace */
4941  selectDumpableNamespace(&nsinfo[i], fout);
4942 
4943  /*
4944  * Do not try to dump ACL if the ACL is empty or the default.
4945  *
4946  * This is useful because, for some schemas/objects, the only
4947  * component we are going to try and dump is the ACL and if we can
4948  * remove that then 'dump' goes to zero/false and we don't consider
4949  * this object for dumping at all later on.
4950  */
4951  if (PQgetisnull(res, i, i_nspacl) && PQgetisnull(res, i, i_rnspacl) &&
4952  PQgetisnull(res, i, i_initnspacl) &&
4953  PQgetisnull(res, i, i_initrnspacl))
4954  nsinfo[i].dobj.dump &= ~DUMP_COMPONENT_ACL;
4955 
4956  if (strlen(nsinfo[i].rolname) == 0)
4957  pg_log_warning("owner of schema \"%s\" appears to be invalid",
4958  nsinfo[i].dobj.name);
4959  }
4960 
4961  PQclear(res);
4962  destroyPQExpBuffer(query);
4963 
4964  *numNamespaces = ntups;
4965 
4966  return nsinfo;
4967 }
4968 
4969 /*
4970  * findNamespace:
4971  * given a namespace OID, look up the info read by getNamespaces
4972  */
4973 static NamespaceInfo *
4975 {
4976  NamespaceInfo *nsinfo;
4977 
4978  nsinfo = findNamespaceByOid(nsoid);
4979  if (nsinfo == NULL)
4980  fatal("schema with OID %u does not exist", nsoid);
4981  return nsinfo;
4982 }
4983 
4984 /*
4985  * getExtensions:
4986  * read all extensions in the system catalogs and return them in the
4987  * ExtensionInfo* structure
4988  *
4989  * numExtensions is set to the number of extensions read in
4990  */
4991 ExtensionInfo *
4993 {
4994  DumpOptions *dopt = fout->dopt;
4995  PGresult *res;
4996  int ntups;
4997  int i;
4998  PQExpBuffer query;
4999  ExtensionInfo *extinfo;
5000  int i_tableoid;
5001  int i_oid;
5002  int i_extname;
5003  int i_nspname;
5004  int i_extrelocatable;
5005  int i_extversion;
5006  int i_extconfig;
5007  int i_extcondition;
5008 
5009  /*
5010  * Before 9.1, there are no extensions.
5011  */
5012  if (fout->remoteVersion < 90100)
5013  {
5014  *numExtensions = 0;
5015  return NULL;
5016  }
5017 
5018  query = createPQExpBuffer();
5019 
5020  appendPQExpBufferStr(query, "SELECT x.tableoid, x.oid, "
5021  "x.extname, n.nspname, x.extrelocatable, x.extversion, x.extconfig, x.extcondition "
5022  "FROM pg_extension x "
5023  "JOIN pg_namespace n ON n.oid = x.extnamespace");
5024 
5025  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
5026 
5027  ntups = PQntuples(res);
5028 
5029  extinfo = (ExtensionInfo *) pg_malloc(ntups * sizeof(ExtensionInfo));
5030 
5031  i_tableoid = PQfnumber(res, "tableoid");
5032  i_oid = PQfnumber(res, "oid");
5033  i_extname = PQfnumber(res, "extname");
5034  i_nspname = PQfnumber(res, "nspname");
5035  i_extrelocatable = PQfnumber(res, "extrelocatable");
5036  i_extversion = PQfnumber(res, "extversion");
5037  i_extconfig = PQfnumber(res, "extconfig");
5038  i_extcondition = PQfnumber(res, "extcondition");
5039 
5040  for (i = 0; i < ntups; i++)
5041  {
5042  extinfo[i].dobj.objType = DO_EXTENSION;
5043  extinfo[i].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid));
5044  extinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
5045  AssignDumpId(&extinfo[i].dobj);
5046  extinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_extname));
5047  extinfo[i].namespace = pg_strdup(PQgetvalue(res, i, i_nspname));
5048  extinfo[i].relocatable = *(PQgetvalue(res, i, i_extrelocatable)) == 't';
5049  extinfo[i].extversion = pg_strdup(PQgetvalue(res, i, i_extversion));
5050  extinfo[i].extconfig = pg_strdup(PQgetvalue(res, i, i_extconfig));
5051  extinfo[i].extcondition = pg_strdup(PQgetvalue(res, i, i_extcondition));
5052 
5053  /* Decide whether we want to dump it */
5054  selectDumpableExtension(&(extinfo[i]), dopt);
5055  }
5056 
5057  PQclear(res);
5058  destroyPQExpBuffer(query);
5059 
5060  *numExtensions = ntups;
5061 
5062  return extinfo;
5063 }
5064 
5065 /*
5066  * getTypes:
5067  * read all types in the system catalogs and return them in the
5068  * TypeInfo* structure
5069  *
5070  * numTypes is set to the number of types read in
5071  *
5072  * NB: this must run after getFuncs() because we assume we can do
5073  * findFuncByOid().
5074  */
5075 TypeInfo *
5077 {
5078  DumpOptions *dopt = fout->dopt;
5079  PGresult *res;
5080  int ntups;
5081  int i;
5082  PQExpBuffer query = createPQExpBuffer();
5083  TypeInfo *tyinfo;
5084  ShellTypeInfo *stinfo;
5085  int i_tableoid;
5086  int i_oid;
5087  int i_typname;
5088  int i_typnamespace;
5089  int i_typacl;
5090  int i_rtypacl;
5091  int i_inittypacl;
5092  int i_initrtypacl;
5093  int i_rolname;
5094  int i_typelem;
5095  int i_typrelid;
5096  int i_typrelkind;
5097  int i_typtype;
5098  int i_typisdefined;
5099  int i_isarray;
5100 
5101  /*
5102  * we include even the built-in types because those may be used as array
5103  * elements by user-defined types
5104  *
5105  * we filter out the built-in types when we dump out the types
5106  *
5107  * same approach for undefined (shell) types and array types
5108  *
5109  * Note: as of 8.3 we can reliably detect whether a type is an
5110  * auto-generated array type by checking the element type's typarray.
5111  * (Before that the test is capable of generating false positives.) We
5112  * still check for name beginning with '_', though, so as to avoid the
5113  * cost of the subselect probe for all standard types. This would have to
5114  * be revisited if the backend ever allows renaming of array types.
5115  */
5116 
5117  if (fout->remoteVersion >= 90600)
5118  {
5119  PQExpBuffer acl_subquery = createPQExpBuffer();
5120  PQExpBuffer racl_subquery = createPQExpBuffer();
5121  PQExpBuffer initacl_subquery = createPQExpBuffer();
5122  PQExpBuffer initracl_subquery = createPQExpBuffer();
5123 
5124  buildACLQueries(acl_subquery, racl_subquery, initacl_subquery,
5125  initracl_subquery, "t.typacl", "t.typowner",
5126  "pip.initprivs", "'T'", dopt->binary_upgrade);
5127 
5128  appendPQExpBuffer(query, "SELECT t.tableoid, t.oid, t.typname, "
5129  "t.typnamespace, "
5130  "%s AS typacl, "
5131  "%s AS rtypacl, "
5132  "%s AS inittypacl, "
5133  "%s AS initrtypacl, "
5134  "(%s t.typowner) AS rolname, "
5135  "t.typelem, t.typrelid, "
5136  "CASE WHEN t.typrelid = 0 THEN ' '::\"char\" "
5137  "ELSE (SELECT relkind FROM pg_class WHERE oid = t.typrelid) END AS typrelkind, "
5138  "t.typtype, t.typisdefined, "
5139  "t.typname[0] = '_' AND t.typelem != 0 AND "
5140  "(SELECT typarray FROM pg_type te WHERE oid = t.typelem) = t.oid AS isarray "
5141  "FROM pg_type t "
5142  "LEFT JOIN pg_init_privs pip ON "
5143  "(t.oid = pip.objoid "
5144  "AND pip.classoid = 'pg_type'::regclass "
5145  "AND pip.objsubid = 0) ",
5146  acl_subquery->data,
5147  racl_subquery->data,
5148  initacl_subquery->data,
5149  initracl_subquery->data,
5151 
5152  destroyPQExpBuffer(acl_subquery);
5153  destroyPQExpBuffer(racl_subquery);
5154  destroyPQExpBuffer(initacl_subquery);
5155  destroyPQExpBuffer(initracl_subquery);
5156  }
5157  else if (fout->remoteVersion >= 90200)
5158  {
5159  appendPQExpBuffer(query, "SELECT tableoid, oid, typname, "
5160  "typnamespace, typacl, NULL as rtypacl, "
5161  "NULL AS inittypacl, NULL AS initrtypacl, "
5162  "(%s typowner) AS rolname, "
5163  "typelem, typrelid, "
5164  "CASE WHEN typrelid = 0 THEN ' '::\"char\" "
5165  "ELSE (SELECT relkind FROM pg_class WHERE oid = typrelid) END AS typrelkind, "
5166  "typtype, typisdefined, "
5167  "typname[0] = '_' AND typelem != 0 AND "
5168  "(SELECT typarray FROM pg_type te WHERE oid = pg_type.typelem) = oid AS isarray "
5169  "FROM pg_type",
5171  }
5172  else if (fout->remoteVersion >= 80300)
5173  {
5174  appendPQExpBuffer(query, "SELECT tableoid, oid, typname, "
5175  "typnamespace, NULL AS typacl, NULL as rtypacl, "
5176  "NULL AS inittypacl, NULL AS initrtypacl, "
5177  "(%s typowner) AS rolname, "
5178  "typelem, typrelid, "
5179  "CASE WHEN typrelid = 0 THEN ' '::\"char\" "
5180  "ELSE (SELECT relkind FROM pg_class WHERE oid = typrelid) END AS typrelkind, "
5181  "typtype, typisdefined, "
5182  "typname[0] = '_' AND typelem != 0 AND "
5183  "(SELECT typarray FROM pg_type te WHERE oid = pg_type.typelem) = oid AS isarray "
5184  "FROM pg_type",
5186  }
5187  else
5188  {
5189  appendPQExpBuffer(query, "SELECT tableoid, oid, typname, "
5190  "typnamespace, NULL AS typacl, NULL as rtypacl, "
5191  "NULL AS inittypacl, NULL AS initrtypacl, "
5192  "(%s typowner) AS rolname, "
5193  "typelem, typrelid, "
5194  "CASE WHEN typrelid = 0 THEN ' '::\"char\" "
5195  "ELSE (SELECT relkind FROM pg_class WHERE oid = typrelid) END AS typrelkind, "
5196  "typtype, typisdefined, "
5197  "typname[0] = '_' AND typelem != 0 AS isarray "
5198  "FROM pg_type",
5200  }
5201 
5202  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
5203 
5204  ntups = PQntuples(res);
5205 
5206  tyinfo = (TypeInfo *) pg_malloc(ntups * sizeof(TypeInfo));
5207 
5208  i_tableoid = PQfnumber(res, "tableoid");
5209  i_oid = PQfnumber(res, "oid");
5210  i_typname = PQfnumber(res, "typname");
5211  i_typnamespace = PQfnumber(res, "typnamespace");
5212  i_typacl = PQfnumber(res, "typacl");
5213  i_rtypacl = PQfnumber(res, "rtypacl");
5214  i_inittypacl = PQfnumber(res, "inittypacl");
5215  i_initrtypacl = PQfnumber(res, "initrtypacl");
5216  i_rolname = PQfnumber(res, "rolname");
5217  i_typelem = PQfnumber(res, "typelem");
5218  i_typrelid = PQfnumber(res, "typrelid");
5219  i_typrelkind = PQfnumber(res, "typrelkind");
5220  i_typtype = PQfnumber(res, "typtype");
5221  i_typisdefined = PQfnumber(res, "typisdefined");
5222  i_isarray = PQfnumber(res, "isarray");
5223 
5224  for (i = 0; i < ntups; i++)
5225  {
5226  tyinfo[i].dobj.objType = DO_TYPE;
5227  tyinfo[i].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid));
5228  tyinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
5229  AssignDumpId(&tyinfo[i].dobj);
5230  tyinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_typname));
5231  tyinfo[i].dobj.namespace =
5232  findNamespace(atooid(PQgetvalue(res, i, i_typnamespace)));
5233  tyinfo[i].ftypname = NULL; /* may get filled later */
5234  tyinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
5235  tyinfo[i].typacl = pg_strdup(PQgetvalue(res, i, i_typacl));
5236  tyinfo[i].rtypacl = pg_strdup(PQgetvalue(res, i, i_rtypacl));
5237  tyinfo[i].inittypacl = pg_strdup(PQgetvalue(res, i, i_inittypacl));
5238  tyinfo[i].initrtypacl = pg_strdup(PQgetvalue(res, i, i_initrtypacl));
5239  tyinfo[i].typelem = atooid(PQgetvalue(res, i, i_typelem));
5240  tyinfo[i].typrelid = atooid(PQgetvalue(res, i, i_typrelid));
5241  tyinfo[i].typrelkind = *PQgetvalue(res, i, i_typrelkind);
5242  tyinfo[i].typtype = *PQgetvalue(res, i, i_typtype);
5243  tyinfo[i].shellType = NULL;
5244 
5245  if (strcmp(PQgetvalue(res, i, i_typisdefined), "t") == 0)
5246  tyinfo[i].isDefined = true;
5247  else
5248  tyinfo[i].isDefined = false;
5249 
5250  if (strcmp(PQgetvalue(res, i, i_isarray), "t") == 0)
5251  tyinfo[i].isArray = true;
5252  else
5253  tyinfo[i].isArray = false;
5254 
5255  if (tyinfo[i].typtype == 'm')
5256  tyinfo[i].isMultirange = true;
5257  else
5258  tyinfo[i].isMultirange = false;
5259 
5260  /* Decide whether we want to dump it */
5261  selectDumpableType(&tyinfo[i], fout);
5262 
5263  /* Do not try to dump ACL if no ACL exists. */
5264  if (PQgetisnull(res, i, i_typacl) && PQgetisnull(res, i, i_rtypacl) &&
5265  PQgetisnull(res, i, i_inittypacl) &&
5266  PQgetisnull(res, i, i_initrtypacl))
5267  tyinfo[i].dobj.dump &= ~DUMP_COMPONEN