PostgreSQL Source Code  git master
pg_dump.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get a 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "dumputils.h"
59 #include "fe_utils/option_utils.h"
60 #include "fe_utils/string_utils.h"
61 #include "getopt_long.h"
62 #include "libpq/libpq-fs.h"
63 #include "parallel.h"
64 #include "pg_backup_db.h"
65 #include "pg_backup_utils.h"
66 #include "pg_dump.h"
67 #include "storage/block.h"
68 
69 typedef struct
70 {
71  const char *descr; /* comment for an object */
72  Oid classoid; /* object class (catalog OID) */
73  Oid objoid; /* object OID */
74  int objsubid; /* subobject (table column #) */
75 } CommentItem;
76 
77 typedef struct
78 {
79  const char *provider; /* label provider of this security label */
80  const char *label; /* security label for an object */
81  Oid classoid; /* object class (catalog OID) */
82  Oid objoid; /* object OID */
83  int objsubid; /* subobject (table column #) */
84 } SecLabelItem;
85 
86 typedef enum OidOptions
87 {
90  zeroAsNone = 4
92 
93 /* global decls */
94 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
95 
96 /* subquery used to convert user ID (eg, datdba) to user name */
97 static const char *username_subquery;
98 
99 /*
100  * For 8.0 and earlier servers, pulled from pg_database, for 8.1+ we use
101  * FirstNormalObjectId - 1.
102  */
103 static Oid g_last_builtin_oid; /* value of the last builtin oid */
104 
105 /* The specified names/patterns should match at least one entity */
106 static int strict_names = 0;
107 
108 /*
109  * Object inclusion/exclusion lists
110  *
111  * The string lists record the patterns given by command-line switches,
112  * which we then convert to lists of OIDs of matching objects.
113  */
115 static SimpleOidList schema_include_oids = {NULL, NULL};
117 static SimpleOidList schema_exclude_oids = {NULL, NULL};
118 
120 static SimpleOidList table_include_oids = {NULL, NULL};
122 static SimpleOidList table_exclude_oids = {NULL, NULL};
124 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
127 
129 static SimpleOidList extension_include_oids = {NULL, NULL};
130 
131 static const CatalogId nilCatalogId = {0, 0};
132 
133 /* override for standard extra_float_digits setting */
134 static bool have_extra_float_digits = false;
136 
137 /* sorted table of comments */
138 static CommentItem *comments = NULL;
139 static int ncomments = 0;
140 
141 /* sorted table of security labels */
142 static SecLabelItem *seclabels = NULL;
143 static int nseclabels = 0;
144 
/*
 * Number of rows per INSERT statement when --inserts is given but
 * --rows-per-insert is not: one row per INSERT.
 */
#define DUMP_DEFAULT_ROWS_PER_INSERT 1
150 
/*
 * Produce the quoted, schema-qualified name of a dumpable object: expands to
 * a fmtQualifiedId() call on the object's namespace name and its own name.
 * The argument must have a `dobj` member whose namespace pointer is non-NULL.
 */
#define fmtQualifiedDumpable(obj) \
	fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
				   (obj)->dobj.name)
157 
158 static void help(const char *progname);
159 static void setup_connection(Archive *AH,
160  const char *dumpencoding, const char *dumpsnapshot,
161  char *use_role);
163 static void expand_schema_name_patterns(Archive *fout,
164  SimpleStringList *patterns,
165  SimpleOidList *oids,
166  bool strict_names);
167 static void expand_extension_name_patterns(Archive *fout,
168  SimpleStringList *patterns,
169  SimpleOidList *oids,
170  bool strict_names);
172  SimpleStringList *patterns,
173  SimpleOidList *oids);
174 static void expand_table_name_patterns(Archive *fout,
175  SimpleStringList *patterns,
176  SimpleOidList *oids,
177  bool strict_names);
178 static NamespaceInfo *findNamespace(Oid nsoid);
179 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
180 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
181 static void guessConstraintInheritance(TableInfo *tblinfo, int numTables);
182 static void getAdditionalACLs(Archive *fout);
183 static void dumpCommentExtended(Archive *fout, const char *type,
184  const char *name, const char *namespace,
185  const char *owner, CatalogId catalogId,
186  int subid, DumpId dumpId,
187  const char *initdb_comment);
188 static inline void dumpComment(Archive *fout, const char *type,
189  const char *name, const char *namespace,
190  const char *owner, CatalogId catalogId,
191  int subid, DumpId dumpId);
192 static int findComments(Archive *fout, Oid classoid, Oid objoid,
193  CommentItem **items);
194 static void collectComments(Archive *fout);
195 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
196  const char *namespace, const char *owner,
197  CatalogId catalogId, int subid, DumpId dumpId);
198 static int findSecLabels(Archive *fout, Oid classoid, Oid objoid,
199  SecLabelItem **items);
200 static void collectSecLabels(Archive *fout);
201 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
202 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
203 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
204 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
205 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
206 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
207 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
208 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
209 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
210 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
211 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo);
212 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
213 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
214 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
215 static void dumpCast(Archive *fout, const CastInfo *cast);
216 static void dumpTransform(Archive *fout, const TransformInfo *transform);
217 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
218 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *oprinfo);
219 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
220 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
221 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
222 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
223 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
224 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
225 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
226 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
227 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
228 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
229 static void dumpTableAttach(Archive *fout, const TableAttachInfo *tbinfo);
230 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
231 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
232 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
233 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
234 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
235 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
236 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
237 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
238 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
239 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
240 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
241 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
242 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
243 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
244 static void dumpUserMappings(Archive *fout,
245  const char *servername, const char *namespace,
246  const char *owner, CatalogId catalogId, DumpId dumpId);
247 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
248 
249 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
250  const char *type, const char *name, const char *subname,
251  const char *nspname, const char *owner,
252  const DumpableAcl *dacl);
253 
254 static void getDependencies(Archive *fout);
255 static void BuildArchiveDependencies(Archive *fout);
256 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
257  DumpId **dependencies, int *nDeps, int *allocDeps);
258 
260 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
261  DumpableObject *boundaryObjs);
262 
263 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
264 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
265 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
266 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
267 static void buildMatViewRefreshDependencies(Archive *fout);
268 static void getTableDataFKConstraints(void);
269 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
270  bool is_agg);
271 static char *format_function_arguments_old(Archive *fout,
272  const FuncInfo *finfo, int nallargs,
273  char **allargtypes,
274  char **argmodes,
275  char **argnames);
276 static char *format_function_signature(Archive *fout,
277  const FuncInfo *finfo, bool honor_quotes);
278 static char *convertRegProcReference(const char *proc);
279 static char *getFormattedOperatorName(const char *oproid);
280 static char *convertTSFunction(Archive *fout, Oid funcOid);
281 static Oid findLastBuiltinOid_V71(Archive *fout);
282 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
283 static void getBlobs(Archive *fout);
284 static void dumpBlob(Archive *fout, const BlobInfo *binfo);
285 static int dumpBlobs(Archive *fout, const void *arg);
286 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
287 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
288 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
289 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
290 static void dumpDatabase(Archive *AH);
291 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
292  const char *dbname, Oid dboid);
293 static void dumpEncoding(Archive *AH);
294 static void dumpStdStrings(Archive *AH);
295 static void dumpSearchPath(Archive *AH);
297  PQExpBuffer upgrade_buffer,
298  Oid pg_type_oid,
299  bool force_array_type,
300  bool include_multirange_type);
302  PQExpBuffer upgrade_buffer,
303  const TableInfo *tbinfo);
304 static void binary_upgrade_set_pg_class_oids(Archive *fout,
305  PQExpBuffer upgrade_buffer,
306  Oid pg_class_oid, bool is_index);
307 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
308  const DumpableObject *dobj,
309  const char *objtype,
310  const char *objname,
311  const char *objnamespace);
312 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
313 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
314 static bool nonemptyReloptions(const char *reloptions);
315 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
316  const char *prefix, Archive *fout);
317 static char *get_synchronized_snapshot(Archive *fout);
318 static void setupDumpWorker(Archive *AHX);
319 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
320 
321 
322 int
323 main(int argc, char **argv)
324 {
325  int c;
326  const char *filename = NULL;
327  const char *format = "p";
328  TableInfo *tblinfo;
329  int numTables;
330  DumpableObject **dobjs;
331  int numObjs;
332  DumpableObject *boundaryObjs;
333  int i;
334  int optindex;
335  RestoreOptions *ropt;
336  Archive *fout; /* the script file */
337  bool g_verbose = false;
338  const char *dumpencoding = NULL;
339  const char *dumpsnapshot = NULL;
340  char *use_role = NULL;
341  int numWorkers = 1;
342  int compressLevel = -1;
343  int plainText = 0;
344  ArchiveFormat archiveFormat = archUnknown;
345  ArchiveMode archiveMode;
346 
347  static DumpOptions dopt;
348 
349  static struct option long_options[] = {
350  {"data-only", no_argument, NULL, 'a'},
351  {"blobs", no_argument, NULL, 'b'},
352  {"no-blobs", no_argument, NULL, 'B'},
353  {"clean", no_argument, NULL, 'c'},
354  {"create", no_argument, NULL, 'C'},
355  {"dbname", required_argument, NULL, 'd'},
356  {"extension", required_argument, NULL, 'e'},
357  {"file", required_argument, NULL, 'f'},
358  {"format", required_argument, NULL, 'F'},
359  {"host", required_argument, NULL, 'h'},
360  {"jobs", 1, NULL, 'j'},
361  {"no-reconnect", no_argument, NULL, 'R'},
362  {"no-owner", no_argument, NULL, 'O'},
363  {"port", required_argument, NULL, 'p'},
364  {"schema", required_argument, NULL, 'n'},
365  {"exclude-schema", required_argument, NULL, 'N'},
366  {"schema-only", no_argument, NULL, 's'},
367  {"superuser", required_argument, NULL, 'S'},
368  {"table", required_argument, NULL, 't'},
369  {"exclude-table", required_argument, NULL, 'T'},
370  {"no-password", no_argument, NULL, 'w'},
371  {"password", no_argument, NULL, 'W'},
372  {"username", required_argument, NULL, 'U'},
373  {"verbose", no_argument, NULL, 'v'},
374  {"no-privileges", no_argument, NULL, 'x'},
375  {"no-acl", no_argument, NULL, 'x'},
376  {"compress", required_argument, NULL, 'Z'},
377  {"encoding", required_argument, NULL, 'E'},
378  {"help", no_argument, NULL, '?'},
379  {"version", no_argument, NULL, 'V'},
380 
381  /*
382  * the following options don't have an equivalent short option letter
383  */
384  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
385  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
386  {"column-inserts", no_argument, &dopt.column_inserts, 1},
387  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
388  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
389  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
390  {"exclude-table-data", required_argument, NULL, 4},
391  {"extra-float-digits", required_argument, NULL, 8},
392  {"if-exists", no_argument, &dopt.if_exists, 1},
393  {"inserts", no_argument, NULL, 9},
394  {"lock-wait-timeout", required_argument, NULL, 2},
395  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
396  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
397  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
398  {"role", required_argument, NULL, 3},
399  {"section", required_argument, NULL, 5},
400  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
401  {"snapshot", required_argument, NULL, 6},
402  {"strict-names", no_argument, &strict_names, 1},
403  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
404  {"no-comments", no_argument, &dopt.no_comments, 1},
405  {"no-publications", no_argument, &dopt.no_publications, 1},
406  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
407  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
408  {"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1},
409  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
410  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
411  {"no-sync", no_argument, NULL, 7},
412  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
413  {"rows-per-insert", required_argument, NULL, 10},
414  {"include-foreign-data", required_argument, NULL, 11},
415 
416  {NULL, 0, NULL, 0}
417  };
418 
419  pg_logging_init(argv[0]);
421  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
422 
423  /*
424  * Initialize what we need for parallel execution, especially for thread
425  * support on Windows.
426  */
428 
429  progname = get_progname(argv[0]);
430 
431  if (argc > 1)
432  {
433  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
434  {
435  help(progname);
436  exit_nicely(0);
437  }
438  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
439  {
440  puts("pg_dump (PostgreSQL) " PG_VERSION);
441  exit_nicely(0);
442  }
443  }
444 
445  InitDumpOptions(&dopt);
446 
447  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
448  long_options, &optindex)) != -1)
449  {
450  switch (c)
451  {
452  case 'a': /* Dump data only */
453  dopt.dataOnly = true;
454  break;
455 
456  case 'b': /* Dump blobs */
457  dopt.outputBlobs = true;
458  break;
459 
460  case 'B': /* Don't dump blobs */
461  dopt.dontOutputBlobs = true;
462  break;
463 
464  case 'c': /* clean (i.e., drop) schema prior to create */
465  dopt.outputClean = 1;
466  break;
467 
468  case 'C': /* Create DB */
469  dopt.outputCreateDB = 1;
470  break;
471 
472  case 'd': /* database name */
473  dopt.cparams.dbname = pg_strdup(optarg);
474  break;
475 
476  case 'e': /* include extension(s) */
478  dopt.include_everything = false;
479  break;
480 
481  case 'E': /* Dump encoding */
482  dumpencoding = pg_strdup(optarg);
483  break;
484 
485  case 'f':
487  break;
488 
489  case 'F':
491  break;
492 
493  case 'h': /* server host */
494  dopt.cparams.pghost = pg_strdup(optarg);
495  break;
496 
497  case 'j': /* number of dump jobs */
498  if (!option_parse_int(optarg, "-j/--jobs", 1,
499  PG_MAX_JOBS,
500  &numWorkers))
501  exit_nicely(1);
502  break;
503 
504  case 'n': /* include schema(s) */
506  dopt.include_everything = false;
507  break;
508 
509  case 'N': /* exclude schema(s) */
511  break;
512 
513  case 'O': /* Don't reconnect to match owner */
514  dopt.outputNoOwner = 1;
515  break;
516 
517  case 'p': /* server port */
518  dopt.cparams.pgport = pg_strdup(optarg);
519  break;
520 
521  case 'R':
522  /* no-op, still accepted for backwards compatibility */
523  break;
524 
525  case 's': /* dump schema only */
526  dopt.schemaOnly = true;
527  break;
528 
529  case 'S': /* Username for superuser in plain text output */
531  break;
532 
533  case 't': /* include table(s) */
535  dopt.include_everything = false;
536  break;
537 
538  case 'T': /* exclude table(s) */
540  break;
541 
542  case 'U':
544  break;
545 
546  case 'v': /* verbose */
547  g_verbose = true;
549  break;
550 
551  case 'w':
553  break;
554 
555  case 'W':
557  break;
558 
559  case 'x': /* skip ACL dump */
560  dopt.aclsSkip = true;
561  break;
562 
563  case 'Z': /* Compression Level */
564  if (!option_parse_int(optarg, "-Z/--compress", 0, 9,
565  &compressLevel))
566  exit_nicely(1);
567  break;
568 
569  case 0:
570  /* This covers the long options. */
571  break;
572 
573  case 2: /* lock-wait-timeout */
575  break;
576 
577  case 3: /* SET ROLE */
578  use_role = pg_strdup(optarg);
579  break;
580 
581  case 4: /* exclude table(s) data */
583  break;
584 
585  case 5: /* section */
587  break;
588 
589  case 6: /* snapshot */
590  dumpsnapshot = pg_strdup(optarg);
591  break;
592 
593  case 7: /* no-sync */
594  dosync = false;
595  break;
596 
597  case 8:
599  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
601  exit_nicely(1);
602  break;
603 
604  case 9: /* inserts */
605 
606  /*
607  * dump_inserts also stores --rows-per-insert, careful not to
608  * overwrite that.
609  */
610  if (dopt.dump_inserts == 0)
612  break;
613 
614  case 10: /* rows per insert */
615  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
616  &dopt.dump_inserts))
617  exit_nicely(1);
618  break;
619 
620  case 11: /* include foreign data */
622  optarg);
623  break;
624 
625  default:
626  fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
627  exit_nicely(1);
628  }
629  }
630 
631  /*
632  * Non-option argument specifies database name as long as it wasn't
633  * already specified with -d / --dbname
634  */
635  if (optind < argc && dopt.cparams.dbname == NULL)
636  dopt.cparams.dbname = argv[optind++];
637 
638  /* Complain if any arguments remain */
639  if (optind < argc)
640  {
641  pg_log_error("too many command-line arguments (first is \"%s\")",
642  argv[optind]);
643  fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
644  progname);
645  exit_nicely(1);
646  }
647 
648  /* --column-inserts implies --inserts */
649  if (dopt.column_inserts && dopt.dump_inserts == 0)
651 
652  /*
653  * Binary upgrade mode implies dumping sequence data even in schema-only
654  * mode. This is not exposed as a separate option, but kept separate
655  * internally for clarity.
656  */
657  if (dopt.binary_upgrade)
658  dopt.sequence_data = 1;
659 
660  if (dopt.dataOnly && dopt.schemaOnly)
661  {
662  pg_log_error("options -s/--schema-only and -a/--data-only cannot be used together");
663  exit_nicely(1);
664  }
665 
667  fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
668 
669  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
670  fatal("option --include-foreign-data is not supported with parallel backup");
671 
672  if (dopt.dataOnly && dopt.outputClean)
673  {
674  pg_log_error("options -c/--clean and -a/--data-only cannot be used together");
675  exit_nicely(1);
676  }
677 
678  if (dopt.if_exists && !dopt.outputClean)
679  fatal("option --if-exists requires option -c/--clean");
680 
681  /*
682  * --inserts are already implied above if --column-inserts or
683  * --rows-per-insert were specified.
684  */
685  if (dopt.do_nothing && dopt.dump_inserts == 0)
686  fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
687 
688  /* Identify archive format to emit */
689  archiveFormat = parseArchiveFormat(format, &archiveMode);
690 
691  /* archiveFormat specific setup */
692  if (archiveFormat == archNull)
693  plainText = 1;
694 
695  /* Custom and directory formats are compressed by default, others not */
696  if (compressLevel == -1)
697  {
698 #ifdef HAVE_LIBZ
699  if (archiveFormat == archCustom || archiveFormat == archDirectory)
700  compressLevel = Z_DEFAULT_COMPRESSION;
701  else
702 #endif
703  compressLevel = 0;
704  }
705 
706 #ifndef HAVE_LIBZ
707  if (compressLevel != 0)
708  pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
709  compressLevel = 0;
710 #endif
711 
712  /*
713  * If emitting an archive format, we always want to emit a DATABASE item,
714  * in case --create is specified at pg_restore time.
715  */
716  if (!plainText)
717  dopt.outputCreateDB = 1;
718 
719  /* Parallel backup only in the directory archive format so far */
720  if (archiveFormat != archDirectory && numWorkers > 1)
721  fatal("parallel backup only supported by the directory format");
722 
723  /* Open the output file */
724  fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
725  archiveMode, setupDumpWorker);
726 
727  /* Make dump options accessible right away */
728  SetArchiveOptions(fout, &dopt, NULL);
729 
730  /* Register the cleanup hook */
731  on_exit_close_archive(fout);
732 
733  /* Let the archiver know how noisy to be */
734  fout->verbose = g_verbose;
735 
736 
737  /*
738  * We allow the server to be back to 8.4, and up to any minor release of
739  * our own major version. (See also version check in pg_dumpall.c.)
740  */
741  fout->minRemoteVersion = 80400;
742  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
743 
744  fout->numWorkers = numWorkers;
745 
746  /*
747  * Open the database using the Archiver, so it knows about it. Errors mean
748  * death.
749  */
750  ConnectDatabase(fout, &dopt.cparams, false);
751  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
752 
753  /*
754  * Disable security label support if server version < v9.1.x (prevents
755  * access to nonexistent pg_seclabel catalog)
756  */
757  if (fout->remoteVersion < 90100)
758  dopt.no_security_labels = 1;
759 
760  /*
761  * On hot standbys, never try to dump unlogged table data, since it will
762  * just throw an error.
763  */
764  if (fout->isStandby)
765  dopt.no_unlogged_table_data = true;
766 
767  /* Select the appropriate subquery to convert user IDs to names */
768  if (fout->remoteVersion >= 80100)
769  username_subquery = "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";
770  else
771  username_subquery = "SELECT usename FROM pg_catalog.pg_user WHERE usesysid =";
772 
773  /* check the version for the synchronized snapshots feature */
774  if (numWorkers > 1 && fout->remoteVersion < 90200
775  && !dopt.no_synchronized_snapshots)
776  fatal("Synchronized snapshots are not supported by this server version.\n"
777  "Run with --no-synchronized-snapshots instead if you do not need\n"
778  "synchronized snapshots.");
779 
780  /* check the version when a snapshot is explicitly specified by user */
781  if (dumpsnapshot && fout->remoteVersion < 90200)
782  fatal("Exported snapshots are not supported by this server version.");
783 
784  /*
785  * Find the last built-in OID, if needed (prior to 8.1)
786  *
787  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
788  */
789  if (fout->remoteVersion < 80100)
791  else
793 
794  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
795 
796  /* Expand schema selection patterns into OID lists */
797  if (schema_include_patterns.head != NULL)
798  {
801  strict_names);
802  if (schema_include_oids.head == NULL)
803  fatal("no matching schemas were found");
804  }
807  false);
808  /* non-matching exclusion patterns aren't an error */
809 
810  /* Expand table selection patterns into OID lists */
811  if (table_include_patterns.head != NULL)
812  {
815  strict_names);
816  if (table_include_oids.head == NULL)
817  fatal("no matching tables were found");
818  }
821  false);
822 
825  false);
826 
829 
830  /* non-matching exclusion patterns aren't an error */
831 
832  /* Expand extension selection patterns into OID lists */
833  if (extension_include_patterns.head != NULL)
834  {
837  strict_names);
838  if (extension_include_oids.head == NULL)
839  fatal("no matching extensions were found");
840  }
841 
842  /*
843  * Dumping blobs is the default for dumps where an inclusion switch is not
844  * used (an "include everything" dump). -B can be used to exclude blobs
845  * from those dumps. -b can be used to include blobs even when an
846  * inclusion switch is used.
847  *
848  * -s means "schema only" and blobs are data, not schema, so we never
849  * include blobs when -s is used.
850  */
851  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputBlobs)
852  dopt.outputBlobs = true;
853 
854  /*
855  * Now scan the database and create DumpableObject structs for all the
856  * objects we intend to dump.
857  */
858  tblinfo = getSchemaData(fout, &numTables);
859 
860  if (fout->remoteVersion < 80400)
861  guessConstraintInheritance(tblinfo, numTables);
862 
863  if (!dopt.schemaOnly)
864  {
865  getTableData(&dopt, tblinfo, numTables, 0);
867  if (dopt.dataOnly)
869  }
870 
871  if (dopt.schemaOnly && dopt.sequence_data)
872  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
873 
874  /*
875  * In binary-upgrade mode, we do not have to worry about the actual blob
876  * data or the associated metadata that resides in the pg_largeobject and
877  * pg_largeobject_metadata tables, respectively.
878  *
879  * However, we do need to collect blob information as there may be
880  * comments or other information on blobs that we do need to dump out.
881  */
882  if (dopt.outputBlobs || dopt.binary_upgrade)
883  getBlobs(fout);
884 
885  /*
886  * Collect dependency data to assist in ordering the objects.
887  */
888  getDependencies(fout);
889 
890  /*
891  * Collect ACLs, comments, and security labels, if wanted.
892  */
893  if (!dopt.aclsSkip)
894  getAdditionalACLs(fout);
895  if (!dopt.no_comments)
896  collectComments(fout);
897  if (!dopt.no_security_labels)
898  collectSecLabels(fout);
899 
900  /* Lastly, create dummy objects to represent the section boundaries */
901  boundaryObjs = createBoundaryObjects();
902 
903  /* Get pointers to all the known DumpableObjects */
904  getDumpableObjects(&dobjs, &numObjs);
905 
906  /*
907  * Add dummy dependencies to enforce the dump section ordering.
908  */
909  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
910 
911  /*
912  * Sort the objects into a safe dump order (no forward references).
913  *
914  * We rely on dependency information to help us determine a safe order, so
915  * the initial sort is mostly for cosmetic purposes: we sort by name to
916  * ensure that logically identical schemas will dump identically.
917  */
918  sortDumpableObjectsByTypeName(dobjs, numObjs);
919 
920  sortDumpableObjects(dobjs, numObjs,
921  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
922 
923  /*
924  * Create archive TOC entries for all the objects to be dumped, in a safe
925  * order.
926  */
927 
928  /*
929  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
930  */
931  dumpEncoding(fout);
932  dumpStdStrings(fout);
933  dumpSearchPath(fout);
934 
935  /* The database items are always next, unless we don't want them at all */
936  if (dopt.outputCreateDB)
937  dumpDatabase(fout);
938 
939  /* Now the rearrangeable objects. */
940  for (i = 0; i < numObjs; i++)
941  dumpDumpableObject(fout, dobjs[i]);
942 
943  /*
944  * Set up options info to ensure we dump what we want.
945  */
946  ropt = NewRestoreOptions();
947  ropt->filename = filename;
948 
949  /* if you change this list, see dumpOptionsFromRestoreOptions */
950  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
951  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
952  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
953  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
955  ropt->dropSchema = dopt.outputClean;
956  ropt->dataOnly = dopt.dataOnly;
957  ropt->schemaOnly = dopt.schemaOnly;
958  ropt->if_exists = dopt.if_exists;
959  ropt->column_inserts = dopt.column_inserts;
960  ropt->dumpSections = dopt.dumpSections;
961  ropt->aclsSkip = dopt.aclsSkip;
962  ropt->superuser = dopt.outputSuperuser;
963  ropt->createDB = dopt.outputCreateDB;
964  ropt->noOwner = dopt.outputNoOwner;
965  ropt->noTablespace = dopt.outputNoTablespaces;
966  ropt->disable_triggers = dopt.disable_triggers;
967  ropt->use_setsessauth = dopt.use_setsessauth;
969  ropt->dump_inserts = dopt.dump_inserts;
970  ropt->no_comments = dopt.no_comments;
971  ropt->no_publications = dopt.no_publications;
973  ropt->no_subscriptions = dopt.no_subscriptions;
974  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
977  ropt->sequence_data = dopt.sequence_data;
978  ropt->binary_upgrade = dopt.binary_upgrade;
979 
980  if (compressLevel == -1)
981  ropt->compression = 0;
982  else
983  ropt->compression = compressLevel;
984 
985  ropt->suppressDumpWarnings = true; /* We've already shown them */
986 
987  SetArchiveOptions(fout, &dopt, ropt);
988 
989  /* Mark which entries should be output */
991 
992  /*
993  * The archive's TOC entries are now marked as to which ones will actually
994  * be output, so we can set up their dependency lists properly. This isn't
995  * necessary for plain-text output, though.
996  */
997  if (!plainText)
999 
1000  /*
1001  * And finally we can do the actual output.
1002  *
1003  * Note: for non-plain-text output formats, the output file is written
1004  * inside CloseArchive(). This is, um, bizarre; but not worth changing
1005  * right now.
1006  */
1007  if (plainText)
1008  RestoreArchive(fout);
1009 
1010  CloseArchive(fout);
1011 
1012  exit_nicely(0);
1013 }
1014 
1015 
1016 static void
1017 help(const char *progname)
1018 {
1019  printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
1020  printf(_("Usage:\n"));
1021  printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
1022 
1023  printf(_("\nGeneral options:\n"));
1024  printf(_(" -f, --file=FILENAME output file or directory name\n"));
1025  printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
1026  " plain text (default))\n"));
1027  printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
1028  printf(_(" -v, --verbose verbose mode\n"));
1029  printf(_(" -V, --version output version information, then exit\n"));
1030  printf(_(" -Z, --compress=0-9 compression level for compressed formats\n"));
1031  printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
1032  printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
1033  printf(_(" -?, --help show this help, then exit\n"));
1034 
1035  printf(_("\nOptions controlling the output content:\n"));
1036  printf(_(" -a, --data-only dump only the data, not the schema\n"));
1037  printf(_(" -b, --blobs include large objects in dump\n"));
1038  printf(_(" -B, --no-blobs exclude large objects in dump\n"));
1039  printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
1040  printf(_(" -C, --create include commands to create database in dump\n"));
1041  printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
1042  printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
1043  printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
1044  printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
1045  printf(_(" -O, --no-owner skip restoration of object ownership in\n"
1046  " plain-text format\n"));
1047  printf(_(" -s, --schema-only dump only the schema, no data\n"));
1048  printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
1049  printf(_(" -t, --table=PATTERN dump the specified table(s) only\n"));
1050  printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
1051  printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
1052  printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
1053  printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
1054  printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
1055  printf(_(" --disable-triggers disable triggers during data-only restore\n"));
1056  printf(_(" --enable-row-security enable row security (dump only content user has\n"
1057  " access to)\n"));
1058  printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
1059  printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
1060  printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
1061  printf(_(" --include-foreign-data=PATTERN\n"
1062  " include data of foreign tables on foreign\n"
1063  " servers matching PATTERN\n"));
1064  printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
1065  printf(_(" --load-via-partition-root load partitions via the root table\n"));
1066  printf(_(" --no-comments do not dump comments\n"));
1067  printf(_(" --no-publications do not dump publications\n"));
1068  printf(_(" --no-security-labels do not dump security label assignments\n"));
1069  printf(_(" --no-subscriptions do not dump subscriptions\n"));
1070  printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n"));
1071  printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
1072  printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
1073  printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
1074  printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
1075  printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
1076  printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
1077  printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
1078  printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
1079  printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
1080  printf(_(" --strict-names require table and/or schema include patterns to\n"
1081  " match at least one entity each\n"));
1082  printf(_(" --use-set-session-authorization\n"
1083  " use SET SESSION AUTHORIZATION commands instead of\n"
1084  " ALTER OWNER commands to set ownership\n"));
1085 
1086  printf(_("\nConnection options:\n"));
1087  printf(_(" -d, --dbname=DBNAME database to dump\n"));
1088  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
1089  printf(_(" -p, --port=PORT database server port number\n"));
1090  printf(_(" -U, --username=NAME connect as specified database user\n"));
1091  printf(_(" -w, --no-password never prompt for password\n"));
1092  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1093  printf(_(" --role=ROLENAME do SET ROLE before dump\n"));
1094 
1095  printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
1096  "variable value is used.\n\n"));
1097  printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1098  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
1099 }
1100 
1101 static void
1102 setup_connection(Archive *AH, const char *dumpencoding,
1103  const char *dumpsnapshot, char *use_role)
1104 {
1105  DumpOptions *dopt = AH->dopt;
1106  PGconn *conn = GetConnection(AH);
1107  const char *std_strings;
1108 
1110 
1111  /*
1112  * Set the client encoding if requested.
1113  */
1114  if (dumpencoding)
1115  {
1116  if (PQsetClientEncoding(conn, dumpencoding) < 0)
1117  fatal("invalid client encoding \"%s\" specified",
1118  dumpencoding);
1119  }
1120 
1121  /*
1122  * Get the active encoding and the standard_conforming_strings setting, so
1123  * we know how to escape strings.
1124  */
1126 
1127  std_strings = PQparameterStatus(conn, "standard_conforming_strings");
1128  AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
1129 
1130  /*
1131  * Set the role if requested. In a parallel dump worker, we'll be passed
1132  * use_role == NULL, but AH->use_role is already set (if user specified it
1133  * originally) and we should use that.
1134  */
1135  if (!use_role && AH->use_role)
1136  use_role = AH->use_role;
1137 
1138  /* Set the role if requested */
1139  if (use_role && AH->remoteVersion >= 80100)
1140  {
1141  PQExpBuffer query = createPQExpBuffer();
1142 
1143  appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
1144  ExecuteSqlStatement(AH, query->data);
1145  destroyPQExpBuffer(query);
1146 
1147  /* save it for possible later use by parallel workers */
1148  if (!AH->use_role)
1149  AH->use_role = pg_strdup(use_role);
1150  }
1151 
1152  /* Set the datestyle to ISO to ensure the dump's portability */
1153  ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");
1154 
1155  /* Likewise, avoid using sql_standard intervalstyle */
1156  if (AH->remoteVersion >= 80400)
1157  ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");
1158 
1159  /*
1160  * Use an explicitly specified extra_float_digits if it has been provided.
1161  * Otherwise, set extra_float_digits so that we can dump float data
1162  * exactly (given correctly implemented float I/O code, anyway).
1163  */
1165  {
1167 
1168  appendPQExpBuffer(q, "SET extra_float_digits TO %d",
1170  ExecuteSqlStatement(AH, q->data);
1171  destroyPQExpBuffer(q);
1172  }
1173  else if (AH->remoteVersion >= 90000)
1174  ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
1175  else
1176  ExecuteSqlStatement(AH, "SET extra_float_digits TO 2");
1177 
1178  /*
1179  * If synchronized scanning is supported, disable it, to prevent
1180  * unpredictable changes in row ordering across a dump and reload.
1181  */
1182  if (AH->remoteVersion >= 80300)
1183  ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");
1184 
1185  /*
1186  * Disable timeouts if supported.
1187  */
1188  ExecuteSqlStatement(AH, "SET statement_timeout = 0");
1189  if (AH->remoteVersion >= 90300)
1190  ExecuteSqlStatement(AH, "SET lock_timeout = 0");
1191  if (AH->remoteVersion >= 90600)
1192  ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
1193 
1194  /*
1195  * Quote all identifiers, if requested.
1196  */
1197  if (quote_all_identifiers && AH->remoteVersion >= 90100)
1198  ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
1199 
1200  /*
1201  * Adjust row-security mode, if supported.
1202  */
1203  if (AH->remoteVersion >= 90500)
1204  {
1205  if (dopt->enable_row_security)
1206  ExecuteSqlStatement(AH, "SET row_security = on");
1207  else
1208  ExecuteSqlStatement(AH, "SET row_security = off");
1209  }
1210 
1211  /*
1212  * Initialize prepared-query state to "nothing prepared". We do this here
1213  * so that a parallel dump worker will have its own state.
1214  */
1215  AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));
1216 
1217  /*
1218  * Start transaction-snapshot mode transaction to dump consistent data.
1219  */
1220  ExecuteSqlStatement(AH, "BEGIN");
1221  if (AH->remoteVersion >= 90100)
1222  {
1223  /*
1224  * To support the combination of serializable_deferrable with the jobs
1225  * option we use REPEATABLE READ for the worker connections that are
1226  * passed a snapshot. As long as the snapshot is acquired in a
1227  * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
1228  * REPEATABLE READ transaction provides the appropriate integrity
1229  * guarantees. This is a kluge, but safe for back-patching.
1230  */
1231  if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
1233  "SET TRANSACTION ISOLATION LEVEL "
1234  "SERIALIZABLE, READ ONLY, DEFERRABLE");
1235  else
1237  "SET TRANSACTION ISOLATION LEVEL "
1238  "REPEATABLE READ, READ ONLY");
1239  }
1240  else
1241  {
1243  "SET TRANSACTION ISOLATION LEVEL "
1244  "SERIALIZABLE, READ ONLY");
1245  }
1246 
1247  /*
1248  * If user specified a snapshot to use, select that. In a parallel dump
1249  * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
1250  * is already set (if the server can handle it) and we should use that.
1251  */
1252  if (dumpsnapshot)
1253  AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
1254 
1255  if (AH->sync_snapshot_id)
1256  {
1257  PQExpBuffer query = createPQExpBuffer();
1258 
1259  appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
1261  ExecuteSqlStatement(AH, query->data);
1262  destroyPQExpBuffer(query);
1263  }
1264  else if (AH->numWorkers > 1 &&
1265  AH->remoteVersion >= 90200 &&
1267  {
1268  if (AH->isStandby && AH->remoteVersion < 100000)
1269  fatal("Synchronized snapshots on standby servers are not supported by this server version.\n"
1270  "Run with --no-synchronized-snapshots instead if you do not need\n"
1271  "synchronized snapshots.");
1272 
1273 
1275  }
1276 }
1277 
1278 /* Set up connection for a parallel worker process */
1279 static void
1281 {
1282  /*
1283  * We want to re-select all the same values the leader connection is
1284  * using. We'll have inherited directly-usable values in
1285  * AH->sync_snapshot_id and AH->use_role, but we need to translate the
1286  * inherited encoding value back to a string to pass to setup_connection.
1287  */
1288  setup_connection(AH,
1290  NULL,
1291  NULL);
1292 }
1293 
1294 static char *
1296 {
1297  char *query = "SELECT pg_catalog.pg_export_snapshot()";
1298  char *result;
1299  PGresult *res;
1300 
1301  res = ExecuteSqlQueryForSingleRow(fout, query);
1302  result = pg_strdup(PQgetvalue(res, 0, 0));
1303  PQclear(res);
1304 
1305  return result;
1306 }
1307 
1308 static ArchiveFormat
1310 {
1311  ArchiveFormat archiveFormat;
1312 
1313  *mode = archModeWrite;
1314 
1315  if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
1316  {
1317  /* This is used by pg_dumpall, and is not documented */
1318  archiveFormat = archNull;
1319  *mode = archModeAppend;
1320  }
1321  else if (pg_strcasecmp(format, "c") == 0)
1322  archiveFormat = archCustom;
1323  else if (pg_strcasecmp(format, "custom") == 0)
1324  archiveFormat = archCustom;
1325  else if (pg_strcasecmp(format, "d") == 0)
1326  archiveFormat = archDirectory;
1327  else if (pg_strcasecmp(format, "directory") == 0)
1328  archiveFormat = archDirectory;
1329  else if (pg_strcasecmp(format, "p") == 0)
1330  archiveFormat = archNull;
1331  else if (pg_strcasecmp(format, "plain") == 0)
1332  archiveFormat = archNull;
1333  else if (pg_strcasecmp(format, "t") == 0)
1334  archiveFormat = archTar;
1335  else if (pg_strcasecmp(format, "tar") == 0)
1336  archiveFormat = archTar;
1337  else
1338  fatal("invalid output format \"%s\" specified", format);
1339  return archiveFormat;
1340 }
1341 
1342 /*
1343  * Find the OIDs of all schemas matching the given list of patterns,
1344  * and append them to the given OID list.
1345  */
1346 static void
1348  SimpleStringList *patterns,
1349  SimpleOidList *oids,
1350  bool strict_names)
1351 {
1352  PQExpBuffer query;
1353  PGresult *res;
1354  SimpleStringListCell *cell;
1355  int i;
1356 
1357  if (patterns->head == NULL)
1358  return; /* nothing to do */
1359 
1360  query = createPQExpBuffer();
1361 
1362  /*
1363  * The loop below runs multiple SELECTs might sometimes result in
1364  * duplicate entries in the OID list, but we don't care.
1365  */
1366 
1367  for (cell = patterns->head; cell; cell = cell->next)
1368  {
1369  appendPQExpBufferStr(query,
1370  "SELECT oid FROM pg_catalog.pg_namespace n\n");
1371  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1372  false, NULL, "n.nspname", NULL, NULL);
1373 
1374  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1375  if (strict_names && PQntuples(res) == 0)
1376  fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1377 
1378  for (i = 0; i < PQntuples(res); i++)
1379  {
1381  }
1382 
1383  PQclear(res);
1384  resetPQExpBuffer(query);
1385  }
1386 
1387  destroyPQExpBuffer(query);
1388 }
1389 
1390 /*
1391  * Find the OIDs of all extensions matching the given list of patterns,
1392  * and append them to the given OID list.
1393  */
1394 static void
1396  SimpleStringList *patterns,
1397  SimpleOidList *oids,
1398  bool strict_names)
1399 {
1400  PQExpBuffer query;
1401  PGresult *res;
1402  SimpleStringListCell *cell;
1403  int i;
1404 
1405  if (patterns->head == NULL)
1406  return; /* nothing to do */
1407 
1408  query = createPQExpBuffer();
1409 
1410  /*
1411  * The loop below runs multiple SELECTs might sometimes result in
1412  * duplicate entries in the OID list, but we don't care.
1413  */
1414  for (cell = patterns->head; cell; cell = cell->next)
1415  {
1416  appendPQExpBufferStr(query,
1417  "SELECT oid FROM pg_catalog.pg_extension e\n");
1418  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1419  false, NULL, "e.extname", NULL, NULL);
1420 
1421  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1422  if (strict_names && PQntuples(res) == 0)
1423  fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1424 
1425  for (i = 0; i < PQntuples(res); i++)
1426  {
1428  }
1429 
1430  PQclear(res);
1431  resetPQExpBuffer(query);
1432  }
1433 
1434  destroyPQExpBuffer(query);
1435 }
1436 
1437 /*
1438  * Find the OIDs of all foreign servers matching the given list of patterns,
1439  * and append them to the given OID list.
1440  */
1441 static void
1443  SimpleStringList *patterns,
1444  SimpleOidList *oids)
1445 {
1446  PQExpBuffer query;
1447  PGresult *res;
1448  SimpleStringListCell *cell;
1449  int i;
1450 
1451  if (patterns->head == NULL)
1452  return; /* nothing to do */
1453 
1454  query = createPQExpBuffer();
1455 
1456  /*
1457  * The loop below runs multiple SELECTs might sometimes result in
1458  * duplicate entries in the OID list, but we don't care.
1459  */
1460 
1461  for (cell = patterns->head; cell; cell = cell->next)
1462  {
1463  appendPQExpBufferStr(query,
1464  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1465  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1466  false, NULL, "s.srvname", NULL, NULL);
1467 
1468  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1469  if (PQntuples(res) == 0)
1470  fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1471 
1472  for (i = 0; i < PQntuples(res); i++)
1474 
1475  PQclear(res);
1476  resetPQExpBuffer(query);
1477  }
1478 
1479  destroyPQExpBuffer(query);
1480 }
1481 
1482 /*
1483  * Find the OIDs of all tables matching the given list of patterns,
1484  * and append them to the given OID list. See also expand_dbname_patterns()
1485  * in pg_dumpall.c
1486  */
1487 static void
1489  SimpleStringList *patterns, SimpleOidList *oids,
1490  bool strict_names)
1491 {
1492  PQExpBuffer query;
1493  PGresult *res;
1494  SimpleStringListCell *cell;
1495  int i;
1496 
1497  if (patterns->head == NULL)
1498  return; /* nothing to do */
1499 
1500  query = createPQExpBuffer();
1501 
1502  /*
1503  * this might sometimes result in duplicate entries in the OID list, but
1504  * we don't care.
1505  */
1506 
1507  for (cell = patterns->head; cell; cell = cell->next)
1508  {
1509  /*
1510  * Query must remain ABSOLUTELY devoid of unqualified names. This
1511  * would be unnecessary given a pg_table_is_visible() variant taking a
1512  * search_path argument.
1513  */
1514  appendPQExpBuffer(query,
1515  "SELECT c.oid"
1516  "\nFROM pg_catalog.pg_class c"
1517  "\n LEFT JOIN pg_catalog.pg_namespace n"
1518  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1519  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1520  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1521  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1522  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1523  RELKIND_PARTITIONED_TABLE);
1524  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1525  false, "n.nspname", "c.relname", NULL,
1526  "pg_catalog.pg_table_is_visible(c.oid)");
1527 
1528  ExecuteSqlStatement(fout, "RESET search_path");
1529  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1532  if (strict_names && PQntuples(res) == 0)
1533  fatal("no matching tables were found for pattern \"%s\"", cell->val);
1534 
1535  for (i = 0; i < PQntuples(res); i++)
1536  {
1538  }
1539 
1540  PQclear(res);
1541  resetPQExpBuffer(query);
1542  }
1543 
1544  destroyPQExpBuffer(query);
1545 }
1546 
1547 /*
1548  * checkExtensionMembership
1549  * Determine whether object is an extension member, and if so,
1550  * record an appropriate dependency and set the object's dump flag.
1551  *
1552  * It's important to call this for each object that could be an extension
1553  * member. Generally, we integrate this with determining the object's
1554  * to-be-dumped-ness, since extension membership overrides other rules for that.
1555  *
1556  * Returns true if object is an extension member, else false.
1557  */
1558 static bool
1560 {
1561  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1562 
1563  if (ext == NULL)
1564  return false;
1565 
1566  dobj->ext_member = true;
1567 
1568  /* Record dependency so that getDependencies needn't deal with that */
1569  addObjectDependency(dobj, ext->dobj.dumpId);
1570 
1571  /*
1572  * In 9.6 and above, mark the member object to have any non-initial ACL,
1573  * policies, and security labels dumped.
1574  *
1575  * Note that any initial ACLs (see pg_init_privs) will be removed when we
1576  * extract the information about the object. We don't provide support for
1577  * initial policies and security labels and it seems unlikely for those to
1578  * ever exist, but we may have to revisit this later.
1579  *
1580  * Prior to 9.6, we do not include any extension member components.
1581  *
1582  * In binary upgrades, we still dump all components of the members
1583  * individually, since the idea is to exactly reproduce the database
1584  * contents rather than replace the extension contents with something
1585  * different.
1586  */
1587  if (fout->dopt->binary_upgrade)
1588  dobj->dump = ext->dobj.dump;
1589  else
1590  {
1591  if (fout->remoteVersion < 90600)
1592  dobj->dump = DUMP_COMPONENT_NONE;
1593  else
1594  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
1597  }
1598 
1599  return true;
1600 }
1601 
1602 /*
1603  * selectDumpableNamespace: policy-setting subroutine
1604  * Mark a namespace as to be dumped or not
1605  */
1606 static void
1608 {
1609  /*
1610  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1611  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1612  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1613  */
1614  nsinfo->create = true;
1615 
1616  /*
1617  * If specific tables are being dumped, do not dump any complete
1618  * namespaces. If specific namespaces are being dumped, dump just those
1619  * namespaces. Otherwise, dump all non-system namespaces.
1620  */
1621  if (table_include_oids.head != NULL)
1622  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1623  else if (schema_include_oids.head != NULL)
1624  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1626  nsinfo->dobj.catId.oid) ?
1628  else if (fout->remoteVersion >= 90600 &&
1629  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1630  {
1631  /*
1632  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1633  * they are interesting (and not the original ACLs which were set at
1634  * initdb time, see pg_init_privs).
1635  */
1636  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1637  }
1638  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1639  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1640  {
1641  /* Other system schemas don't get dumped */
1642  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1643  }
1644  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1645  {
1646  /*
1647  * The public schema is a strange beast that sits in a sort of
1648  * no-mans-land between being a system object and a user object.
1649  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1650  * a comment and an indication of ownership. If the owner is the
1651  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1652  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1653  */
1654  nsinfo->create = false;
1655  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1656  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1657  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1659 
1660  /*
1661  * Also, make like it has a comment even if it doesn't; this is so
1662  * that we'll emit a command to drop the comment, if appropriate.
1663  * (Without this, we'd not call dumpCommentExtended for it.)
1664  */
1666  }
1667  else
1668  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1669 
1670  /*
1671  * In any case, a namespace can be excluded by an exclusion switch
1672  */
1673  if (nsinfo->dobj.dump_contains &&
1675  nsinfo->dobj.catId.oid))
1676  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1677 
1678  /*
1679  * If the schema belongs to an extension, allow extension membership to
1680  * override the dump decision for the schema itself. However, this does
1681  * not change dump_contains, so this won't change what we do with objects
1682  * within the schema. (If they belong to the extension, they'll get
1683  * suppressed by it, otherwise not.)
1684  */
1685  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1686 }
1687 
1688 /*
1689  * selectDumpableTable: policy-setting subroutine
1690  * Mark a table as to be dumped or not
1691  */
1692 static void
1694 {
1695  if (checkExtensionMembership(&tbinfo->dobj, fout))
1696  return; /* extension membership overrides all else */
1697 
1698  /*
1699  * If specific tables are being dumped, dump just those tables; else, dump
1700  * according to the parent namespace's dump flag.
1701  */
1702  if (table_include_oids.head != NULL)
1704  tbinfo->dobj.catId.oid) ?
1706  else
1707  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1708 
1709  /*
1710  * In any case, a table can be excluded by an exclusion switch
1711  */
1712  if (tbinfo->dobj.dump &&
1714  tbinfo->dobj.catId.oid))
1715  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1716 }
1717 
1718 /*
1719  * selectDumpableType: policy-setting subroutine
1720  * Mark a type as to be dumped or not
1721  *
1722  * If it's a table's rowtype or an autogenerated array type, we also apply a
1723  * special type code to facilitate sorting into the desired order. (We don't
1724  * want to consider those to be ordinary types because that would bring tables
1725  * up into the datatype part of the dump order.) We still set the object's
1726  * dump flag; that's not going to cause the dummy type to be dumped, but we
1727  * need it so that casts involving such types will be dumped correctly -- see
1728  * dumpCast. This means the flag should be set the same as for the underlying
1729  * object (the table or base type).
1730  */
1731 static void
1733 {
1734  /* skip complex types, except for standalone composite types */
1735  if (OidIsValid(tyinfo->typrelid) &&
1736  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1737  {
1738  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1739 
1740  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1741  if (tytable != NULL)
1742  tyinfo->dobj.dump = tytable->dobj.dump;
1743  else
1744  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1745  return;
1746  }
1747 
1748  /* skip auto-generated array types */
1749  if (tyinfo->isArray || tyinfo->isMultirange)
1750  {
1751  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1752 
1753  /*
1754  * Fall through to set the dump flag; we assume that the subsequent
1755  * rules will do the same thing as they would for the array's base
1756  * type. (We cannot reliably look up the base type here, since
1757  * getTypes may not have processed it yet.)
1758  */
1759  }
1760 
1761  if (checkExtensionMembership(&tyinfo->dobj, fout))
1762  return; /* extension membership overrides all else */
1763 
1764  /* Dump based on if the contents of the namespace are being dumped */
1765  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1766 }
1767 
1768 /*
1769  * selectDumpableDefaultACL: policy-setting subroutine
1770  * Mark a default ACL as to be dumped or not
1771  *
1772  * For per-schema default ACLs, dump if the schema is to be dumped.
1773  * Otherwise dump if we are dumping "everything". Note that dataOnly
1774  * and aclsSkip are checked separately.
1775  */
1776 static void
1778 {
1779  /* Default ACLs can't be extension members */
1780 
1781  if (dinfo->dobj.namespace)
1782  /* default ACLs are considered part of the namespace */
1783  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1784  else
1785  dinfo->dobj.dump = dopt->include_everything ?
1787 }
1788 
1789 /*
1790  * selectDumpableCast: policy-setting subroutine
1791  * Mark a cast as to be dumped or not
1792  *
1793  * Casts do not belong to any particular namespace (since they haven't got
1794  * names), nor do they have identifiable owners. To distinguish user-defined
1795  * casts from built-in ones, we must resort to checking whether the cast's
1796  * OID is in the range reserved for initdb.
1797  */
1798 static void
1800 {
1801  if (checkExtensionMembership(&cast->dobj, fout))
1802  return; /* extension membership overrides all else */
1803 
1804  /*
1805  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1806  * support ACLs currently.
1807  */
1808  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1809  cast->dobj.dump = DUMP_COMPONENT_NONE;
1810  else
1811  cast->dobj.dump = fout->dopt->include_everything ?
1813 }
1814 
1815 /*
1816  * selectDumpableProcLang: policy-setting subroutine
1817  * Mark a procedural language as to be dumped or not
1818  *
1819  * Procedural languages do not belong to any particular namespace. To
1820  * identify built-in languages, we must resort to checking whether the
1821  * language's OID is in the range reserved for initdb.
1822  */
1823 static void
1825 {
1826  if (checkExtensionMembership(&plang->dobj, fout))
1827  return; /* extension membership overrides all else */
1828 
1829  /*
1830  * Only include procedural languages when we are dumping everything.
1831  *
1832  * For from-initdb procedural languages, only include ACLs, as we do for
1833  * the pg_catalog namespace. We need this because procedural languages do
1834  * not live in any namespace.
1835  */
1836  if (!fout->dopt->include_everything)
1837  plang->dobj.dump = DUMP_COMPONENT_NONE;
1838  else
1839  {
1840  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1841  plang->dobj.dump = fout->remoteVersion < 90600 ?
1843  else
1844  plang->dobj.dump = DUMP_COMPONENT_ALL;
1845  }
1846 }
1847 
1848 /*
1849  * selectDumpableAccessMethod: policy-setting subroutine
1850  * Mark an access method as to be dumped or not
1851  *
1852  * Access methods do not belong to any particular namespace. To identify
1853  * built-in access methods, we must resort to checking whether the
1854  * method's OID is in the range reserved for initdb.
1855  */
1856 static void
1858 {
1859  if (checkExtensionMembership(&method->dobj, fout))
1860  return; /* extension membership overrides all else */
1861 
1862  /*
1863  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1864  * they do not support ACLs currently.
1865  */
1866  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1867  method->dobj.dump = DUMP_COMPONENT_NONE;
1868  else
1869  method->dobj.dump = fout->dopt->include_everything ?
1871 }
1872 
1873 /*
1874  * selectDumpableExtension: policy-setting subroutine
1875  * Mark an extension as to be dumped or not
1876  *
1877  * Built-in extensions should be skipped except for checking ACLs, since we
1878  * assume those will already be installed in the target database. We identify
1879  * such extensions by their having OIDs in the range reserved for initdb.
1880  * We dump all user-added extensions by default. No extensions are dumped
1881  * if include_everything is false (i.e., a --schema or --table switch was
1882  * given), except if --extension specifies a list of extensions to dump.
1883  */
1884 static void
1886 {
1887  /*
1888  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
1889  * change permissions on their member objects, if they wish to, and have
1890  * those changes preserved.
1891  */
1892  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1893  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
1894  else
1895  {
1896  /* check if there is a list of extensions to dump */
1897  if (extension_include_oids.head != NULL)
1898  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1900  extinfo->dobj.catId.oid) ?
1902  else
1903  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1904  dopt->include_everything ?
1906  }
1907 }
1908 
1909 /*
1910  * selectDumpablePublicationObject: policy-setting subroutine
1911  * Mark a publication object as to be dumped or not
1912  *
1913  * A publication can have schemas and tables which have schemas, but those are
1914  * ignored in decision making, because publications are only dumped when we are
1915  * dumping everything.
1916  */
1917 static void
1919 {
1920  if (checkExtensionMembership(dobj, fout))
1921  return; /* extension membership overrides all else */
1922 
1923  dobj->dump = fout->dopt->include_everything ?
1925 }
1926 
1927 /*
1928  * selectDumpableObject: policy-setting subroutine
1929  * Mark a generic dumpable object as to be dumped or not
1930  *
1931  * Use this only for object types without a special-case routine above.
1932  */
1933 static void
1935 {
1936  if (checkExtensionMembership(dobj, fout))
1937  return; /* extension membership overrides all else */
1938 
1939  /*
1940  * Default policy is to dump if parent namespace is dumpable, or for
1941  * non-namespace-associated items, dump if we're dumping "everything".
1942  */
1943  if (dobj->namespace)
1944  dobj->dump = dobj->namespace->dobj.dump_contains;
1945  else
1946  dobj->dump = fout->dopt->include_everything ?
1948 }
1949 
1950 /*
1951  * Dump a table's contents for loading using the COPY command
1952  * - this routine is called by the Archiver when it wants the table
1953  * to be dumped.
1954  */
1955 static int
1956 dumpTableData_copy(Archive *fout, const void *dcontext)
1957 {
1958  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
1959  TableInfo *tbinfo = tdinfo->tdtable;
1960  const char *classname = tbinfo->dobj.name;
1962 
1963  /*
1964  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
1965  * which uses it already.
1966  */
1967  PQExpBuffer clistBuf = createPQExpBuffer();
1968  PGconn *conn = GetConnection(fout);
1969  PGresult *res;
1970  int ret;
1971  char *copybuf;
1972  const char *column_list;
1973 
1974  pg_log_info("dumping contents of table \"%s.%s\"",
1975  tbinfo->dobj.namespace->dobj.name, classname);
1976 
1977  /*
1978  * Specify the column list explicitly so that we have no possibility of
1979  * retrieving data in the wrong column order. (The default column
1980  * ordering of COPY will not be what we want in certain corner cases
1981  * involving ADD COLUMN and inheritance.)
1982  */
1983  column_list = fmtCopyColumnList(tbinfo, clistBuf);
1984 
1985  /*
1986  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
1987  * a filter condition was specified. For other cases a simple COPY
1988  * suffices.
1989  */
1990  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
1991  {
1992  /* Note: this syntax is only supported in 8.2 and up */
1993  appendPQExpBufferStr(q, "COPY (SELECT ");
1994  /* klugery to get rid of parens in column list */
1995  if (strlen(column_list) > 2)
1996  {
1997  appendPQExpBufferStr(q, column_list + 1);
1998  q->data[q->len - 1] = ' ';
1999  }
2000  else
2001  appendPQExpBufferStr(q, "* ");
2002 
2003  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2004  fmtQualifiedDumpable(tbinfo),
2005  tdinfo->filtercond ? tdinfo->filtercond : "");
2006  }
2007  else
2008  {
2009  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2010  fmtQualifiedDumpable(tbinfo),
2011  column_list);
2012  }
2013  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2014  PQclear(res);
2015  destroyPQExpBuffer(clistBuf);
2016 
2017  for (;;)
2018  {
2019  ret = PQgetCopyData(conn, &copybuf, 0);
2020 
2021  if (ret < 0)
2022  break; /* done or error */
2023 
2024  if (copybuf)
2025  {
2026  WriteData(fout, copybuf, ret);
2027  PQfreemem(copybuf);
2028  }
2029 
2030  /* ----------
2031  * THROTTLE:
2032  *
2033  * There was considerable discussion in late July, 2000 regarding
2034  * slowing down pg_dump when backing up large tables. Users with both
2035  * slow & fast (multi-processor) machines experienced performance
2036  * degradation when doing a backup.
2037  *
2038  * Initial attempts based on sleeping for a number of ms for each ms
2039  * of work were deemed too complex, then a simple 'sleep in each loop'
2040  * implementation was suggested. The latter failed because the loop
2041  * was too tight. Finally, the following was implemented:
2042  *
2043  * If throttle is non-zero, then
2044  * See how long since the last sleep.
2045  * Work out how long to sleep (based on ratio).
2046  * If sleep is more than 100ms, then
2047  * sleep
2048  * reset timer
2049  * EndIf
2050  * EndIf
2051  *
2052  * where the throttle value was the number of ms to sleep per ms of
2053  * work. The calculation was done in each loop.
2054  *
2055  * Most of the hard work is done in the backend, and this solution
2056  * still did not work particularly well: on slow machines, the ratio
2057  * was 50:1, and on medium paced machines, 1:1, and on fast
2058  * multi-processor machines, it had little or no effect, for reasons
2059  * that were unclear.
2060  *
2061  * Further discussion ensued, and the proposal was dropped.
2062  *
2063  * For those people who want this feature, it can be implemented using
2064  * gettimeofday in each loop, calculating the time since last sleep,
2065  * multiplying that by the sleep ratio, then if the result is more
2066  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2067  * function to sleep for a subsecond period ie.
2068  *
2069  * select(0, NULL, NULL, NULL, &tvi);
2070  *
2071  * This will return after the interval specified in the structure tvi.
2072  * Finally, call gettimeofday again to save the 'last sleep time'.
2073  * ----------
2074  */
2075  }
2076  archprintf(fout, "\\.\n\n\n");
2077 
2078  if (ret == -2)
2079  {
2080  /* copy data transfer failed */
2081  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2082  pg_log_error("Error message from server: %s", PQerrorMessage(conn));
2083  pg_log_error("The command was: %s", q->data);
2084  exit_nicely(1);
2085  }
2086 
2087  /* Check command status and return to normal libpq state */
2088  res = PQgetResult(conn);
2090  {
2091  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2092  pg_log_error("Error message from server: %s", PQerrorMessage(conn));
2093  pg_log_error("The command was: %s", q->data);
2094  exit_nicely(1);
2095  }
2096  PQclear(res);
2097 
2098  /* Do this to ensure we've pumped libpq back to idle state */
2099  if (PQgetResult(conn) != NULL)
2100  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2101  classname);
2102 
2103  destroyPQExpBuffer(q);
2104  return 1;
2105 }
2106 
2107 /*
2108  * Dump table data using INSERT commands.
2109  *
2110  * Caution: when we restore from an archive file direct to database, the
2111  * INSERT commands emitted by this function have to be parsed by
2112  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2113  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2114  */
2115 static int
2116 dumpTableData_insert(Archive *fout, const void *dcontext)
2117 {
2118  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2119  TableInfo *tbinfo = tdinfo->tdtable;
2120  DumpOptions *dopt = fout->dopt;
2122  PQExpBuffer insertStmt = NULL;
2123  char *attgenerated;
2124  PGresult *res;
2125  int nfields,
2126  i;
2127  int rows_per_statement = dopt->dump_inserts;
2128  int rows_this_statement = 0;
2129 
2130  /*
2131  * If we're going to emit INSERTs with column names, the most efficient
2132  * way to deal with generated columns is to exclude them entirely. For
2133  * INSERTs without column names, we have to emit DEFAULT rather than the
2134  * actual column value --- but we can save a few cycles by fetching nulls
2135  * rather than the uninteresting-to-us value.
2136  */
2137  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2138  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2139  nfields = 0;
2140  for (i = 0; i < tbinfo->numatts; i++)
2141  {
2142  if (tbinfo->attisdropped[i])
2143  continue;
2144  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2145  continue;
2146  if (nfields > 0)
2147  appendPQExpBufferStr(q, ", ");
2148  if (tbinfo->attgenerated[i])
2149  appendPQExpBufferStr(q, "NULL");
2150  else
2151  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2152  attgenerated[nfields] = tbinfo->attgenerated[i];
2153  nfields++;
2154  }
2155  /* Servers before 9.4 will complain about zero-column SELECT */
2156  if (nfields == 0)
2157  appendPQExpBufferStr(q, "NULL");
2158  appendPQExpBuffer(q, " FROM ONLY %s",
2159  fmtQualifiedDumpable(tbinfo));
2160  if (tdinfo->filtercond)
2161  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2162 
2163  ExecuteSqlStatement(fout, q->data);
2164 
2165  while (1)
2166  {
2167  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2168  PGRES_TUPLES_OK);
2169 
2170  /* cross-check field count, allowing for dummy NULL if any */
2171  if (nfields != PQnfields(res) &&
2172  !(nfields == 0 && PQnfields(res) == 1))
2173  fatal("wrong number of fields retrieved from table \"%s\"",
2174  tbinfo->dobj.name);
2175 
2176  /*
2177  * First time through, we build as much of the INSERT statement as
2178  * possible in "insertStmt", which we can then just print for each
2179  * statement. If the table happens to have zero dumpable columns then
2180  * this will be a complete statement, otherwise it will end in
2181  * "VALUES" and be ready to have the row's column values printed.
2182  */
2183  if (insertStmt == NULL)
2184  {
2185  TableInfo *targettab;
2186 
2187  insertStmt = createPQExpBuffer();
2188 
2189  /*
2190  * When load-via-partition-root is set, get the root table name
2191  * for the partition table, so that we can reload data through the
2192  * root table.
2193  */
2194  if (dopt->load_via_partition_root && tbinfo->ispartition)
2195  targettab = getRootTableInfo(tbinfo);
2196  else
2197  targettab = tbinfo;
2198 
2199  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2200  fmtQualifiedDumpable(targettab));
2201 
2202  /* corner case for zero-column table */
2203  if (nfields == 0)
2204  {
2205  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2206  }
2207  else
2208  {
2209  /* append the list of column names if required */
2210  if (dopt->column_inserts)
2211  {
2212  appendPQExpBufferChar(insertStmt, '(');
2213  for (int field = 0; field < nfields; field++)
2214  {
2215  if (field > 0)
2216  appendPQExpBufferStr(insertStmt, ", ");
2217  appendPQExpBufferStr(insertStmt,
2218  fmtId(PQfname(res, field)));
2219  }
2220  appendPQExpBufferStr(insertStmt, ") ");
2221  }
2222 
2223  if (tbinfo->needs_override)
2224  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2225 
2226  appendPQExpBufferStr(insertStmt, "VALUES");
2227  }
2228  }
2229 
2230  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2231  {
2232  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2233  if (rows_this_statement == 0)
2234  archputs(insertStmt->data, fout);
2235 
2236  /*
2237  * If it is zero-column table then we've already written the
2238  * complete statement, which will mean we've disobeyed
2239  * --rows-per-insert when it's set greater than 1. We do support
2240  * a way to make this multi-row with: SELECT UNION ALL SELECT
2241  * UNION ALL ... but that's non-standard so we should avoid it
2242  * given that using INSERTs is mostly only ever needed for
2243  * cross-database exports.
2244  */
2245  if (nfields == 0)
2246  continue;
2247 
2248  /* Emit a row heading */
2249  if (rows_per_statement == 1)
2250  archputs(" (", fout);
2251  else if (rows_this_statement > 0)
2252  archputs(",\n\t(", fout);
2253  else
2254  archputs("\n\t(", fout);
2255 
2256  for (int field = 0; field < nfields; field++)
2257  {
2258  if (field > 0)
2259  archputs(", ", fout);
2260  if (attgenerated[field])
2261  {
2262  archputs("DEFAULT", fout);
2263  continue;
2264  }
2265  if (PQgetisnull(res, tuple, field))
2266  {
2267  archputs("NULL", fout);
2268  continue;
2269  }
2270 
2271  /* XXX This code is partially duplicated in ruleutils.c */
2272  switch (PQftype(res, field))
2273  {
2274  case INT2OID:
2275  case INT4OID:
2276  case INT8OID:
2277  case OIDOID:
2278  case FLOAT4OID:
2279  case FLOAT8OID:
2280  case NUMERICOID:
2281  {
2282  /*
2283  * These types are printed without quotes unless
2284  * they contain values that aren't accepted by the
2285  * scanner unquoted (e.g., 'NaN'). Note that
2286  * strtod() and friends might accept NaN, so we
2287  * can't use that to test.
2288  *
2289  * In reality we only need to defend against
2290  * infinity and NaN, so we need not get too crazy
2291  * about pattern matching here.
2292  */
2293  const char *s = PQgetvalue(res, tuple, field);
2294 
2295  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2296  archputs(s, fout);
2297  else
2298  archprintf(fout, "'%s'", s);
2299  }
2300  break;
2301 
2302  case BITOID:
2303  case VARBITOID:
2304  archprintf(fout, "B'%s'",
2305  PQgetvalue(res, tuple, field));
2306  break;
2307 
2308  case BOOLOID:
2309  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2310  archputs("true", fout);
2311  else
2312  archputs("false", fout);
2313  break;
2314 
2315  default:
2316  /* All other types are printed as string literals. */
2317  resetPQExpBuffer(q);
2319  PQgetvalue(res, tuple, field),
2320  fout);
2321  archputs(q->data, fout);
2322  break;
2323  }
2324  }
2325 
2326  /* Terminate the row ... */
2327  archputs(")", fout);
2328 
2329  /* ... and the statement, if the target no. of rows is reached */
2330  if (++rows_this_statement >= rows_per_statement)
2331  {
2332  if (dopt->do_nothing)
2333  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2334  else
2335  archputs(";\n", fout);
2336  /* Reset the row counter */
2337  rows_this_statement = 0;
2338  }
2339  }
2340 
2341  if (PQntuples(res) <= 0)
2342  {
2343  PQclear(res);
2344  break;
2345  }
2346  PQclear(res);
2347  }
2348 
2349  /* Terminate any statements that didn't make the row count. */
2350  if (rows_this_statement > 0)
2351  {
2352  if (dopt->do_nothing)
2353  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2354  else
2355  archputs(";\n", fout);
2356  }
2357 
2358  archputs("\n\n", fout);
2359 
2360  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2361 
2362  destroyPQExpBuffer(q);
2363  if (insertStmt != NULL)
2364  destroyPQExpBuffer(insertStmt);
2365  free(attgenerated);
2366 
2367  return 1;
2368 }
2369 
2370 /*
2371  * getRootTableInfo:
2372  * get the root TableInfo for the given partition table.
2373  */
2374 static TableInfo *
2376 {
2377  TableInfo *parentTbinfo;
2378 
2379  Assert(tbinfo->ispartition);
2380  Assert(tbinfo->numParents == 1);
2381 
2382  parentTbinfo = tbinfo->parents[0];
2383  while (parentTbinfo->ispartition)
2384  {
2385  Assert(parentTbinfo->numParents == 1);
2386  parentTbinfo = parentTbinfo->parents[0];
2387  }
2388 
2389  return parentTbinfo;
2390 }
2391 
2392 /*
2393  * dumpTableData -
2394  * dump the contents of a single table
2395  *
2396  * Actually, this just makes an ArchiveEntry for the table contents.
2397  */
2398 static void
2399 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2400 {
2401  DumpOptions *dopt = fout->dopt;
2402  TableInfo *tbinfo = tdinfo->tdtable;
2403  PQExpBuffer copyBuf = createPQExpBuffer();
2404  PQExpBuffer clistBuf = createPQExpBuffer();
2405  DataDumperPtr dumpFn;
2406  char *copyStmt;
2407  const char *copyFrom;
2408 
2409  /* We had better have loaded per-column details about this table */
2410  Assert(tbinfo->interesting);
2411 
2412  if (dopt->dump_inserts == 0)
2413  {
2414  /* Dump/restore using COPY */
2415  dumpFn = dumpTableData_copy;
2416 
2417  /*
2418  * When load-via-partition-root is set, get the root table name for
2419  * the partition table, so that we can reload data through the root
2420  * table.
2421  */
2422  if (dopt->load_via_partition_root && tbinfo->ispartition)
2423  {
2424  TableInfo *parentTbinfo;
2425 
2426  parentTbinfo = getRootTableInfo(tbinfo);
2427  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2428  }
2429  else
2430  copyFrom = fmtQualifiedDumpable(tbinfo);
2431 
2432  /* must use 2 steps here 'cause fmtId is nonreentrant */
2433  appendPQExpBuffer(copyBuf, "COPY %s ",
2434  copyFrom);
2435  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2436  fmtCopyColumnList(tbinfo, clistBuf));
2437  copyStmt = copyBuf->data;
2438  }
2439  else
2440  {
2441  /* Restore using INSERT */
2442  dumpFn = dumpTableData_insert;
2443  copyStmt = NULL;
2444  }
2445 
2446  /*
2447  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2448  * dependency on its table as "special" and pass it to ArchiveEntry now.
2449  * See comments for BuildArchiveDependencies.
2450  */
2451  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2452  {
2453  TocEntry *te;
2454 
2455  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2456  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2457  .namespace = tbinfo->dobj.namespace->dobj.name,
2458  .owner = tbinfo->rolname,
2459  .description = "TABLE DATA",
2460  .section = SECTION_DATA,
2461  .copyStmt = copyStmt,
2462  .deps = &(tbinfo->dobj.dumpId),
2463  .nDeps = 1,
2464  .dumpFn = dumpFn,
2465  .dumpArg = tdinfo));
2466 
2467  /*
2468  * Set the TocEntry's dataLength in case we are doing a parallel dump
2469  * and want to order dump jobs by table size. We choose to measure
2470  * dataLength in table pages (including TOAST pages) during dump, so
2471  * no scaling is needed.
2472  *
2473  * However, relpages is declared as "integer" in pg_class, and hence
2474  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2475  * Cast so that we get the right interpretation of table sizes
2476  * exceeding INT_MAX pages.
2477  */
2478  te->dataLength = (BlockNumber) tbinfo->relpages;
2479  te->dataLength += (BlockNumber) tbinfo->toastpages;
2480 
2481  /*
2482  * If pgoff_t is only 32 bits wide, the above refinement is useless,
2483  * and instead we'd better worry about integer overflow. Clamp to
2484  * INT_MAX if the correct result exceeds that.
2485  */
2486  if (sizeof(te->dataLength) == 4 &&
2487  (tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2488  te->dataLength < 0))
2489  te->dataLength = INT_MAX;
2490  }
2491 
2492  destroyPQExpBuffer(copyBuf);
2493  destroyPQExpBuffer(clistBuf);
2494 }
2495 
2496 /*
2497  * refreshMatViewData -
2498  * load or refresh the contents of a single materialized view
2499  *
2500  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2501  * statement.
2502  */
2503 static void
2505 {
2506  TableInfo *tbinfo = tdinfo->tdtable;
2507  PQExpBuffer q;
2508 
2509  /* If the materialized view is not flagged as populated, skip this. */
2510  if (!tbinfo->relispopulated)
2511  return;
2512 
2513  q = createPQExpBuffer();
2514 
2515  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2516  fmtQualifiedDumpable(tbinfo));
2517 
2518  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2519  ArchiveEntry(fout,
2520  tdinfo->dobj.catId, /* catalog ID */
2521  tdinfo->dobj.dumpId, /* dump ID */
2522  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2523  .namespace = tbinfo->dobj.namespace->dobj.name,
2524  .owner = tbinfo->rolname,
2525  .description = "MATERIALIZED VIEW DATA",
2526  .section = SECTION_POST_DATA,
2527  .createStmt = q->data,
2528  .deps = tdinfo->dobj.dependencies,
2529  .nDeps = tdinfo->dobj.nDeps));
2530 
2531  destroyPQExpBuffer(q);
2532 }
2533 
2534 /*
2535  * getTableData -
2536  * set up dumpable objects representing the contents of tables
2537  */
2538 static void
2539 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2540 {
2541  int i;
2542 
2543  for (i = 0; i < numTables; i++)
2544  {
2545  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2546  (!relkind || tblinfo[i].relkind == relkind))
2547  makeTableDataInfo(dopt, &(tblinfo[i]));
2548  }
2549 }
2550 
2551 /*
2552  * Make a dumpable object for the data of this specific table
2553  *
2554  * Note: we make a TableDataInfo if and only if we are going to dump the
2555  * table data; the "dump" field in such objects isn't very interesting.
2556  */
2557 static void
2559 {
2560  TableDataInfo *tdinfo;
2561 
2562  /*
2563  * Nothing to do if we already decided to dump the table. This will
2564  * happen for "config" tables.
2565  */
2566  if (tbinfo->dataObj != NULL)
2567  return;
2568 
2569  /* Skip VIEWs (no data to dump) */
2570  if (tbinfo->relkind == RELKIND_VIEW)
2571  return;
2572  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2573  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2576  tbinfo->foreign_server)))
2577  return;
2578  /* Skip partitioned tables (data in partitions) */
2579  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2580  return;
2581 
2582  /* Don't dump data in unlogged tables, if so requested */
2583  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2584  dopt->no_unlogged_table_data)
2585  return;
2586 
2587  /* Check that the data is not explicitly excluded */
2589  tbinfo->dobj.catId.oid))
2590  return;
2591 
2592  /* OK, let's dump it */
2593  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2594 
2595  if (tbinfo->relkind == RELKIND_MATVIEW)
2596  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2597  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2598  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2599  else
2600  tdinfo->dobj.objType = DO_TABLE_DATA;
2601 
2602  /*
2603  * Note: use tableoid 0 so that this object won't be mistaken for
2604  * something that pg_depend entries apply to.
2605  */
2606  tdinfo->dobj.catId.tableoid = 0;
2607  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2608  AssignDumpId(&tdinfo->dobj);
2609  tdinfo->dobj.name = tbinfo->dobj.name;
2610  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2611  tdinfo->tdtable = tbinfo;
2612  tdinfo->filtercond = NULL; /* might get set later */
2613  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2614 
2615  /* A TableDataInfo contains data, of course */
2616  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2617 
2618  tbinfo->dataObj = tdinfo;
2619 
2620  /* Make sure that we'll collect per-column info for this table. */
2621  tbinfo->interesting = true;
2622 }
2623 
2624 /*
2625  * The refresh for a materialized view must be dependent on the refresh for
2626  * any materialized view that this one is dependent on.
2627  *
2628  * This must be called after all the objects are created, but before they are
2629  * sorted.
2630  */
2631 static void
2633 {
2634  PQExpBuffer query;
2635  PGresult *res;
2636  int ntups,
2637  i;
2638  int i_classid,
2639  i_objid,
2640  i_refobjid;
2641 
2642  /* No Mat Views before 9.3. */
2643  if (fout->remoteVersion < 90300)
2644  return;
2645 
2646  query = createPQExpBuffer();
2647 
2648  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2649  "( "
2650  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2651  "FROM pg_depend d1 "
2652  "JOIN pg_class c1 ON c1.oid = d1.objid "
2653  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2654  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2655  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2656  "AND d2.objid = r1.oid "
2657  "AND d2.refobjid <> d1.objid "
2658  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2659  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2660  CppAsString2(RELKIND_VIEW) ") "
2661  "WHERE d1.classid = 'pg_class'::regclass "
2662  "UNION "
2663  "SELECT w.objid, d3.refobjid, c3.relkind "
2664  "FROM w "
2665  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2666  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2667  "AND d3.objid = r3.oid "
2668  "AND d3.refobjid <> w.refobjid "
2669  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2670  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2671  CppAsString2(RELKIND_VIEW) ") "
2672  ") "
2673  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2674  "FROM w "
2675  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2676 
2677  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2678 
2679  ntups = PQntuples(res);
2680 
2681  i_classid = PQfnumber(res, "classid");
2682  i_objid = PQfnumber(res, "objid");
2683  i_refobjid = PQfnumber(res, "refobjid");
2684 
2685  for (i = 0; i < ntups; i++)
2686  {
2687  CatalogId objId;
2688  CatalogId refobjId;
2689  DumpableObject *dobj;
2690  DumpableObject *refdobj;
2691  TableInfo *tbinfo;
2692  TableInfo *reftbinfo;
2693 
2694  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2695  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2696  refobjId.tableoid = objId.tableoid;
2697  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2698 
2699  dobj = findObjectByCatalogId(objId);
2700  if (dobj == NULL)
2701  continue;
2702 
2703  Assert(dobj->objType == DO_TABLE);
2704  tbinfo = (TableInfo *) dobj;
2705  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2706  dobj = (DumpableObject *) tbinfo->dataObj;
2707  if (dobj == NULL)
2708  continue;
2709  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2710 
2711  refdobj = findObjectByCatalogId(refobjId);
2712  if (refdobj == NULL)
2713  continue;
2714 
2715  Assert(refdobj->objType == DO_TABLE);
2716  reftbinfo = (TableInfo *) refdobj;
2717  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2718  refdobj = (DumpableObject *) reftbinfo->dataObj;
2719  if (refdobj == NULL)
2720  continue;
2721  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2722 
2723  addObjectDependency(dobj, refdobj->dumpId);
2724 
2725  if (!reftbinfo->relispopulated)
2726  tbinfo->relispopulated = false;
2727  }
2728 
2729  PQclear(res);
2730 
2731  destroyPQExpBuffer(query);
2732 }
2733 
2734 /*
2735  * getTableDataFKConstraints -
2736  * add dump-order dependencies reflecting foreign key constraints
2737  *
2738  * This code is executed only in a data-only dump --- in schema+data dumps
2739  * we handle foreign key issues by not creating the FK constraints until
2740  * after the data is loaded. In a data-only dump, however, we want to
2741  * order the table data objects in such a way that a table's referenced
2742  * tables are restored first. (In the presence of circular references or
2743  * self-references this may be impossible; we'll detect and complain about
2744  * that during the dependency sorting step.)
2745  */
2746 static void
2748 {
2749  DumpableObject **dobjs;
2750  int numObjs;
2751  int i;
2752 
2753  /* Search through all the dumpable objects for FK constraints */
2754  getDumpableObjects(&dobjs, &numObjs);
2755  for (i = 0; i < numObjs; i++)
2756  {
2757  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2758  {
2759  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2760  TableInfo *ftable;
2761 
2762  /* Not interesting unless both tables are to be dumped */
2763  if (cinfo->contable == NULL ||
2764  cinfo->contable->dataObj == NULL)
2765  continue;
2766  ftable = findTableByOid(cinfo->confrelid);
2767  if (ftable == NULL ||
2768  ftable->dataObj == NULL)
2769  continue;
2770 
2771  /*
2772  * Okay, make referencing table's TABLE_DATA object depend on the
2773  * referenced table's TABLE_DATA object.
2774  */
2776  ftable->dataObj->dobj.dumpId);
2777  }
2778  }
2779  free(dobjs);
2780 }
2781 
2782 
2783 /*
2784  * guessConstraintInheritance:
2785  * In pre-8.4 databases, we can't tell for certain which constraints
2786  * are inherited. We assume a CHECK constraint is inherited if its name
2787  * matches the name of any constraint in the parent. Originally this code
2788  * tried to compare the expression texts, but that can fail for various
2789  * reasons --- for example, if the parent and child tables are in different
2790  * schemas, reverse-listing of function calls may produce different text
2791  * (schema-qualified or not) depending on search path.
2792  *
2793  * In 8.4 and up we can rely on the conislocal field to decide which
2794  * constraints must be dumped; much safer.
2795  *
2796  * This function assumes all conislocal flags were initialized to true.
2797  * It clears the flag on anything that seems to be inherited.
2798  */
2799 static void
2800 guessConstraintInheritance(TableInfo *tblinfo, int numTables)
2801 {
2802  int i,
2803  j,
2804  k;
2805 
2806  for (i = 0; i < numTables; i++)
2807  {
2808  TableInfo *tbinfo = &(tblinfo[i]);
2809  int numParents;
2810  TableInfo **parents;
2811  TableInfo *parent;
2812 
2813  /* Some kinds never have parents */
2814  if (tbinfo->relkind == RELKIND_SEQUENCE ||
2815  tbinfo->relkind == RELKIND_VIEW ||
2816  tbinfo->relkind == RELKIND_MATVIEW)
2817  continue;
2818 
2819  /* Don't bother computing anything for non-target tables, either */
2820  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
2821  continue;
2822 
2823  numParents = tbinfo->numParents;
2824  parents = tbinfo->parents;
2825 
2826  if (numParents == 0)
2827  continue; /* nothing to see here, move along */
2828 
2829  /* scan for inherited CHECK constraints */
2830  for (j = 0; j < tbinfo->ncheck; j++)
2831  {
2832  ConstraintInfo *constr;
2833 
2834  constr = &(tbinfo->checkexprs[j]);
2835 
2836  for (k = 0; k < numParents; k++)
2837  {
2838  int l;
2839 
2840  parent = parents[k];
2841  for (l = 0; l < parent->ncheck; l++)
2842  {
2843  ConstraintInfo *pconstr = &(parent->checkexprs[l]);
2844 
2845  if (strcmp(pconstr->dobj.name, constr->dobj.name) == 0)
2846  {
2847  constr->conislocal = false;
2848  break;
2849  }
2850  }
2851  if (!constr->conislocal)
2852  break;
2853  }
2854  }
2855  }
2856 }
2857 
2858 
2859 /*
2860  * dumpDatabase:
2861  * dump the database definition
2862  */
2863 static void
2865 {
2866  DumpOptions *dopt = fout->dopt;
2867  PQExpBuffer dbQry = createPQExpBuffer();
2868  PQExpBuffer delQry = createPQExpBuffer();
2869  PQExpBuffer creaQry = createPQExpBuffer();
2870  PQExpBuffer labelq = createPQExpBuffer();
2871  PGconn *conn = GetConnection(fout);
2872  PGresult *res;
2873  int i_tableoid,
2874  i_oid,
2875  i_datname,
2876  i_dba,
2877  i_encoding,
2878  i_collate,
2879  i_ctype,
2880  i_frozenxid,
2881  i_minmxid,
2882  i_datacl,
2883  i_acldefault,
2884  i_datistemplate,
2885  i_datconnlimit,
2886  i_tablespace;
2887  CatalogId dbCatId;
2888  DumpId dbDumpId;
2889  DumpableAcl dbdacl;
2890  const char *datname,
2891  *dba,
2892  *encoding,
2893  *collate,
2894  *ctype,
2895  *datistemplate,
2896  *datconnlimit,
2897  *tablespace;
2898  uint32 frozenxid,
2899  minmxid;
2900  char *qdatname;
2901 
2902  pg_log_info("saving database definition");
2903 
2904  /*
2905  * Fetch the database-level properties for this database.
2906  */
2907  if (fout->remoteVersion >= 90300)
2908  {
2909  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2910  "(%s datdba) AS dba, "
2911  "pg_encoding_to_char(encoding) AS encoding, "
2912  "datcollate, datctype, datfrozenxid, datminmxid, "
2913  "datacl, acldefault('d', datdba) AS acldefault, "
2914  "datistemplate, datconnlimit, "
2915  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2916  "shobj_description(oid, 'pg_database') AS description "
2917 
2918  "FROM pg_database "
2919  "WHERE datname = current_database()",
2921  }
2922  else if (fout->remoteVersion >= 90200)
2923  {
2924  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2925  "(%s datdba) AS dba, "
2926  "pg_encoding_to_char(encoding) AS encoding, "
2927  "datcollate, datctype, datfrozenxid, 0 AS datminmxid, "
2928  "datacl, acldefault('d', datdba) AS acldefault, "
2929  "datistemplate, datconnlimit, "
2930  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2931  "shobj_description(oid, 'pg_database') AS description "
2932 
2933  "FROM pg_database "
2934  "WHERE datname = current_database()",
2936  }
2937  else if (fout->remoteVersion >= 80400)
2938  {
2939  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2940  "(%s datdba) AS dba, "
2941  "pg_encoding_to_char(encoding) AS encoding, "
2942  "datcollate, datctype, datfrozenxid, 0 AS datminmxid, "
2943  "datacl, NULL AS acldefault, "
2944  "datistemplate, datconnlimit, "
2945  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2946  "shobj_description(oid, 'pg_database') AS description "
2947 
2948  "FROM pg_database "
2949  "WHERE datname = current_database()",
2951  }
2952  else if (fout->remoteVersion >= 80200)
2953  {
2954  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2955  "(%s datdba) AS dba, "
2956  "pg_encoding_to_char(encoding) AS encoding, "
2957  "NULL AS datcollate, NULL AS datctype, datfrozenxid, 0 AS datminmxid, "
2958  "datacl, NULL AS acldefault, "
2959  "datistemplate, datconnlimit, "
2960  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2961  "shobj_description(oid, 'pg_database') AS description "
2962 
2963  "FROM pg_database "
2964  "WHERE datname = current_database()",
2966  }
2967  else
2968  {
2969  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2970  "(%s datdba) AS dba, "
2971  "pg_encoding_to_char(encoding) AS encoding, "
2972  "NULL AS datcollate, NULL AS datctype, datfrozenxid, 0 AS datminmxid, "
2973  "datacl, NULL AS acldefault, "
2974  "datistemplate, -1 AS datconnlimit, "
2975  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace "
2976  "FROM pg_database "
2977  "WHERE datname = current_database()",
2979  }
2980 
2981  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
2982 
2983  i_tableoid = PQfnumber(res, "tableoid");
2984  i_oid = PQfnumber(res, "oid");
2985  i_datname = PQfnumber(res, "datname");
2986  i_dba = PQfnumber(res, "dba");
2987  i_encoding = PQfnumber(res, "encoding");
2988  i_collate = PQfnumber(res, "datcollate");
2989  i_ctype = PQfnumber(res, "datctype");
2990  i_frozenxid = PQfnumber(res, "datfrozenxid");
2991  i_minmxid = PQfnumber(res, "datminmxid");
2992  i_datacl = PQfnumber(res, "datacl");
2993  i_acldefault = PQfnumber(res, "acldefault");
2994  i_datistemplate = PQfnumber(res, "datistemplate");
2995  i_datconnlimit = PQfnumber(res, "datconnlimit");
2996  i_tablespace = PQfnumber(res, "tablespace");
2997 
2998  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
2999  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
3000  datname = PQgetvalue(res, 0, i_datname);
3001  dba = PQgetvalue(res, 0, i_dba);
3002  encoding = PQgetvalue(res, 0, i_encoding);
3003  collate = PQgetvalue(res, 0, i_collate);
3004  ctype = PQgetvalue(res, 0, i_ctype);
3005  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
3006  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
3007  dbdacl.acl = PQgetvalue(res, 0, i_datacl);
3008  dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
3009  datistemplate = PQgetvalue(res, 0, i_datistemplate);
3010  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
3011  tablespace = PQgetvalue(res, 0, i_tablespace);
3012 
3013  qdatname = pg_strdup(fmtId(datname));
3014 
3015  /*
3016  * Prepare the CREATE DATABASE command. We must specify encoding, locale,
3017  * and tablespace since those can't be altered later. Other DB properties
3018  * are left to the DATABASE PROPERTIES entry, so that they can be applied
3019  * after reconnecting to the target DB.
3020  */
3021  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
3022  qdatname);
3023  if (strlen(encoding) > 0)
3024  {
3025  appendPQExpBufferStr(creaQry, " ENCODING = ");
3026  appendStringLiteralAH(creaQry, encoding, fout);
3027  }
3028  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
3029  {
3030  appendPQExpBufferStr(creaQry, " LOCALE = ");
3031  appendStringLiteralAH(creaQry, collate, fout);
3032  }
3033  else
3034  {
3035  if (strlen(collate) > 0)
3036  {
3037  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
3038  appendStringLiteralAH(creaQry, collate, fout);
3039  }
3040  if (strlen(ctype) > 0)
3041  {
3042  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
3043  appendStringLiteralAH(creaQry, ctype, fout);
3044  }
3045  }
3046 
3047  /*
3048  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
3049  * thing; the decision whether to specify a tablespace should be left till
3050  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
3051  * label the DATABASE entry with the tablespace and let the normal
3052  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
3053  * attention to default_tablespace, so that won't work.
3054  */
3055  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
3056  !dopt->outputNoTablespaces)
3057  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
3058  fmtId(tablespace));
3059  appendPQExpBufferStr(creaQry, ";\n");
3060 
3061  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3062  qdatname);
3063 
3064  dbDumpId = createDumpId();
3065 
3066  ArchiveEntry(fout,
3067  dbCatId, /* catalog ID */
3068  dbDumpId, /* dump ID */
3069  ARCHIVE_OPTS(.tag = datname,
3070  .owner = dba,
3071  .description = "DATABASE",
3072  .section = SECTION_PRE_DATA,
3073  .createStmt = creaQry->data,
3074  .dropStmt = delQry->data));
3075 
3076  /* Compute correct tag for archive entry */
3077  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3078 
3079  /* Dump DB comment if any */
3080  if (fout->remoteVersion >= 80200)
3081  {
3082  /*
3083  * 8.2 and up keep comments on shared objects in a shared table, so we
3084  * cannot use the dumpComment() code used for other database objects.
3085  * Be careful that the ArchiveEntry parameters match that function.
3086  */
3087  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3088 
3089  if (comment && *comment && !dopt->no_comments)
3090  {
3091  resetPQExpBuffer(dbQry);
3092 
3093  /*
3094  * Generates warning when loaded into a differently-named
3095  * database.
3096  */
3097  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3098  appendStringLiteralAH(dbQry, comment, fout);
3099  appendPQExpBufferStr(dbQry, ";\n");
3100 
3102  ARCHIVE_OPTS(.tag = labelq->data,
3103  .owner = dba,
3104  .description = "COMMENT",
3105  .section = SECTION_NONE,
3106  .createStmt = dbQry->data,
3107  .deps = &dbDumpId,
3108  .nDeps = 1));
3109  }
3110  }
3111  else
3112  {
3113  dumpComment(fout, "DATABASE", qdatname, NULL, dba,
3114  dbCatId, 0, dbDumpId);
3115  }
3116 
3117  /* Dump DB security label, if enabled */
3118  if (!dopt->no_security_labels && fout->remoteVersion >= 90200)
3119  {
3120  PGresult *shres;
3121  PQExpBuffer seclabelQry;
3122 
3123  seclabelQry = createPQExpBuffer();
3124 
3125  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3126  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3127  resetPQExpBuffer(seclabelQry);
3128  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3129  if (seclabelQry->len > 0)
3131  ARCHIVE_OPTS(.tag = labelq->data,
3132  .owner = dba,
3133  .description = "SECURITY LABEL",
3134  .section = SECTION_NONE,
3135  .createStmt = seclabelQry->data,
3136  .deps = &dbDumpId,
3137  .nDeps = 1));
3138  destroyPQExpBuffer(seclabelQry);
3139  PQclear(shres);
3140  }
3141 
3142  /*
3143  * Dump ACL if any. Note that we do not support initial privileges
3144  * (pg_init_privs) on databases.
3145  */
3146  dbdacl.privtype = 0;
3147  dbdacl.initprivs = NULL;
3148 
3149  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3150  qdatname, NULL, NULL,
3151  dba, &dbdacl);
3152 
3153  /*
3154  * Now construct a DATABASE PROPERTIES archive entry to restore any
3155  * non-default database-level properties. (The reason this must be
3156  * separate is that we cannot put any additional commands into the TOC
3157  * entry that has CREATE DATABASE. pg_restore would execute such a group
3158  * in an implicit transaction block, and the backend won't allow CREATE
3159  * DATABASE in that context.)
3160  */
3161  resetPQExpBuffer(creaQry);
3162  resetPQExpBuffer(delQry);
3163 
3164  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3165  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3166  qdatname, datconnlimit);
3167 
3168  if (strcmp(datistemplate, "t") == 0)
3169  {
3170  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3171  qdatname);
3172 
3173  /*
3174  * The backend won't accept DROP DATABASE on a template database. We
3175  * can deal with that by removing the template marking before the DROP
3176  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3177  * since no such command is currently supported, fake it with a direct
3178  * UPDATE on pg_database.
3179  */
3180  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3181  "SET datistemplate = false WHERE datname = ");
3182  appendStringLiteralAH(delQry, datname, fout);
3183  appendPQExpBufferStr(delQry, ";\n");
3184  }
3185 
3186  /* Add database-specific SET options */
3187  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3188 
3189  /*
3190  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3191  * entry, too, for lack of a better place.
3192  */
3193  if (dopt->binary_upgrade)
3194  {
3195  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3196  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3197  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3198  "WHERE datname = ",
3199  frozenxid, minmxid);
3200  appendStringLiteralAH(creaQry, datname, fout);
3201  appendPQExpBufferStr(creaQry, ";\n");
3202  }
3203 
3204  if (creaQry->len > 0)
3206  ARCHIVE_OPTS(.tag = datname,
3207  .owner = dba,
3208  .description = "DATABASE PROPERTIES",
3209  .section = SECTION_PRE_DATA,
3210  .createStmt = creaQry->data,
3211  .dropStmt = delQry->data,
3212  .deps = &dbDumpId));
3213 
3214  /*
3215  * pg_largeobject comes from the old system intact, so set its
3216  * relfrozenxids and relminmxids.
3217  */
3218  if (dopt->binary_upgrade)
3219  {
3220  PGresult *lo_res;
3221  PQExpBuffer loFrozenQry = createPQExpBuffer();
3222  PQExpBuffer loOutQry = createPQExpBuffer();
3223  int i_relfrozenxid,
3224  i_relminmxid;
3225 
3226  /*
3227  * pg_largeobject
3228  */
3229  if (fout->remoteVersion >= 90300)
3230  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid\n"
3231  "FROM pg_catalog.pg_class\n"
3232  "WHERE oid = %u;\n",
3233  LargeObjectRelationId);
3234  else
3235  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid\n"
3236  "FROM pg_catalog.pg_class\n"
3237  "WHERE oid = %u;\n",
3238  LargeObjectRelationId);
3239 
3240  lo_res = ExecuteSqlQueryForSingleRow(fout, loFrozenQry->data);
3241 
3242  i_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3243  i_relminmxid = PQfnumber(lo_res, "relminmxid");
3244 
3245  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3246  appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
3247  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3248  "WHERE oid = %u;\n",
3249  atooid(PQgetvalue(lo_res, 0, i_relfrozenxid)),
3250  atooid(PQgetvalue(lo_res, 0, i_relminmxid)),
3251  LargeObjectRelationId);
3253  ARCHIVE_OPTS(.tag = "pg_largeobject",
3254  .description = "pg_largeobject",
3255  .section = SECTION_PRE_DATA,
3256  .createStmt = loOutQry->data));
3257 
3258  PQclear(lo_res);
3259 
3260  destroyPQExpBuffer(loFrozenQry);
3261  destroyPQExpBuffer(loOutQry);
3262  }
3263 
3264  PQclear(res);
3265 
3266  free(qdatname);
3267  destroyPQExpBuffer(dbQry);
3268  destroyPQExpBuffer(delQry);
3269  destroyPQExpBuffer(creaQry);
3270  destroyPQExpBuffer(labelq);
3271 }
3272 
3273 /*
3274  * Collect any database-specific or role-and-database-specific SET options
3275  * for this database, and append them to outbuf.
3276  */
3277 static void
3279  const char *dbname, Oid dboid)
3280 {
3281  PGconn *conn = GetConnection(AH);
3283  PGresult *res;
3284  int count = 1;
3285 
3286  /*
3287  * First collect database-specific options. Pre-8.4 server versions lack
3288  * unnest(), so we do this the hard way by querying once per subscript.
3289  */
3290  for (;;)
3291  {
3292  if (AH->remoteVersion >= 90000)
3293  printfPQExpBuffer(buf, "SELECT setconfig[%d] FROM pg_db_role_setting "
3294  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3295  count, dboid);
3296  else
3297  printfPQExpBuffer(buf, "SELECT datconfig[%d] FROM pg_database WHERE oid = '%u'::oid", count, dboid);
3298 
3299  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3300 
3301  if (PQntuples(res) == 1 &&
3302  !PQgetisnull(res, 0, 0))
3303  {
3305  "DATABASE", dbname, NULL, NULL,
3306  outbuf);
3307  PQclear(res);
3308  count++;
3309  }
3310  else
3311  {
3312  PQclear(res);
3313  break;
3314  }
3315  }
3316 
3317  /* Now look for role-and-database-specific options */
3318  if (AH->remoteVersion >= 90000)
3319  {
3320  /* Here we can assume we have unnest() */
3321  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
3322  "FROM pg_db_role_setting s, pg_roles r "
3323  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3324  dboid);
3325 
3326  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3327 
3328  if (PQntuples(res) > 0)
3329  {
3330  int i;
3331 
3332  for (i = 0; i < PQntuples(res); i++)
3334  "ROLE", PQgetvalue(res, i, 0),
3335  "DATABASE", dbname,
3336  outbuf);
3337  }
3338 
3339  PQclear(res);
3340  }
3341 
3343 }
3344 
3345 /*
3346  * dumpEncoding: put the correct encoding into the archive
3347  */
3348 static void
3350 {
3351  const char *encname = pg_encoding_to_char(AH->encoding);
3353 
3354  pg_log_info("saving encoding = %s", encname);
3355 
3356  appendPQExpBufferStr(qry, "SET client_encoding = ");
3357  appendStringLiteralAH(qry, encname, AH);
3358  appendPQExpBufferStr(qry, ";\n");
3359 
3361  ARCHIVE_OPTS(.tag = "ENCODING",
3362  .description = "ENCODING",
3363  .section = SECTION_PRE_DATA,
3364  .createStmt = qry->data));
3365 
3366  destroyPQExpBuffer(qry);
3367 }
3368 
3369 
3370 /*
3371  * dumpStdStrings: put the correct escape string behavior into the archive
3372  */
3373 static void
3375 {
3376  const char *stdstrings = AH->std_strings ? "on" : "off";
3378 
3379  pg_log_info("saving standard_conforming_strings = %s",
3380  stdstrings);
3381 
3382  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3383  stdstrings);
3384 
3386  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3387  .description = "STDSTRINGS",
3388  .section = SECTION_PRE_DATA,
3389  .createStmt = qry->data));
3390 
3391  destroyPQExpBuffer(qry);
3392 }
3393 
3394 /*
3395  * dumpSearchPath: record the active search_path in the archive
3396  */
3397 static void
3399 {
3401  PQExpBuffer path = createPQExpBuffer();
3402  PGresult *res;
3403  char **schemanames = NULL;
3404  int nschemanames = 0;
3405  int i;
3406 
3407  /*
3408  * We use the result of current_schemas(), not the search_path GUC,
3409  * because that might contain wildcards such as "$user", which won't
3410  * necessarily have the same value during restore. Also, this way avoids
3411  * listing schemas that may appear in search_path but not actually exist,
3412  * which seems like a prudent exclusion.
3413  */
3415  "SELECT pg_catalog.current_schemas(false)");
3416 
3417  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3418  fatal("could not parse result of current_schemas()");
3419 
3420  /*
3421  * We use set_config(), not a simple "SET search_path" command, because
3422  * the latter has less-clean behavior if the search path is empty. While
3423  * that's likely to get fixed at some point, it seems like a good idea to
3424  * be as backwards-compatible as possible in what we put into archives.
3425  */
3426  for (i = 0; i < nschemanames; i++)
3427  {
3428  if (i > 0)
3429  appendPQExpBufferStr(path, ", ");
3430  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3431  }
3432 
3433  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3434  appendStringLiteralAH(qry, path->data, AH);
3435  appendPQExpBufferStr(qry, ", false);\n");
3436 
3437  pg_log_info("saving search_path = %s", path->data);
3438 
3440  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3441  .description = "SEARCHPATH",
3442  .section = SECTION_PRE_DATA,
3443  .createStmt = qry->data));
3444 
3445  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3446  AH->searchpath = pg_strdup(qry->data);
3447 
3448  if (schemanames)
3449  free(schemanames);
3450  PQclear(res);
3451  destroyPQExpBuffer(qry);
3452  destroyPQExpBuffer(path);
3453 }
3454 
3455 
3456 /*
3457  * getBlobs:
3458  * Collect schema-level data about large objects
3459  */
3460 static void
3462 {
3463  DumpOptions *dopt = fout->dopt;
3464  PQExpBuffer blobQry = createPQExpBuffer();
3465  BlobInfo *binfo;
3466  DumpableObject *bdata;
3467  PGresult *res;
3468  int ntups;
3469  int i;
3470  int i_oid;
3471  int i_lomowner;
3472  int i_lomacl;
3473  int i_acldefault;
3474 
3475  pg_log_info("reading large objects");
3476 
3477  /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
3478  if (fout->remoteVersion >= 90200)
3479  {
3480  appendPQExpBuffer(blobQry,
3481  "SELECT oid, (%s lomowner) AS rolname, lomacl, "
3482  "acldefault('L', lomowner) AS acldefault "
3483  "FROM pg_largeobject_metadata",
3485  }
3486  else if (fout->remoteVersion >= 90000)
3487  appendPQExpBuffer(blobQry,
3488  "SELECT oid, (%s lomowner) AS rolname, lomacl, "
3489  "NULL AS acldefault "
3490  "FROM pg_largeobject_metadata",
3492  else
3493  appendPQExpBufferStr(blobQry,
3494  "SELECT DISTINCT loid AS oid, "
3495  "NULL::name AS rolname, NULL::oid AS lomacl, "
3496  "NULL::oid AS acldefault "
3497  " FROM pg_largeobject");
3498 
3499  res = ExecuteSqlQuery(fout, blobQry->data, PGRES_TUPLES_OK);
3500 
3501  i_oid = PQfnumber(res, "oid");
3502  i_lomowner = PQfnumber(res, "rolname");
3503  i_lomacl = PQfnumber(res, "lomacl");
3504  i_acldefault = PQfnumber(res, "acldefault");
3505 
3506  ntups = PQntuples(res);
3507 
3508  /*
3509  * Each large object has its own BLOB archive entry.
3510  */
3511  binfo = (BlobInfo *) pg_malloc(ntups * sizeof(BlobInfo));
3512 
3513  for (i = 0; i < ntups; i++)
3514  {
3515  binfo[i].dobj.objType = DO_BLOB;
3516  binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
3517  binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3518  AssignDumpId(&binfo[i].dobj);
3519 
3520  binfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
3521  binfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
3522  binfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3523  binfo[i].dacl.privtype = 0;
3524  binfo[i].dacl.initprivs = NULL;
3525  binfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_lomowner));
3526 
3527  /* Blobs have data */
3529 
3530  /* Mark whether blob has an ACL */
3531  if (!PQgetisnull(res, i, i_lomacl))
3532  binfo[i].dobj.components |= DUMP_COMPONENT_ACL;
3533 
3534  /*
3535  * In binary-upgrade mode for blobs, we do *not* dump out the blob
3536  * data, as it will be copied by pg_upgrade, which simply copies the
3537  * pg_largeobject table. We *do* however dump out anything but the
3538  * data, as pg_upgrade copies just pg_largeobject, but not
3539  * pg_largeobject_metadata, after the dump is restored.
3540  */
3541  if (dopt->binary_upgrade)
3542  binfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
3543  }
3544 
3545  /*
3546  * If we have any large objects, a "BLOBS" archive entry is needed. This
3547  * is just a placeholder for sorting; it carries no data now.
3548  */
3549  if (ntups > 0)
3550  {
3551  bdata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3552  bdata->objType = DO_BLOB_DATA;
3553  bdata->catId = nilCatalogId;
3554  AssignDumpId(bdata);
3555  bdata->name = pg_strdup("BLOBS");
3556  bdata->components |= DUMP_COMPONENT_DATA;
3557  }
3558 
3559  PQclear(res);
3560  destroyPQExpBuffer(blobQry);
3561 }
3562 
3563 /*
3564  * dumpBlob
3565  *
3566  * dump the definition (metadata) of the given large object
3567  */
3568 static void
3569 dumpBlob(Archive *fout, const BlobInfo *binfo)
3570 {
3571  PQExpBuffer cquery = createPQExpBuffer();
3572  PQExpBuffer dquery = createPQExpBuffer();
3573 
3574  appendPQExpBuffer(cquery,
3575  "SELECT pg_catalog.lo_create('%s');\n",
3576  binfo->dobj.name);
3577 
3578  appendPQExpBuffer(dquery,
3579  "SELECT pg_catalog.lo_unlink('%s');\n",
3580  binfo->dobj.name);
3581 
3582  if (binfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3583  ArchiveEntry(fout, binfo->dobj.catId, binfo->dobj.dumpId,
3584  ARCHIVE_OPTS(.tag = binfo->dobj.name,
3585  .owner = binfo->rolname,
3586  .description = "BLOB",
3587  .section = SECTION_PRE_DATA,
3588  .createStmt = cquery->data,
3589  .dropStmt = dquery->data));
3590 
3591  /* Dump comment if any */
3592  if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3593  dumpComment(fout, "LARGE OBJECT", binfo->dobj.name,
3594  NULL, binfo->rolname,
3595  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3596 
3597  /* Dump security label if any */
3598  if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3599  dumpSecLabel(fout, "LARGE OBJECT", binfo->dobj.name,
3600  NULL, binfo->rolname,
3601  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3602 
3603  /* Dump ACL if any */
3604  if (binfo->dobj.dump & DUMP_COMPONENT_ACL)
3605  dumpACL(fout, binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3606  binfo->dobj.name, NULL,
3607  NULL, binfo->rolname, &binfo->dacl);
3608 
3609  destroyPQExpBuffer(cquery);
3610  destroyPQExpBuffer(dquery);
3611 }
3612 
3613 /*
3614  * dumpBlobs:
3615  * dump the data contents of all large objects
3616  */
3617 static int
3618 dumpBlobs(Archive *fout, const void *arg)
3619 {
3620  const char *blobQry;
3621  const char *blobFetchQry;
3622  PGconn *conn = GetConnection(fout);
3623  PGresult *res;
3624  char buf[LOBBUFSIZE];
3625  int ntups;
3626  int i;
3627  int cnt;
3628 
3629  pg_log_info("saving large objects");
3630 
3631  /*
3632  * Currently, we re-fetch all BLOB OIDs using a cursor. Consider scanning
3633  * the already-in-memory dumpable objects instead...
3634  */
3635  if (fout->remoteVersion >= 90000)
3636  blobQry =
3637  "DECLARE bloboid CURSOR FOR "
3638  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3639  else
3640  blobQry =
3641  "DECLARE bloboid CURSOR FOR "
3642  "SELECT DISTINCT loid FROM pg_largeobject ORDER BY 1";
3643 
3644  ExecuteSqlStatement(fout, blobQry);
3645 
3646  /* Command to fetch from cursor */
3647  blobFetchQry = "FETCH 1000 IN bloboid";
3648 
3649  do
3650  {
3651  /* Do a fetch */
3652  res = ExecuteSqlQuery(fout, blobFetchQry, PGRES_TUPLES_OK);
3653 
3654  /* Process the tuples, if any */
3655  ntups = PQntuples(res);
3656  for (i = 0; i < ntups; i++)
3657  {
3658  Oid blobOid;
3659  int loFd;
3660 
3661  blobOid = atooid(PQgetvalue(res, i, 0));
3662  /* Open the BLOB */
3663  loFd = lo_open(conn, blobOid, INV_READ);
3664  if (loFd == -1)
3665  fatal("could not open large object %u: %s",
3666  blobOid, PQerrorMessage(conn));
3667 
3668  StartBlob(fout, blobOid);
3669 
3670  /* Now read it in chunks, sending data to archive */
3671  do
3672  {
3673  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3674  if (cnt < 0)
3675  fatal("error reading large object %u: %s",
3676  blobOid, PQerrorMessage(conn));
3677 
3678  WriteData(fout, buf, cnt);
3679  } while (cnt > 0);
3680 
3681  lo_close(conn, loFd);
3682 
3683  EndBlob(fout, blobOid);
3684  }
3685 
3686  PQclear(res);
3687  } while (ntups > 0);
3688 
3689  return 1;
3690 }
3691 
3692 /*
3693  * getPolicies
3694  * get information about all RLS policies on dumpable tables.
3695  */
3696 void
3697 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3698 {
3699  PQExpBuffer query;
3700  PGresult *res;
3701  PolicyInfo *polinfo;
3702  int i_oid;
3703  int i_tableoid;
3704  int i_polrelid;
3705  int i_polname;
3706  int i_polcmd;
3707  int i_polpermissive;
3708  int i_polroles;
3709  int i_polqual;
3710  int i_polwithcheck;
3711  int i,
3712  j,
3713  ntups;
3714 
3715  if (fout->remoteVersion < 90500)
3716  return;
3717 
3718  query = createPQExpBuffer();
3719 
3720  /*
3721  * First, check which tables have RLS enabled. We represent RLS being
3722  * enabled on a table by creating a PolicyInfo object with null polname.
3723  */
3724  for (i = 0; i < numTables; i++)
3725  {
3726  TableInfo *tbinfo = &tblinfo[i];
3727 
3728  /* Ignore row security on tables not to be dumped */
3729  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3730  continue;
3731 
3732  if (tbinfo->rowsec)
3733  {
3735 
3736  /*
3737  * Note: use tableoid 0 so that this object won't be mistaken for
3738  * something that pg_depend entries apply to.
3739  */
3740  polinfo = pg_malloc(sizeof(PolicyInfo));
3741  polinfo->dobj.objType = DO_POLICY;
3742  polinfo->dobj.catId.tableoid = 0;
3743  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3744  AssignDumpId(&polinfo->dobj);
3745  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3746  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3747  polinfo->poltable = tbinfo;
3748  polinfo->polname = NULL;
3749  polinfo->polcmd = '\0';
3750  polinfo->polpermissive = 0;
3751  polinfo->polroles = NULL;
3752  polinfo->polqual = NULL;
3753  polinfo->polwithcheck = NULL;
3754  }
3755  }
3756 
3757  /*
3758  * Now, read all RLS policies, and create PolicyInfo objects for all those
3759  * that are of interest.
3760  */
3761  pg_log_info("reading row-level security policies");
3762 
3763  printfPQExpBuffer(query,
3764  "SELECT oid, tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3765  if (fout->remoteVersion >= 100000)
3766  appendPQExpBuffer(query, "pol.polpermissive, ");
3767  else
3768  appendPQExpBuffer(query, "'t' as polpermissive, ");
3769  appendPQExpBuffer(query,
3770  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3771  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3772  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3773  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3774  "FROM pg_catalog.pg_policy pol");
3775 
3776  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3777 
3778  ntups = PQntuples(res);
3779  if (ntups > 0)
3780  {
3781  i_oid = PQfnumber(res, "oid");
3782  i_tableoid = PQfnumber(res, "tableoid");
3783  i_polrelid = PQfnumber(res, "polrelid");
3784  i_polname = PQfnumber(res, "polname");
3785  i_polcmd = PQfnumber(res, "polcmd");
3786  i_polpermissive = PQfnumber(res, "polpermissive");
3787  i_polroles = PQfnumber(res, "polroles");
3788  i_polqual = PQfnumber(res, "polqual");
3789  i_polwithcheck = PQfnumber(res, "polwithcheck");
3790 
3791  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
3792 
3793  for (j = 0; j < ntups; j++)
3794  {
3795  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
3796  TableInfo *tbinfo = findTableByOid(polrelid);
3797 
3798  /*
3799  * Ignore row security on tables not to be dumped. (This will
3800  * result in some harmless wasted slots in polinfo[].)
3801  */
3802  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3803  continue;
3804 
3806 
3807  polinfo[j].dobj.objType = DO_POLICY;
3808  polinfo[j].dobj.catId.tableoid =
3809  atooid(PQgetvalue(res, j, i_tableoid));
3810  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
3811  AssignDumpId(&polinfo[j].dobj);
3812  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
3813  polinfo[j].poltable = tbinfo;
3814  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
3815  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
3816 
3817  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
3818  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
3819 
3820  if (PQgetisnull(res, j, i_polroles))
3821  polinfo[j].polroles = NULL;
3822  else
3823  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
3824 
3825  if (PQgetisnull(res, j, i_polqual))
3826  polinfo[j].polqual = NULL;
3827  else
3828  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
3829 
3830  if (PQgetisnull(res, j, i_polwithcheck))
3831  polinfo[j].polwithcheck = NULL;
3832  else
3833  polinfo[j].polwithcheck
3834  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
3835  }
3836  }
3837 
3838  PQclear(res);
3839 
3840  destroyPQExpBuffer(query);
3841 }
3842 
3843 /*
3844  * dumpPolicy
3845  * dump the definition of the given policy
3846  */
3847 static void
3848 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
3849 {
3850  DumpOptions *dopt = fout->dopt;
3851  TableInfo *tbinfo = polinfo->poltable;
3852  PQExpBuffer query;
3853  PQExpBuffer delqry;
3854  PQExpBuffer polprefix;
3855  char *qtabname;
3856  const char *cmd;
3857  char *tag;
3858 
3859  /* Do nothing in data-only dump */
3860  if (dopt->dataOnly)
3861  return;
3862 
3863  /*
3864  * If polname is NULL, then this record is just indicating that ROW LEVEL
3865  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
3866  * ROW LEVEL SECURITY.
3867  */
3868  if (polinfo->polname == NULL)
3869  {
3870  query = createPQExpBuffer();
3871 
3872  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
3873  fmtQualifiedDumpable(tbinfo));
3874 
3875  /*
3876  * We must emit the ROW SECURITY object's dependency on its table
3877  * explicitly, because it will not match anything in pg_depend (unlike
3878  * the case for other PolicyInfo objects).
3879  */
3880  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3881  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3882  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
3883  .namespace = polinfo->dobj.namespace->dobj.name,
3884  .owner = tbinfo->rolname,
3885  .description = "ROW SECURITY",
3886  .section = SECTION_POST_DATA,
3887  .createStmt = query->data,
3888  .deps = &(tbinfo->dobj.dumpId),
3889  .nDeps = 1));
3890 
3891  destroyPQExpBuffer(query);
3892  return;
3893  }
3894 
3895  if (polinfo->polcmd == '*')
3896  cmd = "";
3897  else if (polinfo->polcmd == 'r')
3898  cmd = " FOR SELECT";
3899  else if (polinfo->polcmd == 'a')
3900  cmd = " FOR INSERT";
3901  else if (polinfo->polcmd == 'w')
3902  cmd = " FOR UPDATE";
3903  else if (polinfo->polcmd == 'd')
3904  cmd = " FOR DELETE";
3905  else
3906  {
3907  pg_log_error("unexpected policy command type: %c",
3908  polinfo->polcmd);
3909  exit_nicely(1);
3910  }
3911 
3912  query = createPQExpBuffer();
3913  delqry = createPQExpBuffer();
3914  polprefix = createPQExpBuffer();
3915 
3916  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
3917 
3918  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
3919 
3920  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
3921  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
3922 
3923  if (polinfo->polroles != NULL)
3924  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
3925 
3926  if (polinfo->polqual != NULL)
3927  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
3928 
3929  if (polinfo->polwithcheck != NULL)
3930  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
3931 
3932  appendPQExpBufferStr(query, ";\n");
3933 
3934  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
3935  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
3936 
3937  appendPQExpBuffer(polprefix, "POLICY %s ON",
3938  fmtId(polinfo->polname));
3939 
3940  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
3941 
3942  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3943  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3944  ARCHIVE_OPTS(.tag = tag,
3945  .namespace = polinfo->dobj.namespace->dobj.name,
3946  .owner = tbinfo->rolname,
3947  .description = "POLICY",
3948  .section = SECTION_POST_DATA,
3949  .createStmt = query->data,
3950  .dropStmt = delqry->data));
3951 
3952  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3953  dumpComment(fout, polprefix->data, qtabname,
3954  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
3955  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
3956 
3957  free(tag);
3958  destroyPQExpBuffer(query);
3959  destroyPQExpBuffer(delqry);
3960  destroyPQExpBuffer(polprefix);
3961  free(qtabname);
3962 }
3963 
3964 /*
3965  * getPublications
3966  * get information about publications
3967  */
3969 getPublications(Archive *fout, int *numPublications)
3970 {
3971  DumpOptions *dopt = fout->dopt;
3972  PQExpBuffer query;
3973  PGresult *res;
3974  PublicationInfo *pubinfo;
3975  int i_tableoid;
3976  int i_oid;
3977  int i_pubname;
3978  int i_rolname;
3979  int i_puballtables;
3980  int i_pubinsert;
3981  int i_pubupdate;
3982  int i_pubdelete;
3983  int i_pubtruncate;
3984  int i_pubviaroot;
3985  int i,
3986  ntups;
3987 
3988  if (dopt->no_publications || fout->remoteVersion < 100000)
3989  {
3990  *numPublications = 0;
3991  return NULL;
3992  }
3993 
3994  query = createPQExpBuffer();
3995 
3996  resetPQExpBuffer(query);
3997 
3998  /* Get the publications. */
3999  if (fout->remoteVersion >= 130000)
4000  appendPQExpBuffer(query,
4001  "SELECT p.tableoid, p.oid, p.pubname, "
4002  "(%s p.pubowner) AS rolname, "
4003  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
4004  "FROM pg_publication p",
4006  else if (fout->remoteVersion >= 110000)
4007  appendPQExpBuffer(query,
4008  "SELECT p.tableoid, p.oid, p.pubname, "
4009  "(%s p.pubowner) AS rolname, "
4010  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
4011  "FROM pg_publication p",
4013  else
4014  appendPQExpBuffer(query,
4015  "SELECT p.tableoid, p.oid, p.pubname, "
4016  "(%s p.pubowner) AS rolname, "
4017  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
4018  "FROM pg_publication p",
4020 
4021  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4022 
4023  ntups = PQntuples(res);
4024 
4025  i_tableoid = PQfnumber(res, "tableoid");
4026  i_oid = PQfnumber(res, "oid");
4027  i_pubname = PQfnumber(res, "pubname");
4028  i_rolname = PQfnumber(res, "rolname");
4029  i_puballtables = PQfnumber(res, "puballtables");
4030  i_pubinsert = PQfnumber(res, "pubinsert");
4031  i_pubupdate = PQfnumber(res, "pubupdate");
4032  i_pubdelete = PQfnumber(res, "pubdelete");
4033  i_pubtruncate = PQfnumber(res, "pubtruncate");
4034  i_pubviaroot = PQfnumber(res, "pubviaroot");
4035 
4036  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4037 
4038  for (i = 0; i < ntups; i++)
4039  {
4040  pubinfo[i].dobj.objType = DO_PUBLICATION;
4041  pubinfo[i].dobj.catId.tableoid =
4042  atooid(PQgetvalue(res, i, i_tableoid));
4043  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4044  AssignDumpId(&pubinfo[i].dobj);
4045  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4046  pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
4047  pubinfo[i].puballtables =
4048  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4049  pubinfo[i].pubinsert =
4050  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4051  pubinfo[i].pubupdate =
4052  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4053  pubinfo[i].pubdelete =
4054  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4055  pubinfo[i].pubtruncate =
4056  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4057  pubinfo[i].pubviaroot =
4058  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4059 
4060  if (strlen(pubinfo[i].rolname) == 0)
4061  pg_log_warning("owner of publication \"%s\" appears to be invalid",
4062  pubinfo[i].dobj.name);
4063 
4064  /* Decide whether we want to dump it */
4065  selectDumpableObject(&(pubinfo[i].dobj), fout);
4066  }
4067  PQclear(res);
4068 
4069  destroyPQExpBuffer(query);
4070 
4071  *numPublications = ntups;
4072  return pubinfo;
4073 }
4074 
4075 /*
4076  * dumpPublication
4077  * dump the definition of the given publication
4078  */
4079 static void
4081 {
4082  PQExpBuffer delq;
4083  PQExpBuffer query;
4084  char *qpubname;
4085  bool first = true;
4086 
4087  delq = createPQExpBuffer();
4088  query = createPQExpBuffer();
4089 
4090  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4091 
4092  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4093  qpubname);
4094 
4095  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4096  qpubname);
4097 
4098  if (pubinfo->puballtables)
4099  appendPQExpBufferStr(query, " FOR ALL TABLES");
4100 
4101  appendPQExpBufferStr(query, " WITH (publish = '");
4102  if (pubinfo->pubinsert)
4103  {
4104  appendPQExpBufferStr(query, "insert");
4105  first = false;
4106  }
4107 
4108  if (pubinfo->pubupdate)
4109  {
4110  if (!first)
4111  appendPQExpBufferStr(query, ", ");
4112 
4113  appendPQExpBufferStr(query, "update");
4114  first = false;
4115  }
4116 
4117  if (pubinfo->pubdelete)
4118  {
4119  if (!first)
4120  appendPQExpBufferStr(query, ", ");
4121 
4122  appendPQExpBufferStr(query, "delete");
4123  first = false;
4124  }
4125 
4126  if (pubinfo->pubtruncate)
4127  {
4128  if (!first)
4129  appendPQExpBufferStr(query, ", ");
4130 
4131  appendPQExpBufferStr(query, "truncate");
4132  first = false;
4133  }
4134 
4135  appendPQExpBufferStr(query, "'");
4136 
4137  if (pubinfo->pubviaroot)
4138  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4139 
4140  appendPQExpBufferStr(query, ");\n");
4141 
4142  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4143  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4144  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4145  .owner = pubinfo->rolname,
4146  .description = "PUBLICATION",
4147  .section = SECTION_POST_DATA,
4148  .createStmt = query->data,
4149  .dropStmt = delq->data));
4150 
4151  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4152  dumpComment(fout, "PUBLICATION", qpubname,
4153  NULL, pubinfo->rolname,
4154  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4155 
4156  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4157  dumpSecLabel(fout, "PUBLICATION", qpubname,
4158  NULL, pubinfo->rolname,
4159  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4160 
4161  destroyPQExpBuffer(delq);
4162  destroyPQExpBuffer(query);
4163  free(qpubname);
4164 }
4165 
4166 /*
4167  * getPublicationNamespaces
4168  * get information about publication membership for dumpable schemas.
4169  */
4170 void
4172 {
4173  PQExpBuffer query;
4174  PGresult *res;
4175  PublicationSchemaInfo *pubsinfo;
4176  DumpOptions *dopt = fout->dopt;
4177  int i_tableoid;
4178  int i_oid;
4179  int i_pnpubid;
4180  int i_pnnspid;
4181  int i,
4182  j,
4183  ntups;
4184 
4185  if (dopt->no_publications || fout->remoteVersion < 150000)
4186  return;
4187 
4188  query = createPQExpBuffer();
4189 
4190  /* Collect all publication membership info. */
4191  appendPQExpBufferStr(query,
4192  "SELECT tableoid, oid, pnpubid, pnnspid "
4193  "FROM pg_catalog.pg_publication_namespace");
4194  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4195 
4196  ntups = PQntuples(res);
4197 
4198  i_tableoid = PQfnumber(res, "tableoid");
4199  i_oid = PQfnumber(res, "oid");
4200  i_pnpubid = PQfnumber(res, "pnpubid");
4201  i_pnnspid = PQfnumber(res, "pnnspid");
4202 
4203  /* this allocation may be more than we need */
4204  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4205  j = 0;
4206 
4207  for (i = 0; i < ntups; i++)
4208  {
4209  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4210  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4211  PublicationInfo *pubinfo;
4212  NamespaceInfo *nspinfo;
4213 
4214  /*
4215  * Ignore any entries for which we aren't interested in either the
4216  * publication or the rel.
4217  */
4218  pubinfo = findPublicationByOid(pnpubid);
4219  if (pubinfo == NULL)
4220  continue;
4221  nspinfo = findNamespaceByOid(pnnspid);
4222  if (nspinfo == NULL)
4223  continue;
4224 
4225  /*
4226  * We always dump publication namespaces unless the corresponding
4227  * namespace is excluded from the dump.
4228  */
4229  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4230  continue;
4231 
4232  /* OK, make a DumpableObject for this relationship */
4234  pubsinfo[j].dobj.catId.tableoid =
4235  atooid(PQgetvalue(res, i, i_tableoid));
4236  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4237  AssignDumpId(&pubsinfo[j].dobj);
4238  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4239  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4240  pubsinfo[j].publication = pubinfo;
4241  pubsinfo[j].pubschema = nspinfo;
4242 
4243  /* Decide whether we want to dump it */
4244  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4245 
4246  j++;
4247  }
4248 
4249  PQclear(res);
4250  destroyPQExpBuffer(query);
4251 }
4252 
4253 /*
4254  * getPublicationTables
4255  * get information about publication membership for dumpable tables.
4256  */
4257 void
4258 getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
4259 {
4260  PQExpBuffer query;
4261  PGresult *res;
4262  PublicationRelInfo *pubrinfo;
4263  DumpOptions *dopt = fout->dopt;
4264  int i_tableoid;
4265  int i_oid;
4266  int i_prpubid;
4267  int i_prrelid;
4268  int i,
4269  j,
4270  ntups;
4271 
4272  if (dopt->no_publications || fout->remoteVersion < 100000)
4273  return;
4274 
4275  query = createPQExpBuffer();
4276 
4277  /* Collect all publication membership info. */
4278  appendPQExpBufferStr(query,
4279  "SELECT tableoid, oid, prpubid, prrelid "
4280  "FROM pg_catalog.pg_publication_rel");
4281  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4282 
4283  ntups = PQntuples(res);
4284 
4285  i_tableoid = PQfnumber(res, "tableoid");
4286  i_oid = PQfnumber(res, "oid");
4287  i_prpubid = PQfnumber(res, "prpubid");
4288  i_prrelid = PQfnumber(res, "prrelid");
4289 
4290  /* this allocation may be more than we need */
4291  pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
4292  j = 0;
4293 
4294  for (i = 0; i < ntups; i++)
4295  {
4296  Oid prpubid = atooid(PQgetvalue(res, i, i_prpubid));
4297  Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid));
4298  PublicationInfo *pubinfo;
4299  TableInfo *tbinfo;
4300 
4301  /*
4302  * Ignore any entries for which we aren't interested in either the
4303  * publication or the rel.
4304  */
4305  pubinfo = findPublicationByOid(prpubid);
4306  if (pubinfo == NULL)
4307  continue;
4308  tbinfo = findTableByOid(prrelid);
4309  if (tbinfo == NULL)
4310  continue;
4311 
4312  /*
4313  * Ignore publication membership of tables whose definitions are not
4314  * to be dumped.
4315  */
4316  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4317  continue;
4318 
4319  /* OK, make a DumpableObject for this relationship */
4320  pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
4321  pubrinfo[j].dobj.catId.tableoid =
4322  atooid(PQgetvalue(res, i, i_tableoid));
4323  pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4324  AssignDumpId(&pubrinfo[j].dobj);
4325  pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4326  pubrinfo[j].dobj.name = tbinfo->dobj.name;
4327  pubrinfo[j].publication = pubinfo;
4328  pubrinfo[j].pubtable = tbinfo;
4329 
4330  /* Decide whether we want to dump it */
4331  selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
4332 
4333  j++;
4334  }
4335 
4336  PQclear(res);
4337  destroyPQExpBuffer(query);
4338 }
4339 
4340 /*
4341  * dumpPublicationNamespace
4342  * dump the definition of the given publication schema mapping.
4343  */
4344 static void
4346 {
4347  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4348  PublicationInfo *pubinfo = pubsinfo->publication;
4349  PQExpBuffer query;
4350  char *tag;
4351 
4352  if (!(pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4353  return;
4354 
4355  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4356 
4357  query = createPQExpBuffer();
4358 
4359  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4360  appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4361 
4362  /*
4363  * There is no point in creating drop query as the drop is done by schema
4364  * drop.
4365  */
4366  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4367  ARCHIVE_OPTS(.tag = tag,
4368  .namespace = schemainfo->dobj.name,
4369  .owner = pubinfo->rolname,
4370  .description = "PUBLICATION TABLES IN SCHEMA",
4371  .section = SECTION_POST_DATA,
4372  .createStmt = query->data));
4373 
4374  free(tag);
4375  destroyPQExpBuffer(query);
4376 }
4377 
4378 /*
4379  * dumpPublicationTable
4380  * dump the definition of the given publication table mapping
4381  */
4382 static void
4384 {
4385  PublicationInfo *pubinfo = pubrinfo->publication;
4386  TableInfo *tbinfo = pubrinfo->pubtable;
4387  PQExpBuffer query;
4388  char *tag;
4389 
4390  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4391 
4392  query = createPQExpBuffer();
4393 
4394  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4395  fmtId(pubinfo->dobj.name));
4396  appendPQExpBuffer(query, " %s;\n",
4397  fmtQualifiedDumpable(tbinfo));
4398 
4399  /*
4400  * There is no point in creating a drop query as the drop is done by table
4401  * drop. (If you think to change this, see also _printTocEntry().)
4402  * Although this object doesn't really have ownership as such, set the
4403  * owner field anyway to ensure that the command is run by the correct
4404  * role at restore time.
4405  */
4406  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4407  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4408  ARCHIVE_OPTS(.tag = tag,
4409  .namespace = tbinfo->dobj.namespace->dobj.name,
4410  .owner = pubinfo->rolname,
4411  .description = "PUBLICATION TABLE",
4412  .section = SECTION_POST_DATA,
4413  .createStmt = query->data));
4414 
4415  free(tag);
4416  destroyPQExpBuffer(query);
4417 }
4418 
4419 /*
4420  * Is the currently connected user a superuser?
4421  */
4422 static bool
4424 {
4425  ArchiveHandle *AH = (ArchiveHandle *) fout;
4426  const char *val;
4427 
4428  val = PQparameterStatus(AH->connection, "is_superuser");
4429 
4430  if (val && strcmp(val, "on") == 0)
4431  return true;
4432 
4433  return false;
4434 }
4435 
4436 /*
4437  * getSubscriptions
4438  * get information about subscriptions
4439  */
4440 void
4442 {
4443  DumpOptions *dopt = fout->dopt;
4444  PQExpBuffer query;
4445  PGresult *res;
4446  SubscriptionInfo *subinfo;
4447  int i_tableoid;
4448  int i_oid;
4449  int i_subname;
4450  int i_rolname;
4451  int i_substream;
4452  int i_subtwophasestate;
4453  int i_subconninfo;
4454  int i_subslotname;
4455  int i_subsynccommit;
4456  int i_subpublications;
4457  int i_subbinary;
4458  int i,
4459  ntups;
4460 
4461  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4462  return;
4463 
4464  if (!is_superuser(fout))
4465  {
4466  int n;
4467 
4468  res = ExecuteSqlQuery(fout,
4469  "SELECT count(*) FROM pg_subscription "
4470  "WHERE subdbid = (SELECT oid FROM pg_database"
4471  " WHERE datname = current_database())",
4472  PGRES_TUPLES_OK);
4473  n = atoi(PQgetvalue(res, 0, 0));
4474  if (n > 0)
4475  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4476  PQclear(res);
4477  return;
4478  }
4479 
4480  query = createPQExpBuffer();
4481 
4482  /* Get the subscriptions in current database. */
4483  appendPQExpBuffer(query,
4484  "SELECT s.tableoid, s.oid, s.subname,\n"
4485  " (%s s.subowner) AS rolname,\n"
4486  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4487  " s.subpublications,\n",
4489 
4490  if (fout->remoteVersion >= 140000)
4491  appendPQExpBufferStr(query, " s.subbinary,\n");
4492  else
4493  appendPQExpBufferStr(query, " false AS subbinary,\n");
4494 
4495  if (fout->remoteVersion >= 140000)
4496  appendPQExpBufferStr(query, " s.substream,\n");
4497  else
4498  appendPQExpBufferStr(query, " false AS substream,\n");
4499 
4500  if (fout->remoteVersion >= 150000)
4501  appendPQExpBufferStr(query, " s.subtwophasestate\n");
4502  else
4503  appendPQExpBuffer(query,
4504  " '%c' AS subtwophasestate\n",
4506 
4507  appendPQExpBufferStr(query,
4508  "FROM pg_subscription s\n"
4509  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4510  " WHERE datname = current_database())");
4511 
4512  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4513 
4514  ntups = PQntuples(res);
4515 
4516  i_tableoid = PQfnumber(res, "tableoid");
4517  i_oid = PQfnumber(res, "oid");
4518  i_subname = PQfnumber(res, "subname");
4519  i_rolname = PQfnumber(res, "rolname");
4520  i_subconninfo = PQfnumber(res, "subconninfo");
4521  i_subslotname = PQfnumber(res, "subslotname");
4522  i_subsynccommit = PQfnumber(res, "subsynccommit");
4523  i_subpublications = PQfnumber(res, "subpublications");
4524  i_subbinary = PQfnumber(res, "subbinary");
4525  i_substream = PQfnumber(res, "substream");
4526  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4527 
4528  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4529 
4530  for (i = 0; i < ntups; i++)
4531  {
4532  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4533  subinfo[i].dobj.catId.tableoid =
4534  atooid(PQgetvalue(res, i, i_tableoid));
4535  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4536  AssignDumpId(&subinfo[i].dobj);
4537  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4538  subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
4539  subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
4540  if (PQgetisnull(res, i, i_subslotname))
4541  subinfo[i].subslotname = NULL;
4542  else
4543  subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
4544  subinfo[i].subsynccommit =
4545  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4546  subinfo[i].subpublications =
4547  pg_strdup(PQgetvalue(res, i, i_subpublications));
4548  subinfo[i].subbinary =
4549  pg_strdup(PQgetvalue(res, i, i_subbinary));
4550  subinfo[i].substream =
4551  pg_strdup(PQgetvalue(res, i, i_substream));
4552  subinfo[i].subtwophasestate =
4553  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4554 
4555  if (strlen(subinfo[i].rolname) == 0)
4556  pg_log_warning("owner of subscription \"%s\" appears to be invalid",
4557  subinfo[i].dobj.name);
4558 
4559  /* Decide whether we want to dump it */
4560  selectDumpableObject(&(subinfo[i].dobj), fout);
4561  }
4562  PQclear(res);
4563 
4564  destroyPQExpBuffer(query);
4565 }
4566 
4567 /*
4568  * dumpSubscription
4569  * dump the definition of the given subscription
4570  */
4571 static void
4573 {
4574  PQExpBuffer delq;
4575  PQExpBuffer query;
4576  PQExpBuffer publications;
4577  char *qsubname;
4578  char **pubnames = NULL;
4579  int npubnames = 0;
4580  int i;
4581  char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
4582 
4583  delq = createPQExpBuffer();
4584  query = createPQExpBuffer();
4585 
4586  qsubname = pg_strdup(fmtId(subinfo->dobj.name));
4587 
4588  appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
4589  qsubname);
4590 
4591  appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
4592  qsubname);
4593  appendStringLiteralAH(query, subinfo->subconninfo, fout);
4594 
4595  /* Build list of quoted publications and append them to query. */
4596  if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
4597  fatal("could not parse %s array", "subpublications");
4598 
4599  publications = createPQExpBuffer();
4600  for (i = 0; i < npubnames; i++)
4601  {
4602  if (i > 0)
4603  appendPQExpBufferStr(publications, ", ");
4604 
4605  appendPQExpBufferStr(publications, fmtId(pubnames[i]));
4606  }
4607 
4608  appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
4609  if (subinfo->subslotname)
4610  appendStringLiteralAH(query, subinfo->subslotname, fout);
4611  else
4612  appendPQExpBufferStr(query, "NONE");
4613 
4614  if (strcmp(subinfo->subbinary, "t") == 0)
4615  appendPQExpBufferStr(query, ", binary = true");
4616 
4617  if (strcmp(subinfo->substream, "f") != 0)
4618  appendPQExpBufferStr(query, ", streaming = on");
4619 
4620  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
4621  appendPQExpBufferStr(query, ", two_phase = on");
4622 
4623  if (strcmp(subinfo->subsynccommit, "off") != 0)
4624  appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
4625 
4626  appendPQExpBufferStr(query, ");\n");
4627 
4628  if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4629  ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
4630  ARCHIVE_OPTS(.tag = subinfo->dobj.name,
4631  .owner = subinfo->rolname,
4632  .description = "SUBSCRIPTION",
4633  .section = SECTION_POST_DATA,
4634  .createStmt = query->data,
4635  .dropStmt = delq->data));
4636 
4637  if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4638  dumpComment(fout, "SUBSCRIPTION", qsubname,
4639  NULL, subinfo->rolname,
4640  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4641 
4642  if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4643  dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
4644  NULL, subinfo->rolname,
4645  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4646 
4647  destroyPQExpBuffer(publications);
4648  if (pubnames)
4649  free(pubnames);
4650 
4651  destroyPQExpBuffer(delq);
4652  destroyPQExpBuffer(query);
4653  free(qsubname);
4654 }
4655 
4656 /*
4657  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4658  * the object needs.
4659  */
4660 static void
4662  PQExpBuffer create,
4663  const DumpableObject *dobj,
4664  const char *catalog,
4665  const char *keyword,
4666  const char *objname)
4667 {
4668  if (dobj->depends_on_ext)
4669  {
4670  char *nm;
4671  PGresult *res;
4672  PQExpBuffer query;
4673  int ntups;
4674  int i_extname;
4675  int i;
4676 
4677  /* dodge fmtId() non-reentrancy */
4678  nm = pg_strdup(objname);
4679 
4680  query = createPQExpBuffer();
4681  appendPQExpBuffer(query,
4682  "SELECT e.extname "
4683  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
4684  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
4685  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
4686  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
4687  catalog,
4688  dobj->catId.oid);
4689  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4690  ntups = PQntuples(res);
4691  i_extname = PQfnumber(res, "extname");
4692  for (i = 0; i < ntups; i++)
4693  {
4694  appendPQExpBuffer(create, "ALTER %s %s DEPENDS ON EXTENSION %s;\n",
4695  keyword, nm,
4696  fmtId(PQgetvalue(res, i, i_extname)));
4697  }
4698 
4699  PQclear(res);
4700  destroyPQExpBuffer(query);
4701  pg_free(nm);
4702  }
4703 }
4704 
4705 static Oid
4707 {
4708  /*
4709  * If the old version didn't assign an array type, but the new version
4710  * does, we must select an unused type OID to assign. This currently only
4711  * happens for domains, when upgrading pre-v11 to v11 and up.
4712  *
4713  * Note: local state here is kind of ugly, but we must have some, since we
4714  * mustn't choose the same unused OID more than once.
4715  */
4716  static Oid next_possible_free_oid = FirstNormalObjectId;
4717  PGresult *res;
4718  bool is_dup;
4719 
4720  do
4721  {
4722  ++next_possible_free_oid;
4723  printfPQExpBuffer(upgrade_query,
4724  "SELECT EXISTS(SELECT 1 "
4725  "FROM pg_catalog.pg_type "
4726  "WHERE oid = '%u'::pg_catalog.oid);",
4727  next_possible_free_oid);
4728  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4729  is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
4730  PQclear(res);
4731  } while (is_dup);
4732 
4733  return next_possible_free_oid;
4734 }
4735 
4736 static void
4738  PQExpBuffer upgrade_buffer,
4739  Oid pg_type_oid,
4740  bool force_array_type,
4741  bool include_multirange_type)
4742 {
4743  PQExpBuffer upgrade_query = createPQExpBuffer();
4744  PGresult *res;
4745  Oid pg_type_array_oid;
4746  Oid pg_type_multirange_oid;
4747  Oid pg_type_multirange_array_oid;
4748 
4749  appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
4750  appendPQExpBuffer(upgrade_buffer,
4751  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4752  pg_type_oid);
4753 
4754  /* we only support old >= 8.3 for binary upgrades */
4755  appendPQExpBuffer(upgrade_query,
4756  "SELECT typarray "
4757  "FROM pg_catalog.pg_type "
4758  "WHERE oid = '%u'::pg_catalog.oid;",
4759  pg_type_oid);
4760 
4761  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4762 
4763  pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4764 
4765  PQclear(res);
4766 
4767  if (!OidIsValid(pg_type_array_oid) && force_array_type)
4768  pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4769 
4770  if (OidIsValid(pg_type_array_oid))
4771  {
4772  appendPQExpBufferStr(upgrade_buffer,
4773  "\n-- For binary upgrade, must preserve pg_type array oid\n");
4774  appendPQExpBuffer(upgrade_buffer,
4775  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4776  pg_type_array_oid);
4777  }
4778 
4779  /*
4780  * Pre-set the multirange type oid and its own array type oid.
4781  */
4782  if (include_multirange_type)
4783  {
4784  if (fout->remoteVersion >= 140000)
4785  {
4786  appendPQExpBuffer(upgrade_query,
4787  "SELECT t.oid, t.typarray "
4788  "FROM pg_catalog.pg_type t "
4789  "JOIN pg_catalog.pg_range r "
4790  "ON t.oid = r.rngmultitypid "
4791  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
4792  pg_type_oid);
4793 
4794  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4795 
4796  pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
4797  pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4798 
4799  PQclear(res);
4800  }
4801  else
4802  {
4803  pg_type_multirange_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4804  pg_type_multirange_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4805  }
4806 
4807  appendPQExpBufferStr(upgrade_buffer,
4808  "\n-- For binary upgrade, must preserve multirange pg_type oid\n");
4809  appendPQExpBuffer(upgrade_buffer,
4810  "SELECT pg_catalog.binary_upgrade_set_next_multirange_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4811  pg_type_multirange_oid);
4812  appendPQExpBufferStr(upgrade_buffer,
4813  "\n-- For binary upgrade, must preserve multirange pg_type array oid\n");
4814  appendPQExpBuffer(upgrade_buffer,
4815  "SELECT pg_catalog.binary_upgrade_set_next_multirange_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4816  pg_type_multirange_array_oid);
4817  }
4818 
4819  destroyPQExpBuffer(upgrade_query);
4820 }
4821 
4822 static void
4824  PQExpBuffer upgrade_buffer,
4825  const TableInfo *tbinfo)
4826 {
4827  Oid pg_type_oid = tbinfo->reltype;
4828 
4829  if (OidIsValid(pg_type_oid))
4830  binary_upgrade_set_type_oids_by_type_oid(fout, upgrade_buffer,
4831  pg_type_oid, false, false);
4832 }
4833 
4834 static void
4836  PQExpBuffer upgrade_buffer, Oid pg_class_oid,
4837  bool is_index)
4838 {
4839  appendPQExpBufferStr(upgrade_buffer,
4840  "\n-- For binary upgrade, must preserve pg_class oids\n");
4841 
4842  if (!is_index)
4843  {
4844  PQExpBuffer upgrade_query = createPQExpBuffer();
4845  PGresult *upgrade_res;
4846  Oid pg_class_reltoastrelid;
4847  char pg_class_relkind;
4848  Oid pg_index_indexrelid;
4849 
4850  appendPQExpBuffer(upgrade_buffer,
4851  "SELECT pg_catalog.binary_upgrade_set_next_heap_pg_class_oid('%u'::pg_catalog.oid);\n",
4852  pg_class_oid);
4853 
4854  /*
4855  * Preserve the OIDs of the table's toast table and index, if any.
4856  * Indexes cannot have toast tables, so we need not make this probe in
4857  * the index code path.
4858  *
4859  * One complexity is that the current table definition might not
4860  * require the creation of a TOAST table, but the old database might
4861  * have a TOAST table that was created earlier, before some wide
4862  * columns were dropped. By setting the TOAST oid we force creation
4863  * of the TOAST heap and index by the new backend, so we can copy the
4864  * files during binary upgrade without worrying about this case.
4865  */
4866  appendPQExpBuffer(upgrade_query,
4867  "SELECT c.reltoastrelid, c.relkind, i.indexrelid "
4868  "FROM pg_catalog.pg_class c LEFT JOIN "
4869  "pg_catalog.pg_index i ON (c.reltoastrelid = i.indrelid AND i.indisvalid) "
4870  "WHERE c.oid = '%u'::pg_catalog.oid;",
4871  pg_class_oid);
4872 
4873  upgrade_res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4874 
4875  pg_class_reltoastrelid = atooid(PQgetvalue(upgrade_res, 0,
4876  PQfnumber(upgrade_res, "reltoastrelid")));
4877  pg_class_relkind = *PQgetvalue(upgrade_res, 0,
4878  PQfnumber(upgrade_res, "relkind"));
4879  pg_index_indexrelid = atooid(PQgetvalue(upgrade_res, 0,
4880  PQfnumber(upgrade_res, "indexrelid")));
4881 
4882  /*
4883  * In a pre-v12 database, partitioned tables might be marked as having
4884  * toast tables, but we should ignore them if so.
4885  */
4886  if (OidIsValid(pg_class_reltoastrelid) &&
4887  pg_class_relkind != RELKIND_PARTITIONED_TABLE)
4888  {
4889  appendPQExpBuffer(upgrade_buffer,
4890  "SELECT pg_catalog.binary_upgrade_set_next_toast_pg_class_oid('%u'::pg_catalog.oid);\n",
4891  pg_class_reltoastrelid);
4892 
4893  /* every toast table has an index */
4894  appendPQExpBuffer(upgrade_buffer,
4895  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4896  pg_index_indexrelid);
4897  }
4898 
4899  PQclear(upgrade_res);
4900  destroyPQExpBuffer(upgrade_query);
4901  }
4902  else
4903  appendPQExpBuffer(upgrade_buffer,
4904  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4905  pg_class_oid);
4906 
4907  appendPQExpBufferChar(upgrade_buffer, '\n');
4908 }
4909 
4910 /*
4911  * If the DumpableObject is a member of an extension, add a suitable
4912  * ALTER EXTENSION ADD command to the creation commands in upgrade_buffer.
4913  *
4914  * For somewhat historical reasons, objname should already be quoted,
4915  * but not objnamespace (if any).
4916  */
4917 static void
4919  const DumpableObject *dobj,
4920  const char *objtype,
4921  const char *objname,
4922  const char *objnamespace)
4923 {
4924  DumpableObject *extobj = NULL;
4925  int i;
4926 
4927  if (!dobj->ext_member)
4928  return;
4929 
4930  /*
4931  * Find the parent extension. We could avoid this search if we wanted to
4932  * add a link field to DumpableObject, but the space costs of that would
4933  * be considerable. We assume that member objects could only have a
4934  * direct dependency on their own extension, not any others.
4935  */
4936  for (i = 0; i < dobj->nDeps; i++)
4937  {
4938  extobj = findObjectByDumpId(dobj->dependencies[i]);
4939  if (extobj && extobj->objType == DO_EXTENSION)
4940  break;
4941  extobj = NULL;
4942  }
4943  if (extobj == NULL)
4944  fatal("could not find parent extension for %s %s",
4945  objtype, objname);
4946 
4947  appendPQExpBufferStr(upgrade_buffer,
4948  "\n-- For binary upgrade, handle extension membership the hard way\n");
4949  appendPQExpBuffer(upgrade_buffer, "ALTER EXTENSION %s ADD %s ",
4950  fmtId(extobj->name),
4951  objtype);
4952  if (objnamespace && *objnamespace)
4953  appendPQExpBuffer(upgrade_buffer, "%s.", fmtId(objnamespace));
4954  appendPQExpBuffer(upgrade_buffer, "%s;\n", objname);
4955 }
4956 
4957 /*
4958  * getNamespaces:
4959  * read all namespaces in the system catalogs and return them in the
4960  * NamespaceInfo* structure
4961  *
4962  * numNamespaces is set to the number of namespaces read in
4963  */
4964 NamespaceInfo *
4965 getNamespaces(Archive *fout, int *numNamespaces)
4966 {
4967  PGresult *res;
4968  int ntups;
4969  int i;
4970  PQExpBuffer query;
4971  NamespaceInfo *nsinfo;
4972  int i_tableoid;
4973  int i_oid;
4974  int i_nspname;
4975  int i_nspowner;
4976  int i_rolname;
4977  int i_nspacl;
4978  int i_acldefault;
4979 
4980  query = createPQExpBuffer();
4981 
4982  /*
4983  * we fetch all namespaces including system ones, so that every object we
4984  * read in can be linked to a containing namespace.
4985  */
4986  if (fout->remoteVersion >= 90200)
4987  appendPQExpBuffer(query, "SELECT n.tableoid, n.oid, n.nspname, "
4988  "n.nspowner, "
4989  "(%s nspowner) AS rolname, "
4990  "n.nspacl, "
4991  "acldefault('n', n.nspowner) AS acldefault "
4992  "FROM pg_namespace n",
4994  else
4995  appendPQExpBuffer(query, "SELECT tableoid, oid, nspname, nspowner, "
4996  "(%s nspowner) AS rolname, "
4997  "nspacl, NULL AS acldefault "
4998  "FROM pg_namespace",
5000 
5001  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
5002 
5003  ntups = PQntuples(res);
5004 
5005  nsinfo = (NamespaceInfo *) pg_malloc(ntups * sizeof(NamespaceInfo));
5006 
5007  i_tableoid = PQfnumber(res, "tableoid");
5008  i_oid = PQfnumber(res, "oid");
5009  i_nspname = PQfnumber(res, "nspname");
5010  i_nspowner = PQfnumber(res, "nspowner");
5011  i_rolname = PQfnumber(res, "rolname");
5012  i_nspacl = PQfnumber(res, "nspacl");
5013  i_acldefault = PQfnumber(res, "acldefault");
5014 
5015  for (i = 0; i < ntups; i++)
5016  {
5017  nsinfo[i].dobj.objType = DO_NAMESPACE;
5018  nsinfo[i].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid));
5019  nsinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
5020  AssignDumpId(&nsinfo[i].dobj);
5021  nsinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_nspname));
5022  nsinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_nspacl));
5023  nsinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
5024  nsinfo[i].dacl.privtype = 0;
5025  nsinfo[i].dacl.