/*
 * PostgreSQL Source Code (git master) — pg_dump.c
 * (extracted from the doxygen source-browser listing)
 */
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "dumputils.h"
59 #include "fe_utils/option_utils.h"
60 #include "fe_utils/string_utils.h"
61 #include "getopt_long.h"
62 #include "libpq/libpq-fs.h"
63 #include "parallel.h"
64 #include "pg_backup_db.h"
65 #include "pg_backup_utils.h"
66 #include "pg_dump.h"
67 #include "storage/block.h"
68 
/*
 * One entry in the sorted role-name table (see rolenames/nrolenames below):
 * maps a role OID, as found in owner columns of the catalogs, to the role's
 * name.  Built by collectRoleNames() and searched by getRoleName().
 */
typedef struct
{
	Oid			roleoid;		/* role's OID */
	const char *rolename;		/* role's name */
} RoleNameItem;
74 
/*
 * One entry in the sorted comment table (see comments/ncomments below),
 * keyed by (classoid, objoid, objsubid).  Built by collectComments() and
 * searched by findComments().
 */
typedef struct
{
	const char *descr;			/* comment for an object */
	Oid			classoid;		/* object class (catalog OID) */
	Oid			objoid;			/* object OID */
	int			objsubid;		/* subobject (table column #) */
} CommentItem;
82 
/*
 * One entry in the sorted security-label table (see seclabels/nseclabels
 * below), keyed by (classoid, objoid, objsubid).  Built by
 * collectSecLabels() and searched by findSecLabels().
 */
typedef struct
{
	const char *provider;		/* label provider of this security label */
	const char *label;			/* security label for an object */
	Oid			classoid;		/* object class (catalog OID) */
	Oid			objoid;			/* object OID */
	int			objsubid;		/* subobject (table column #) */
} SecLabelItem;
91 
/*
 * Flags controlling how getFormattedTypeName() renders a zero (invalid)
 * type OID.  The extraction lost the first two enumerators and the closing
 * brace; restored here so the type is complete again.
 */
typedef enum OidOptions
{
	zeroIsError = 1,			/* zero OID is an internal error */
	zeroAsStar = 2,				/* render zero OID as "*" */
	zeroAsNone = 4				/* render zero OID as "NONE" */
} OidOptions;
99 /* global decls */
100 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
101 
102 static Oid g_last_builtin_oid; /* value of the last builtin oid */
103 
104 /* The specified names/patterns should to match at least one entity */
105 static int strict_names = 0;
106 
107 /*
108  * Object inclusion/exclusion lists
109  *
110  * The string lists record the patterns given by command-line switches,
111  * which we then convert to lists of OIDs of matching objects.
112  */
114 static SimpleOidList schema_include_oids = {NULL, NULL};
116 static SimpleOidList schema_exclude_oids = {NULL, NULL};
117 
119 static SimpleOidList table_include_oids = {NULL, NULL};
121 static SimpleOidList table_exclude_oids = {NULL, NULL};
123 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
126 
128 static SimpleOidList extension_include_oids = {NULL, NULL};
129 
130 static const CatalogId nilCatalogId = {0, 0};
131 
132 /* override for standard extra_float_digits setting */
133 static bool have_extra_float_digits = false;
135 
136 /* sorted table of role names */
137 static RoleNameItem *rolenames = NULL;
138 static int nrolenames = 0;
139 
140 /* sorted table of comments */
141 static CommentItem *comments = NULL;
142 static int ncomments = 0;
143 
144 /* sorted table of security labels */
145 static SecLabelItem *seclabels = NULL;
146 static int nseclabels = 0;
147 
/*
 * The default number of rows per INSERT when
 * --inserts is specified without --rows-per-insert
 */
#define DUMP_DEFAULT_ROWS_PER_INSERT 1

/*
 * Macro for producing quoted, schema-qualified name of a dumpable object.
 * NOTE(review): fmtQualifiedId appears to hand back transient storage --
 * confirm the result's lifetime before stashing the pointer.
 */
#define fmtQualifiedDumpable(obj) \
	fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
				   (obj)->dobj.name)
160 
161 static void help(const char *progname);
162 static void setup_connection(Archive *AH,
163  const char *dumpencoding, const char *dumpsnapshot,
164  char *use_role);
166 static void expand_schema_name_patterns(Archive *fout,
167  SimpleStringList *patterns,
168  SimpleOidList *oids,
169  bool strict_names);
170 static void expand_extension_name_patterns(Archive *fout,
171  SimpleStringList *patterns,
172  SimpleOidList *oids,
173  bool strict_names);
175  SimpleStringList *patterns,
176  SimpleOidList *oids);
177 static void expand_table_name_patterns(Archive *fout,
178  SimpleStringList *patterns,
179  SimpleOidList *oids,
180  bool strict_names);
181 static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
182  const char *pattern);
183 
184 static NamespaceInfo *findNamespace(Oid nsoid);
185 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
186 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
187 static const char *getRoleName(const char *roleoid_str);
188 static void collectRoleNames(Archive *fout);
189 static void getAdditionalACLs(Archive *fout);
190 static void dumpCommentExtended(Archive *fout, const char *type,
191  const char *name, const char *namespace,
192  const char *owner, CatalogId catalogId,
193  int subid, DumpId dumpId,
194  const char *initdb_comment);
195 static inline void dumpComment(Archive *fout, const char *type,
196  const char *name, const char *namespace,
197  const char *owner, CatalogId catalogId,
198  int subid, DumpId dumpId);
199 static int findComments(Oid classoid, Oid objoid, CommentItem **items);
200 static void collectComments(Archive *fout);
201 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
202  const char *namespace, const char *owner,
203  CatalogId catalogId, int subid, DumpId dumpId);
204 static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
205 static void collectSecLabels(Archive *fout);
206 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
207 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
208 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
209 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
210 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
211 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
212 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
213 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
214 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
215 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
216 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
217  PGresult *res);
218 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
219 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
220 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
221 static void dumpCast(Archive *fout, const CastInfo *cast);
222 static void dumpTransform(Archive *fout, const TransformInfo *transform);
223 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
224 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *oprinfo);
225 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
226 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
227 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
228 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
229 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
230 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
231 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
232 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
233 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
234 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
235 static void dumpTableAttach(Archive *fout, const TableAttachInfo *tbinfo);
236 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
237 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
238 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
239 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
240 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
241 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
242 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
243 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
244 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
245 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
246 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
247 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
248 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
249 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
250 static void dumpUserMappings(Archive *fout,
251  const char *servername, const char *namespace,
252  const char *owner, CatalogId catalogId, DumpId dumpId);
253 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
254 
255 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
256  const char *type, const char *name, const char *subname,
257  const char *nspname, const char *owner,
258  const DumpableAcl *dacl);
259 
260 static void getDependencies(Archive *fout);
261 static void BuildArchiveDependencies(Archive *fout);
262 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
263  DumpId **dependencies, int *nDeps, int *allocDeps);
264 
266 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
267  DumpableObject *boundaryObjs);
268 
269 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
270 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
271 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
272 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
273 static void buildMatViewRefreshDependencies(Archive *fout);
274 static void getTableDataFKConstraints(void);
275 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
276  bool is_agg);
277 static char *format_function_signature(Archive *fout,
278  const FuncInfo *finfo, bool honor_quotes);
279 static char *convertRegProcReference(const char *proc);
280 static char *getFormattedOperatorName(const char *oproid);
281 static char *convertTSFunction(Archive *fout, Oid funcOid);
282 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
283 static void getBlobs(Archive *fout);
284 static void dumpBlob(Archive *fout, const BlobInfo *binfo);
285 static int dumpBlobs(Archive *fout, const void *arg);
286 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
287 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
288 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
289 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
290 static void dumpDatabase(Archive *AH);
291 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
292  const char *dbname, Oid dboid);
293 static void dumpEncoding(Archive *AH);
294 static void dumpStdStrings(Archive *AH);
295 static void dumpSearchPath(Archive *AH);
297  PQExpBuffer upgrade_buffer,
298  Oid pg_type_oid,
299  bool force_array_type,
300  bool include_multirange_type);
302  PQExpBuffer upgrade_buffer,
303  const TableInfo *tbinfo);
304 static void binary_upgrade_set_pg_class_oids(Archive *fout,
305  PQExpBuffer upgrade_buffer,
306  Oid pg_class_oid, bool is_index);
307 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
308  const DumpableObject *dobj,
309  const char *objtype,
310  const char *objname,
311  const char *objnamespace);
312 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
313 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
314 static bool nonemptyReloptions(const char *reloptions);
315 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
316  const char *prefix, Archive *fout);
317 static char *get_synchronized_snapshot(Archive *fout);
318 static void setupDumpWorker(Archive *AHX);
319 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
320 
321 
322 int
323 main(int argc, char **argv)
324 {
325  int c;
326  const char *filename = NULL;
327  const char *format = "p";
328  TableInfo *tblinfo;
329  int numTables;
330  DumpableObject **dobjs;
331  int numObjs;
332  DumpableObject *boundaryObjs;
333  int i;
334  int optindex;
335  RestoreOptions *ropt;
336  Archive *fout; /* the script file */
337  bool g_verbose = false;
338  const char *dumpencoding = NULL;
339  const char *dumpsnapshot = NULL;
340  char *use_role = NULL;
341  int numWorkers = 1;
342  int compressLevel = -1;
343  int plainText = 0;
344  ArchiveFormat archiveFormat = archUnknown;
345  ArchiveMode archiveMode;
346 
347  static DumpOptions dopt;
348 
349  static struct option long_options[] = {
350  {"data-only", no_argument, NULL, 'a'},
351  {"blobs", no_argument, NULL, 'b'},
352  {"no-blobs", no_argument, NULL, 'B'},
353  {"clean", no_argument, NULL, 'c'},
354  {"create", no_argument, NULL, 'C'},
355  {"dbname", required_argument, NULL, 'd'},
356  {"extension", required_argument, NULL, 'e'},
357  {"file", required_argument, NULL, 'f'},
358  {"format", required_argument, NULL, 'F'},
359  {"host", required_argument, NULL, 'h'},
360  {"jobs", 1, NULL, 'j'},
361  {"no-reconnect", no_argument, NULL, 'R'},
362  {"no-owner", no_argument, NULL, 'O'},
363  {"port", required_argument, NULL, 'p'},
364  {"schema", required_argument, NULL, 'n'},
365  {"exclude-schema", required_argument, NULL, 'N'},
366  {"schema-only", no_argument, NULL, 's'},
367  {"superuser", required_argument, NULL, 'S'},
368  {"table", required_argument, NULL, 't'},
369  {"exclude-table", required_argument, NULL, 'T'},
370  {"no-password", no_argument, NULL, 'w'},
371  {"password", no_argument, NULL, 'W'},
372  {"username", required_argument, NULL, 'U'},
373  {"verbose", no_argument, NULL, 'v'},
374  {"no-privileges", no_argument, NULL, 'x'},
375  {"no-acl", no_argument, NULL, 'x'},
376  {"compress", required_argument, NULL, 'Z'},
377  {"encoding", required_argument, NULL, 'E'},
378  {"help", no_argument, NULL, '?'},
379  {"version", no_argument, NULL, 'V'},
380 
381  /*
382  * the following options don't have an equivalent short option letter
383  */
384  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
385  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
386  {"column-inserts", no_argument, &dopt.column_inserts, 1},
387  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
388  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
389  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
390  {"exclude-table-data", required_argument, NULL, 4},
391  {"extra-float-digits", required_argument, NULL, 8},
392  {"if-exists", no_argument, &dopt.if_exists, 1},
393  {"inserts", no_argument, NULL, 9},
394  {"lock-wait-timeout", required_argument, NULL, 2},
395  {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
396  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
397  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
398  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
399  {"role", required_argument, NULL, 3},
400  {"section", required_argument, NULL, 5},
401  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
402  {"snapshot", required_argument, NULL, 6},
403  {"strict-names", no_argument, &strict_names, 1},
404  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
405  {"no-comments", no_argument, &dopt.no_comments, 1},
406  {"no-publications", no_argument, &dopt.no_publications, 1},
407  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
408  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
409  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
410  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
411  {"no-sync", no_argument, NULL, 7},
412  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
413  {"rows-per-insert", required_argument, NULL, 10},
414  {"include-foreign-data", required_argument, NULL, 11},
415 
416  {NULL, 0, NULL, 0}
417  };
418 
419  pg_logging_init(argv[0]);
421  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
422 
423  /*
424  * Initialize what we need for parallel execution, especially for thread
425  * support on Windows.
426  */
428 
429  progname = get_progname(argv[0]);
430 
431  if (argc > 1)
432  {
433  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
434  {
435  help(progname);
436  exit_nicely(0);
437  }
438  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
439  {
440  puts("pg_dump (PostgreSQL) " PG_VERSION);
441  exit_nicely(0);
442  }
443  }
444 
445  InitDumpOptions(&dopt);
446 
447  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
448  long_options, &optindex)) != -1)
449  {
450  switch (c)
451  {
452  case 'a': /* Dump data only */
453  dopt.dataOnly = true;
454  break;
455 
456  case 'b': /* Dump blobs */
457  dopt.outputBlobs = true;
458  break;
459 
460  case 'B': /* Don't dump blobs */
461  dopt.dontOutputBlobs = true;
462  break;
463 
464  case 'c': /* clean (i.e., drop) schema prior to create */
465  dopt.outputClean = 1;
466  break;
467 
468  case 'C': /* Create DB */
469  dopt.outputCreateDB = 1;
470  break;
471 
472  case 'd': /* database name */
473  dopt.cparams.dbname = pg_strdup(optarg);
474  break;
475 
476  case 'e': /* include extension(s) */
478  dopt.include_everything = false;
479  break;
480 
481  case 'E': /* Dump encoding */
482  dumpencoding = pg_strdup(optarg);
483  break;
484 
485  case 'f':
487  break;
488 
489  case 'F':
491  break;
492 
493  case 'h': /* server host */
494  dopt.cparams.pghost = pg_strdup(optarg);
495  break;
496 
497  case 'j': /* number of dump jobs */
498  if (!option_parse_int(optarg, "-j/--jobs", 1,
499  PG_MAX_JOBS,
500  &numWorkers))
501  exit_nicely(1);
502  break;
503 
504  case 'n': /* include schema(s) */
506  dopt.include_everything = false;
507  break;
508 
509  case 'N': /* exclude schema(s) */
511  break;
512 
513  case 'O': /* Don't reconnect to match owner */
514  dopt.outputNoOwner = 1;
515  break;
516 
517  case 'p': /* server port */
518  dopt.cparams.pgport = pg_strdup(optarg);
519  break;
520 
521  case 'R':
522  /* no-op, still accepted for backwards compatibility */
523  break;
524 
525  case 's': /* dump schema only */
526  dopt.schemaOnly = true;
527  break;
528 
529  case 'S': /* Username for superuser in plain text output */
531  break;
532 
533  case 't': /* include table(s) */
535  dopt.include_everything = false;
536  break;
537 
538  case 'T': /* exclude table(s) */
540  break;
541 
542  case 'U':
544  break;
545 
546  case 'v': /* verbose */
547  g_verbose = true;
549  break;
550 
551  case 'w':
553  break;
554 
555  case 'W':
557  break;
558 
559  case 'x': /* skip ACL dump */
560  dopt.aclsSkip = true;
561  break;
562 
563  case 'Z': /* Compression Level */
564  if (!option_parse_int(optarg, "-Z/--compress", 0, 9,
565  &compressLevel))
566  exit_nicely(1);
567  break;
568 
569  case 0:
570  /* This covers the long options. */
571  break;
572 
573  case 2: /* lock-wait-timeout */
575  break;
576 
577  case 3: /* SET ROLE */
578  use_role = pg_strdup(optarg);
579  break;
580 
581  case 4: /* exclude table(s) data */
583  break;
584 
585  case 5: /* section */
587  break;
588 
589  case 6: /* snapshot */
590  dumpsnapshot = pg_strdup(optarg);
591  break;
592 
593  case 7: /* no-sync */
594  dosync = false;
595  break;
596 
597  case 8:
599  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
601  exit_nicely(1);
602  break;
603 
604  case 9: /* inserts */
605 
606  /*
607  * dump_inserts also stores --rows-per-insert, careful not to
608  * overwrite that.
609  */
610  if (dopt.dump_inserts == 0)
612  break;
613 
614  case 10: /* rows per insert */
615  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
616  &dopt.dump_inserts))
617  exit_nicely(1);
618  break;
619 
620  case 11: /* include foreign data */
622  optarg);
623  break;
624 
625  default:
626  /* getopt_long already emitted a complaint */
627  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
628  exit_nicely(1);
629  }
630  }
631 
632  /*
633  * Non-option argument specifies database name as long as it wasn't
634  * already specified with -d / --dbname
635  */
636  if (optind < argc && dopt.cparams.dbname == NULL)
637  dopt.cparams.dbname = argv[optind++];
638 
639  /* Complain if any arguments remain */
640  if (optind < argc)
641  {
642  pg_log_error("too many command-line arguments (first is \"%s\")",
643  argv[optind]);
644  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
645  exit_nicely(1);
646  }
647 
648  /* --column-inserts implies --inserts */
649  if (dopt.column_inserts && dopt.dump_inserts == 0)
651 
652  /*
653  * Binary upgrade mode implies dumping sequence data even in schema-only
654  * mode. This is not exposed as a separate option, but kept separate
655  * internally for clarity.
656  */
657  if (dopt.binary_upgrade)
658  dopt.sequence_data = 1;
659 
660  if (dopt.dataOnly && dopt.schemaOnly)
661  pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
662 
664  pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
665 
666  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
667  pg_fatal("option --include-foreign-data is not supported with parallel backup");
668 
669  if (dopt.dataOnly && dopt.outputClean)
670  pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
671 
672  if (dopt.if_exists && !dopt.outputClean)
673  pg_fatal("option --if-exists requires option -c/--clean");
674 
675  /*
676  * --inserts are already implied above if --column-inserts or
677  * --rows-per-insert were specified.
678  */
679  if (dopt.do_nothing && dopt.dump_inserts == 0)
680  pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
681 
682  /* Identify archive format to emit */
683  archiveFormat = parseArchiveFormat(format, &archiveMode);
684 
685  /* archiveFormat specific setup */
686  if (archiveFormat == archNull)
687  plainText = 1;
688 
689  /* Custom and directory formats are compressed by default, others not */
690  if (compressLevel == -1)
691  {
692 #ifdef HAVE_LIBZ
693  if (archiveFormat == archCustom || archiveFormat == archDirectory)
694  compressLevel = Z_DEFAULT_COMPRESSION;
695  else
696 #endif
697  compressLevel = 0;
698  }
699 
700 #ifndef HAVE_LIBZ
701  if (compressLevel != 0)
702  pg_log_warning("requested compression not available in this installation -- archive will be uncompressed");
703  compressLevel = 0;
704 #endif
705 
706  /*
707  * If emitting an archive format, we always want to emit a DATABASE item,
708  * in case --create is specified at pg_restore time.
709  */
710  if (!plainText)
711  dopt.outputCreateDB = 1;
712 
713  /* Parallel backup only in the directory archive format so far */
714  if (archiveFormat != archDirectory && numWorkers > 1)
715  pg_fatal("parallel backup only supported by the directory format");
716 
717  /* Open the output file */
718  fout = CreateArchive(filename, archiveFormat, compressLevel, dosync,
719  archiveMode, setupDumpWorker);
720 
721  /* Make dump options accessible right away */
722  SetArchiveOptions(fout, &dopt, NULL);
723 
724  /* Register the cleanup hook */
725  on_exit_close_archive(fout);
726 
727  /* Let the archiver know how noisy to be */
728  fout->verbose = g_verbose;
729 
730 
731  /*
732  * We allow the server to be back to 9.2, and up to any minor release of
733  * our own major version. (See also version check in pg_dumpall.c.)
734  */
735  fout->minRemoteVersion = 90200;
736  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
737 
738  fout->numWorkers = numWorkers;
739 
740  /*
741  * Open the database using the Archiver, so it knows about it. Errors mean
742  * death.
743  */
744  ConnectDatabase(fout, &dopt.cparams, false);
745  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
746 
747  /*
748  * On hot standbys, never try to dump unlogged table data, since it will
749  * just throw an error.
750  */
751  if (fout->isStandby)
752  dopt.no_unlogged_table_data = true;
753 
754  /*
755  * Find the last built-in OID, if needed (prior to 8.1)
756  *
757  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
758  */
760 
761  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
762 
763  /* Expand schema selection patterns into OID lists */
764  if (schema_include_patterns.head != NULL)
765  {
768  strict_names);
769  if (schema_include_oids.head == NULL)
770  pg_fatal("no matching schemas were found");
771  }
774  false);
775  /* non-matching exclusion patterns aren't an error */
776 
777  /* Expand table selection patterns into OID lists */
778  if (table_include_patterns.head != NULL)
779  {
782  strict_names);
783  if (table_include_oids.head == NULL)
784  pg_fatal("no matching tables were found");
785  }
788  false);
789 
792  false);
793 
796 
797  /* non-matching exclusion patterns aren't an error */
798 
799  /* Expand extension selection patterns into OID lists */
800  if (extension_include_patterns.head != NULL)
801  {
804  strict_names);
805  if (extension_include_oids.head == NULL)
806  pg_fatal("no matching extensions were found");
807  }
808 
809  /*
810  * Dumping blobs is the default for dumps where an inclusion switch is not
811  * used (an "include everything" dump). -B can be used to exclude blobs
812  * from those dumps. -b can be used to include blobs even when an
813  * inclusion switch is used.
814  *
815  * -s means "schema only" and blobs are data, not schema, so we never
816  * include blobs when -s is used.
817  */
818  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputBlobs)
819  dopt.outputBlobs = true;
820 
821  /*
822  * Collect role names so we can map object owner OIDs to names.
823  */
824  collectRoleNames(fout);
825 
826  /*
827  * Now scan the database and create DumpableObject structs for all the
828  * objects we intend to dump.
829  */
830  tblinfo = getSchemaData(fout, &numTables);
831 
832  if (!dopt.schemaOnly)
833  {
834  getTableData(&dopt, tblinfo, numTables, 0);
836  if (dopt.dataOnly)
838  }
839 
840  if (dopt.schemaOnly && dopt.sequence_data)
841  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
842 
843  /*
844  * In binary-upgrade mode, we do not have to worry about the actual blob
845  * data or the associated metadata that resides in the pg_largeobject and
846  * pg_largeobject_metadata tables, respectively.
847  *
848  * However, we do need to collect blob information as there may be
849  * comments or other information on blobs that we do need to dump out.
850  */
851  if (dopt.outputBlobs || dopt.binary_upgrade)
852  getBlobs(fout);
853 
854  /*
855  * Collect dependency data to assist in ordering the objects.
856  */
857  getDependencies(fout);
858 
859  /*
860  * Collect ACLs, comments, and security labels, if wanted.
861  */
862  if (!dopt.aclsSkip)
863  getAdditionalACLs(fout);
864  if (!dopt.no_comments)
865  collectComments(fout);
866  if (!dopt.no_security_labels)
867  collectSecLabels(fout);
868 
869  /* Lastly, create dummy objects to represent the section boundaries */
870  boundaryObjs = createBoundaryObjects();
871 
872  /* Get pointers to all the known DumpableObjects */
873  getDumpableObjects(&dobjs, &numObjs);
874 
875  /*
876  * Add dummy dependencies to enforce the dump section ordering.
877  */
878  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
879 
880  /*
881  * Sort the objects into a safe dump order (no forward references).
882  *
883  * We rely on dependency information to help us determine a safe order, so
884  * the initial sort is mostly for cosmetic purposes: we sort by name to
885  * ensure that logically identical schemas will dump identically.
886  */
887  sortDumpableObjectsByTypeName(dobjs, numObjs);
888 
889  sortDumpableObjects(dobjs, numObjs,
890  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
891 
892  /*
893  * Create archive TOC entries for all the objects to be dumped, in a safe
894  * order.
895  */
896 
897  /*
898  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
899  */
900  dumpEncoding(fout);
901  dumpStdStrings(fout);
902  dumpSearchPath(fout);
903 
904  /* The database items are always next, unless we don't want them at all */
905  if (dopt.outputCreateDB)
906  dumpDatabase(fout);
907 
908  /* Now the rearrangeable objects. */
909  for (i = 0; i < numObjs; i++)
910  dumpDumpableObject(fout, dobjs[i]);
911 
912  /*
913  * Set up options info to ensure we dump what we want.
914  */
915  ropt = NewRestoreOptions();
916  ropt->filename = filename;
917 
918  /* if you change this list, see dumpOptionsFromRestoreOptions */
919  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
920  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
921  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
922  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
924  ropt->dropSchema = dopt.outputClean;
925  ropt->dataOnly = dopt.dataOnly;
926  ropt->schemaOnly = dopt.schemaOnly;
927  ropt->if_exists = dopt.if_exists;
928  ropt->column_inserts = dopt.column_inserts;
929  ropt->dumpSections = dopt.dumpSections;
930  ropt->aclsSkip = dopt.aclsSkip;
931  ropt->superuser = dopt.outputSuperuser;
932  ropt->createDB = dopt.outputCreateDB;
933  ropt->noOwner = dopt.outputNoOwner;
934  ropt->noTableAm = dopt.outputNoTableAm;
935  ropt->noTablespace = dopt.outputNoTablespaces;
936  ropt->disable_triggers = dopt.disable_triggers;
937  ropt->use_setsessauth = dopt.use_setsessauth;
939  ropt->dump_inserts = dopt.dump_inserts;
940  ropt->no_comments = dopt.no_comments;
941  ropt->no_publications = dopt.no_publications;
943  ropt->no_subscriptions = dopt.no_subscriptions;
944  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
947  ropt->sequence_data = dopt.sequence_data;
948  ropt->binary_upgrade = dopt.binary_upgrade;
949 
950  if (compressLevel == -1)
951  ropt->compression = 0;
952  else
953  ropt->compression = compressLevel;
954 
955  ropt->suppressDumpWarnings = true; /* We've already shown them */
956 
957  SetArchiveOptions(fout, &dopt, ropt);
958 
959  /* Mark which entries should be output */
961 
962  /*
963  * The archive's TOC entries are now marked as to which ones will actually
964  * be output, so we can set up their dependency lists properly. This isn't
965  * necessary for plain-text output, though.
966  */
967  if (!plainText)
969 
970  /*
971  * And finally we can do the actual output.
972  *
973  * Note: for non-plain-text output formats, the output file is written
974  * inside CloseArchive(). This is, um, bizarre; but not worth changing
975  * right now.
976  */
977  if (plainText)
978  RestoreArchive(fout);
979 
980  CloseArchive(fout);
981 
982  exit_nicely(0);
983 }
984 
985 
/*
 * help
 *		Print pg_dump's command-line usage summary to stdout.
 *
 * All user-visible strings pass through _() for NLS translation.  Output is
 * grouped into general options, content-selection options, and connection
 * options, matching the option handling in main().
 *
 * NOTE(review): the column-alignment whitespace inside the option strings
 * appears to have been collapsed in this copy of the file — verify the
 * original spacing against upstream before editing any literal.
 */
static void
help(const char *progname)
{
	printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]... [DBNAME]\n"), progname);

	printf(_("\nGeneral options:\n"));
	printf(_(" -f, --file=FILENAME output file or directory name\n"));
	printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
			 " plain text (default))\n"));
	printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
	printf(_(" -v, --verbose verbose mode\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -Z, --compress=0-9 compression level for compressed formats\n"));
	printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
	printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
	printf(_(" -?, --help show this help, then exit\n"));

	printf(_("\nOptions controlling the output content:\n"));
	printf(_(" -a, --data-only dump only the data, not the schema\n"));
	printf(_(" -b, --blobs include large objects in dump\n"));
	printf(_(" -B, --no-blobs exclude large objects in dump\n"));
	printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
	printf(_(" -C, --create include commands to create database in dump\n"));
	printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
	printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
	printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
	printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
	printf(_(" -O, --no-owner skip restoration of object ownership in\n"
			 " plain-text format\n"));
	printf(_(" -s, --schema-only dump only the schema, no data\n"));
	printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
	printf(_(" -t, --table=PATTERN dump the specified table(s) only\n"));
	printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
	printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
	printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
	printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
	printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
	printf(_(" --disable-triggers disable triggers during data-only restore\n"));
	printf(_(" --enable-row-security enable row security (dump only content user has\n"
			 " access to)\n"));
	printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
	printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
	printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
	printf(_(" --include-foreign-data=PATTERN\n"
			 " include data of foreign tables on foreign\n"
			 " servers matching PATTERN\n"));
	printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
	printf(_(" --load-via-partition-root load partitions via the root table\n"));
	printf(_(" --no-comments do not dump comments\n"));
	printf(_(" --no-publications do not dump publications\n"));
	printf(_(" --no-security-labels do not dump security label assignments\n"));
	printf(_(" --no-subscriptions do not dump subscriptions\n"));
	printf(_(" --no-table-access-method do not dump table access methods\n"));
	printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
	printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
	printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
	printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
	printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
	printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
	printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
	printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
	printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
	printf(_(" --strict-names require table and/or schema include patterns to\n"
			 " match at least one entity each\n"));
	printf(_(" --use-set-session-authorization\n"
			 " use SET SESSION AUTHORIZATION commands instead of\n"
			 " ALTER OWNER commands to set ownership\n"));

	printf(_("\nConnection options:\n"));
	printf(_(" -d, --dbname=DBNAME database to dump\n"));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
	printf(_(" -p, --port=PORT database server port number\n"));
	printf(_(" -U, --username=NAME connect as specified database user\n"));
	printf(_(" -w, --no-password never prompt for password\n"));
	printf(_(" -W, --password force password prompt (should happen automatically)\n"));
	printf(_(" --role=ROLENAME do SET ROLE before dump\n"));

	printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
			 "variable value is used.\n\n"));
	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
1070 
/*
 * setup_connection
 *		Configure a freshly opened connection with pg_dump's standard
 *		session settings, then start the transaction-snapshot-mode
 *		transaction that gives the dump a consistent view of the database.
 *
 * AH: the archive handle whose connection is configured.
 * dumpencoding: requested client encoding, or NULL to leave as-is.
 * dumpsnapshot: snapshot ID to import, or NULL.
 * use_role: role to SET ROLE to, or NULL.
 *
 * In parallel dump workers, dumpsnapshot and use_role arrive as NULL and the
 * values previously saved in AH (sync_snapshot_id, use_role) are used.
 *
 * NOTE(review): several statements appear to have been elided from this copy
 * of the file (marked inline below) — verify against upstream before editing.
 */
static void
setup_connection(Archive *AH, const char *dumpencoding,
				 const char *dumpsnapshot, char *use_role)
{
	DumpOptions *dopt = AH->dopt;
	PGconn	   *conn = GetConnection(AH);
	const char *std_strings;

	/* NOTE(review): a statement appears to be elided here in this copy. */

	/*
	 * Set the client encoding if requested.
	 */
	if (dumpencoding)
	{
		if (PQsetClientEncoding(conn, dumpencoding) < 0)
			pg_fatal("invalid client encoding \"%s\" specified",
					 dumpencoding);
	}

	/*
	 * Get the active encoding and the standard_conforming_strings setting, so
	 * we know how to escape strings.
	 */
	/*
	 * NOTE(review): the statement recording the active encoding appears to be
	 * elided here in this copy — confirm against upstream.
	 */

	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);

	/*
	 * Set the role if requested.  In a parallel dump worker, we'll be passed
	 * use_role == NULL, but AH->use_role is already set (if user specified it
	 * originally) and we should use that.
	 */
	if (!use_role && AH->use_role)
		use_role = AH->use_role;

	/* Set the role if requested */
	if (use_role)
	{
		PQExpBuffer query = createPQExpBuffer();

		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);

		/* save it for possible later use by parallel workers */
		if (!AH->use_role)
			AH->use_role = pg_strdup(use_role);
	}

	/* Set the datestyle to ISO to ensure the dump's portability */
	ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");

	/* Likewise, avoid using sql_standard intervalstyle */
	ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");

	/*
	 * Use an explicitly specified extra_float_digits if it has been provided.
	 * Otherwise, set extra_float_digits so that we can dump float data
	 * exactly (given correctly implemented float I/O code, anyway).
	 */
	/*
	 * NOTE(review): the controlling if-condition, the declaration of 'q', and
	 * the final argument of appendPQExpBuffer appear to be elided in this
	 * copy — verify against upstream.
	 */
	{

		appendPQExpBuffer(q, "SET extra_float_digits TO %d",
		ExecuteSqlStatement(AH, q->data);
		destroyPQExpBuffer(q);
	}
	else
		ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");

	/*
	 * Disable synchronized scanning, to prevent unpredictable changes in row
	 * ordering across a dump and reload.
	 */
	ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");

	/*
	 * Disable timeouts if supported.
	 */
	ExecuteSqlStatement(AH, "SET statement_timeout = 0");
	if (AH->remoteVersion >= 90300)
		ExecuteSqlStatement(AH, "SET lock_timeout = 0");
	if (AH->remoteVersion >= 90600)
		ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");

	/*
	 * Quote all identifiers, if requested.
	 */
	/* NOTE(review): the guard condition appears elided here in this copy. */
	ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");

	/*
	 * Adjust row-security mode, if supported.
	 */
	if (AH->remoteVersion >= 90500)
	{
		if (dopt->enable_row_security)
			ExecuteSqlStatement(AH, "SET row_security = on");
		else
			ExecuteSqlStatement(AH, "SET row_security = off");
	}

	/*
	 * Initialize prepared-query state to "nothing prepared". We do this here
	 * so that a parallel dump worker will have its own state.
	 */
	AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));

	/*
	 * Start transaction-snapshot mode transaction to dump consistent data.
	 */
	ExecuteSqlStatement(AH, "BEGIN");

	/*
	 * To support the combination of serializable_deferrable with the jobs
	 * option we use REPEATABLE READ for the worker connections that are
	 * passed a snapshot. As long as the snapshot is acquired in a
	 * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
	 * REPEATABLE READ transaction provides the appropriate integrity
	 * guarantees. This is a kluge, but safe for back-patching.
	 */
	/*
	 * NOTE(review): the ExecuteSqlStatement(AH, ...) call lines wrapping these
	 * string literals appear elided in this copy — verify against upstream.
	 */
	if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
		"SET TRANSACTION ISOLATION LEVEL "
		"SERIALIZABLE, READ ONLY, DEFERRABLE");
	else
		"SET TRANSACTION ISOLATION LEVEL "
		"REPEATABLE READ, READ ONLY");

	/*
	 * If user specified a snapshot to use, select that. In a parallel dump
	 * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
	 * is already set (if the server can handle it) and we should use that.
	 */
	if (dumpsnapshot)
		AH->sync_snapshot_id = pg_strdup(dumpsnapshot);

	if (AH->sync_snapshot_id)
	{
		PQExpBuffer query = createPQExpBuffer();

		appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
		/*
		 * NOTE(review): the statement appending the snapshot ID as a literal
		 * appears elided here in this copy.
		 */
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);
	}
	else if (AH->numWorkers > 1)
	{
		if (AH->isStandby && AH->remoteVersion < 100000)
			pg_fatal("parallel dumps from standby servers are not supported by this server version");
		/*
		 * NOTE(review): the statement obtaining a synchronized snapshot for
		 * the workers appears elided here in this copy.
		 */
	}
}
1229 
/* Set up connection for a parallel worker process */
static void
/*
 * NOTE(review): the line carrying the function name and parameter list
 * appears elided in this copy — verify against upstream.
 */
{
	/*
	 * We want to re-select all the same values the leader connection is
	 * using. We'll have inherited directly-usable values in
	 * AH->sync_snapshot_id and AH->use_role, but we need to translate the
	 * inherited encoding value back to a string to pass to setup_connection.
	 */
	/* NOTE(review): the encoding argument line appears elided in this copy. */
	setup_connection(AH,
					 NULL,
					 NULL);
}
1245 
/*
 * Run pg_catalog.pg_export_snapshot() on the archive's connection and return
 * the exported snapshot identifier as a freshly pg_strdup'd string (caller
 * owns the result).
 */
static char *
/*
 * NOTE(review): the line carrying the function name and parameter list
 * appears elided in this copy — verify against upstream.
 */
{
	char	   *query = "SELECT pg_catalog.pg_export_snapshot()";
	char	   *result;
	PGresult   *res;

	/* Single-row query: exactly one snapshot ID comes back */
	res = ExecuteSqlQueryForSingleRow(fout, query);
	result = pg_strdup(PQgetvalue(res, 0, 0));
	PQclear(res);

	return result;
}
1259 
/*
 * Translate a --format option string (short or long spelling, case
 * insensitive) into an ArchiveFormat code, also setting *mode to the
 * archive open mode (write for all documented formats).  Dies via pg_fatal
 * on an unrecognized format string.
 */
static ArchiveFormat
/*
 * NOTE(review): the line carrying the function name and parameter list
 * appears elided in this copy — verify against upstream.
 */
{
	ArchiveFormat archiveFormat;

	*mode = archModeWrite;

	if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
	{
		/* This is used by pg_dumpall, and is not documented */
		archiveFormat = archNull;
		*mode = archModeAppend;
	}
	else if (pg_strcasecmp(format, "c") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "custom") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "d") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "directory") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "p") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "plain") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "t") == 0)
		archiveFormat = archTar;
	else if (pg_strcasecmp(format, "tar") == 0)
		archiveFormat = archTar;
	else
		pg_fatal("invalid output format \"%s\" specified", format);
	return archiveFormat;
}
1293 
1294 /*
1295  * Find the OIDs of all schemas matching the given list of patterns,
1296  * and append them to the given OID list.
1297  */
1298 static void
1300  SimpleStringList *patterns,
1301  SimpleOidList *oids,
1302  bool strict_names)
1303 {
1304  PQExpBuffer query;
1305  PGresult *res;
1306  SimpleStringListCell *cell;
1307  int i;
1308 
1309  if (patterns->head == NULL)
1310  return; /* nothing to do */
1311 
1312  query = createPQExpBuffer();
1313 
1314  /*
1315  * The loop below runs multiple SELECTs might sometimes result in
1316  * duplicate entries in the OID list, but we don't care.
1317  */
1318 
1319  for (cell = patterns->head; cell; cell = cell->next)
1320  {
1321  PQExpBufferData dbbuf;
1322  int dotcnt;
1323 
1324  appendPQExpBufferStr(query,
1325  "SELECT oid FROM pg_catalog.pg_namespace n\n");
1326  initPQExpBuffer(&dbbuf);
1327  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1328  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
1329  &dotcnt);
1330  if (dotcnt > 1)
1331  pg_fatal("improper qualified name (too many dotted names): %s",
1332  cell->val);
1333  else if (dotcnt == 1)
1334  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1335  termPQExpBuffer(&dbbuf);
1336 
1337  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1338  if (strict_names && PQntuples(res) == 0)
1339  pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1340 
1341  for (i = 0; i < PQntuples(res); i++)
1342  {
1344  }
1345 
1346  PQclear(res);
1347  resetPQExpBuffer(query);
1348  }
1349 
1350  destroyPQExpBuffer(query);
1351 }
1352 
1353 /*
1354  * Find the OIDs of all extensions matching the given list of patterns,
1355  * and append them to the given OID list.
1356  */
1357 static void
1359  SimpleStringList *patterns,
1360  SimpleOidList *oids,
1361  bool strict_names)
1362 {
1363  PQExpBuffer query;
1364  PGresult *res;
1365  SimpleStringListCell *cell;
1366  int i;
1367 
1368  if (patterns->head == NULL)
1369  return; /* nothing to do */
1370 
1371  query = createPQExpBuffer();
1372 
1373  /*
1374  * The loop below runs multiple SELECTs might sometimes result in
1375  * duplicate entries in the OID list, but we don't care.
1376  */
1377  for (cell = patterns->head; cell; cell = cell->next)
1378  {
1379  int dotcnt;
1380 
1381  appendPQExpBufferStr(query,
1382  "SELECT oid FROM pg_catalog.pg_extension e\n");
1383  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1384  false, NULL, "e.extname", NULL, NULL, NULL,
1385  &dotcnt);
1386  if (dotcnt > 0)
1387  pg_fatal("improper qualified name (too many dotted names): %s",
1388  cell->val);
1389 
1390  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1391  if (strict_names && PQntuples(res) == 0)
1392  pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1393 
1394  for (i = 0; i < PQntuples(res); i++)
1395  {
1397  }
1398 
1399  PQclear(res);
1400  resetPQExpBuffer(query);
1401  }
1402 
1403  destroyPQExpBuffer(query);
1404 }
1405 
1406 /*
1407  * Find the OIDs of all foreign servers matching the given list of patterns,
1408  * and append them to the given OID list.
1409  */
1410 static void
1412  SimpleStringList *patterns,
1413  SimpleOidList *oids)
1414 {
1415  PQExpBuffer query;
1416  PGresult *res;
1417  SimpleStringListCell *cell;
1418  int i;
1419 
1420  if (patterns->head == NULL)
1421  return; /* nothing to do */
1422 
1423  query = createPQExpBuffer();
1424 
1425  /*
1426  * The loop below runs multiple SELECTs might sometimes result in
1427  * duplicate entries in the OID list, but we don't care.
1428  */
1429 
1430  for (cell = patterns->head; cell; cell = cell->next)
1431  {
1432  int dotcnt;
1433 
1434  appendPQExpBufferStr(query,
1435  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1436  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1437  false, NULL, "s.srvname", NULL, NULL, NULL,
1438  &dotcnt);
1439  if (dotcnt > 0)
1440  pg_fatal("improper qualified name (too many dotted names): %s",
1441  cell->val);
1442 
1443  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1444  if (PQntuples(res) == 0)
1445  pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1446 
1447  for (i = 0; i < PQntuples(res); i++)
1449 
1450  PQclear(res);
1451  resetPQExpBuffer(query);
1452  }
1453 
1454  destroyPQExpBuffer(query);
1455 }
1456 
1457 /*
1458  * Find the OIDs of all tables matching the given list of patterns,
1459  * and append them to the given OID list. See also expand_dbname_patterns()
1460  * in pg_dumpall.c
1461  */
1462 static void
1464  SimpleStringList *patterns, SimpleOidList *oids,
1465  bool strict_names)
1466 {
1467  PQExpBuffer query;
1468  PGresult *res;
1469  SimpleStringListCell *cell;
1470  int i;
1471 
1472  if (patterns->head == NULL)
1473  return; /* nothing to do */
1474 
1475  query = createPQExpBuffer();
1476 
1477  /*
1478  * this might sometimes result in duplicate entries in the OID list, but
1479  * we don't care.
1480  */
1481 
1482  for (cell = patterns->head; cell; cell = cell->next)
1483  {
1484  PQExpBufferData dbbuf;
1485  int dotcnt;
1486 
1487  /*
1488  * Query must remain ABSOLUTELY devoid of unqualified names. This
1489  * would be unnecessary given a pg_table_is_visible() variant taking a
1490  * search_path argument.
1491  */
1492  appendPQExpBuffer(query,
1493  "SELECT c.oid"
1494  "\nFROM pg_catalog.pg_class c"
1495  "\n LEFT JOIN pg_catalog.pg_namespace n"
1496  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1497  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1498  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1499  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1500  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1501  RELKIND_PARTITIONED_TABLE);
1502  initPQExpBuffer(&dbbuf);
1503  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1504  false, "n.nspname", "c.relname", NULL,
1505  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
1506  &dotcnt);
1507  if (dotcnt > 2)
1508  pg_fatal("improper relation name (too many dotted names): %s",
1509  cell->val);
1510  else if (dotcnt == 2)
1511  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1512  termPQExpBuffer(&dbbuf);
1513 
1514  ExecuteSqlStatement(fout, "RESET search_path");
1515  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1518  if (strict_names && PQntuples(res) == 0)
1519  pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);
1520 
1521  for (i = 0; i < PQntuples(res); i++)
1522  {
1524  }
1525 
1526  PQclear(res);
1527  resetPQExpBuffer(query);
1528  }
1529 
1530  destroyPQExpBuffer(query);
1531 }
1532 
1533 /*
1534  * Verifies that the connected database name matches the given database name,
1535  * and if not, dies with an error about the given pattern.
1536  *
1537  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1538  */
1539 static void
1540 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1541 {
1542  const char *db;
1543 
1544  db = PQdb(conn);
1545  if (db == NULL)
1546  pg_fatal("You are currently not connected to a database.");
1547 
1548  if (strcmp(db, dbname) != 0)
1549  pg_fatal("cross-database references are not implemented: %s",
1550  pattern);
1551 }
1552 
1553 /*
1554  * checkExtensionMembership
1555  * Determine whether object is an extension member, and if so,
1556  * record an appropriate dependency and set the object's dump flag.
1557  *
1558  * It's important to call this for each object that could be an extension
1559  * member. Generally, we integrate this with determining the object's
1560  * to-be-dumped-ness, since extension membership overrides other rules for that.
1561  *
1562  * Returns true if object is an extension member, else false.
1563  */
1564 static bool
1566 {
1567  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1568 
1569  if (ext == NULL)
1570  return false;
1571 
1572  dobj->ext_member = true;
1573 
1574  /* Record dependency so that getDependencies needn't deal with that */
1575  addObjectDependency(dobj, ext->dobj.dumpId);
1576 
1577  /*
1578  * In 9.6 and above, mark the member object to have any non-initial ACL,
1579  * policies, and security labels dumped.
1580  *
1581  * Note that any initial ACLs (see pg_init_privs) will be removed when we
1582  * extract the information about the object. We don't provide support for
1583  * initial policies and security labels and it seems unlikely for those to
1584  * ever exist, but we may have to revisit this later.
1585  *
1586  * Prior to 9.6, we do not include any extension member components.
1587  *
1588  * In binary upgrades, we still dump all components of the members
1589  * individually, since the idea is to exactly reproduce the database
1590  * contents rather than replace the extension contents with something
1591  * different.
1592  */
1593  if (fout->dopt->binary_upgrade)
1594  dobj->dump = ext->dobj.dump;
1595  else
1596  {
1597  if (fout->remoteVersion < 90600)
1598  dobj->dump = DUMP_COMPONENT_NONE;
1599  else
1600  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
1603  }
1604 
1605  return true;
1606 }
1607 
1608 /*
1609  * selectDumpableNamespace: policy-setting subroutine
1610  * Mark a namespace as to be dumped or not
1611  */
1612 static void
1614 {
1615  /*
1616  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1617  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1618  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1619  */
1620  nsinfo->create = true;
1621 
1622  /*
1623  * If specific tables are being dumped, do not dump any complete
1624  * namespaces. If specific namespaces are being dumped, dump just those
1625  * namespaces. Otherwise, dump all non-system namespaces.
1626  */
1627  if (table_include_oids.head != NULL)
1628  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1629  else if (schema_include_oids.head != NULL)
1630  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1632  nsinfo->dobj.catId.oid) ?
1634  else if (fout->remoteVersion >= 90600 &&
1635  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1636  {
1637  /*
1638  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1639  * they are interesting (and not the original ACLs which were set at
1640  * initdb time, see pg_init_privs).
1641  */
1642  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1643  }
1644  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1645  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1646  {
1647  /* Other system schemas don't get dumped */
1648  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1649  }
1650  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1651  {
1652  /*
1653  * The public schema is a strange beast that sits in a sort of
1654  * no-mans-land between being a system object and a user object.
1655  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1656  * a comment and an indication of ownership. If the owner is the
1657  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1658  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1659  */
1660  nsinfo->create = false;
1661  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1662  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1663  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1665 
1666  /*
1667  * Also, make like it has a comment even if it doesn't; this is so
1668  * that we'll emit a command to drop the comment, if appropriate.
1669  * (Without this, we'd not call dumpCommentExtended for it.)
1670  */
1672  }
1673  else
1674  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1675 
1676  /*
1677  * In any case, a namespace can be excluded by an exclusion switch
1678  */
1679  if (nsinfo->dobj.dump_contains &&
1681  nsinfo->dobj.catId.oid))
1682  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1683 
1684  /*
1685  * If the schema belongs to an extension, allow extension membership to
1686  * override the dump decision for the schema itself. However, this does
1687  * not change dump_contains, so this won't change what we do with objects
1688  * within the schema. (If they belong to the extension, they'll get
1689  * suppressed by it, otherwise not.)
1690  */
1691  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1692 }
1693 
1694 /*
1695  * selectDumpableTable: policy-setting subroutine
1696  * Mark a table as to be dumped or not
1697  */
1698 static void
1700 {
1701  if (checkExtensionMembership(&tbinfo->dobj, fout))
1702  return; /* extension membership overrides all else */
1703 
1704  /*
1705  * If specific tables are being dumped, dump just those tables; else, dump
1706  * according to the parent namespace's dump flag.
1707  */
1708  if (table_include_oids.head != NULL)
1710  tbinfo->dobj.catId.oid) ?
1712  else
1713  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1714 
1715  /*
1716  * In any case, a table can be excluded by an exclusion switch
1717  */
1718  if (tbinfo->dobj.dump &&
1720  tbinfo->dobj.catId.oid))
1721  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1722 }
1723 
1724 /*
1725  * selectDumpableType: policy-setting subroutine
1726  * Mark a type as to be dumped or not
1727  *
1728  * If it's a table's rowtype or an autogenerated array type, we also apply a
1729  * special type code to facilitate sorting into the desired order. (We don't
1730  * want to consider those to be ordinary types because that would bring tables
1731  * up into the datatype part of the dump order.) We still set the object's
1732  * dump flag; that's not going to cause the dummy type to be dumped, but we
1733  * need it so that casts involving such types will be dumped correctly -- see
1734  * dumpCast. This means the flag should be set the same as for the underlying
1735  * object (the table or base type).
1736  */
1737 static void
1739 {
1740  /* skip complex types, except for standalone composite types */
1741  if (OidIsValid(tyinfo->typrelid) &&
1742  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1743  {
1744  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1745 
1746  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1747  if (tytable != NULL)
1748  tyinfo->dobj.dump = tytable->dobj.dump;
1749  else
1750  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1751  return;
1752  }
1753 
1754  /* skip auto-generated array types */
1755  if (tyinfo->isArray || tyinfo->isMultirange)
1756  {
1757  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1758 
1759  /*
1760  * Fall through to set the dump flag; we assume that the subsequent
1761  * rules will do the same thing as they would for the array's base
1762  * type. (We cannot reliably look up the base type here, since
1763  * getTypes may not have processed it yet.)
1764  */
1765  }
1766 
1767  if (checkExtensionMembership(&tyinfo->dobj, fout))
1768  return; /* extension membership overrides all else */
1769 
1770  /* Dump based on if the contents of the namespace are being dumped */
1771  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1772 }
1773 
1774 /*
1775  * selectDumpableDefaultACL: policy-setting subroutine
1776  * Mark a default ACL as to be dumped or not
1777  *
1778  * For per-schema default ACLs, dump if the schema is to be dumped.
1779  * Otherwise dump if we are dumping "everything". Note that dataOnly
1780  * and aclsSkip are checked separately.
1781  */
1782 static void
1784 {
1785  /* Default ACLs can't be extension members */
1786 
1787  if (dinfo->dobj.namespace)
1788  /* default ACLs are considered part of the namespace */
1789  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1790  else
1791  dinfo->dobj.dump = dopt->include_everything ?
1793 }
1794 
1795 /*
1796  * selectDumpableCast: policy-setting subroutine
1797  * Mark a cast as to be dumped or not
1798  *
1799  * Casts do not belong to any particular namespace (since they haven't got
1800  * names), nor do they have identifiable owners. To distinguish user-defined
1801  * casts from built-in ones, we must resort to checking whether the cast's
1802  * OID is in the range reserved for initdb.
1803  */
1804 static void
1806 {
1807  if (checkExtensionMembership(&cast->dobj, fout))
1808  return; /* extension membership overrides all else */
1809 
1810  /*
1811  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1812  * support ACLs currently.
1813  */
1814  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1815  cast->dobj.dump = DUMP_COMPONENT_NONE;
1816  else
1817  cast->dobj.dump = fout->dopt->include_everything ?
1819 }
1820 
1821 /*
1822  * selectDumpableProcLang: policy-setting subroutine
1823  * Mark a procedural language as to be dumped or not
1824  *
1825  * Procedural languages do not belong to any particular namespace. To
1826  * identify built-in languages, we must resort to checking whether the
1827  * language's OID is in the range reserved for initdb.
1828  */
1829 static void
1831 {
1832  if (checkExtensionMembership(&plang->dobj, fout))
1833  return; /* extension membership overrides all else */
1834 
1835  /*
1836  * Only include procedural languages when we are dumping everything.
1837  *
1838  * For from-initdb procedural languages, only include ACLs, as we do for
1839  * the pg_catalog namespace. We need this because procedural languages do
1840  * not live in any namespace.
1841  */
1842  if (!fout->dopt->include_everything)
1843  plang->dobj.dump = DUMP_COMPONENT_NONE;
1844  else
1845  {
1846  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1847  plang->dobj.dump = fout->remoteVersion < 90600 ?
1849  else
1850  plang->dobj.dump = DUMP_COMPONENT_ALL;
1851  }
1852 }
1853 
1854 /*
1855  * selectDumpableAccessMethod: policy-setting subroutine
1856  * Mark an access method as to be dumped or not
1857  *
1858  * Access methods do not belong to any particular namespace. To identify
1859  * built-in access methods, we must resort to checking whether the
1860  * method's OID is in the range reserved for initdb.
1861  */
1862 static void
1864 {
1865  if (checkExtensionMembership(&method->dobj, fout))
1866  return; /* extension membership overrides all else */
1867 
1868  /*
1869  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1870  * they do not support ACLs currently.
1871  */
1872  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1873  method->dobj.dump = DUMP_COMPONENT_NONE;
1874  else
1875  method->dobj.dump = fout->dopt->include_everything ?
1877 }
1878 
1879 /*
1880  * selectDumpableExtension: policy-setting subroutine
1881  * Mark an extension as to be dumped or not
1882  *
1883  * Built-in extensions should be skipped except for checking ACLs, since we
1884  * assume those will already be installed in the target database. We identify
1885  * such extensions by their having OIDs in the range reserved for initdb.
1886  * We dump all user-added extensions by default. No extensions are dumped
1887  * if include_everything is false (i.e., a --schema or --table switch was
1888  * given), except if --extension specifies a list of extensions to dump.
1889  */
1890 static void
1892 {
1893  /*
1894  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
1895  * change permissions on their member objects, if they wish to, and have
1896  * those changes preserved.
1897  */
1898  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1899  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
1900  else
1901  {
1902  /* check if there is a list of extensions to dump */
1903  if (extension_include_oids.head != NULL)
1904  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1906  extinfo->dobj.catId.oid) ?
1908  else
1909  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1910  dopt->include_everything ?
1912  }
1913 }
1914 
1915 /*
1916  * selectDumpablePublicationObject: policy-setting subroutine
1917  * Mark a publication object as to be dumped or not
1918  *
1919  * A publication can have schemas and tables which have schemas, but those are
1920  * ignored in decision making, because publications are only dumped when we are
1921  * dumping everything.
1922  */
1923 static void
1925 {
1926  if (checkExtensionMembership(dobj, fout))
1927  return; /* extension membership overrides all else */
1928 
1929  dobj->dump = fout->dopt->include_everything ?
1931 }
1932 
1933 /*
1934  * selectDumpableObject: policy-setting subroutine
1935  * Mark a generic dumpable object as to be dumped or not
1936  *
1937  * Use this only for object types without a special-case routine above.
1938  */
1939 static void
1941 {
1942  if (checkExtensionMembership(dobj, fout))
1943  return; /* extension membership overrides all else */
1944 
1945  /*
1946  * Default policy is to dump if parent namespace is dumpable, or for
1947  * non-namespace-associated items, dump if we're dumping "everything".
1948  */
1949  if (dobj->namespace)
1950  dobj->dump = dobj->namespace->dobj.dump_contains;
1951  else
1952  dobj->dump = fout->dopt->include_everything ?
1954 }
1955 
1956 /*
1957  * Dump a table's contents for loading using the COPY command
1958  * - this routine is called by the Archiver when it wants the table
1959  * to be dumped.
1960  */
1961 static int
1962 dumpTableData_copy(Archive *fout, const void *dcontext)
1963 {
1964  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
1965  TableInfo *tbinfo = tdinfo->tdtable;
1966  const char *classname = tbinfo->dobj.name;
1968 
1969  /*
1970  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
1971  * which uses it already.
1972  */
1973  PQExpBuffer clistBuf = createPQExpBuffer();
1974  PGconn *conn = GetConnection(fout);
1975  PGresult *res;
1976  int ret;
1977  char *copybuf;
1978  const char *column_list;
1979 
1980  pg_log_info("dumping contents of table \"%s.%s\"",
1981  tbinfo->dobj.namespace->dobj.name, classname);
1982 
1983  /*
1984  * Specify the column list explicitly so that we have no possibility of
1985  * retrieving data in the wrong column order. (The default column
1986  * ordering of COPY will not be what we want in certain corner cases
1987  * involving ADD COLUMN and inheritance.)
1988  */
1989  column_list = fmtCopyColumnList(tbinfo, clistBuf);
1990 
1991  /*
1992  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
1993  * a filter condition was specified. For other cases a simple COPY
1994  * suffices.
1995  */
1996  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
1997  {
1998  appendPQExpBufferStr(q, "COPY (SELECT ");
1999  /* klugery to get rid of parens in column list */
2000  if (strlen(column_list) > 2)
2001  {
2002  appendPQExpBufferStr(q, column_list + 1);
2003  q->data[q->len - 1] = ' ';
2004  }
2005  else
2006  appendPQExpBufferStr(q, "* ");
2007 
2008  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2009  fmtQualifiedDumpable(tbinfo),
2010  tdinfo->filtercond ? tdinfo->filtercond : "");
2011  }
2012  else
2013  {
2014  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2015  fmtQualifiedDumpable(tbinfo),
2016  column_list);
2017  }
2018  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2019  PQclear(res);
2020  destroyPQExpBuffer(clistBuf);
2021 
2022  for (;;)
2023  {
2024  ret = PQgetCopyData(conn, &copybuf, 0);
2025 
2026  if (ret < 0)
2027  break; /* done or error */
2028 
2029  if (copybuf)
2030  {
2031  WriteData(fout, copybuf, ret);
2032  PQfreemem(copybuf);
2033  }
2034 
2035  /* ----------
2036  * THROTTLE:
2037  *
2038  * There was considerable discussion in late July, 2000 regarding
2039  * slowing down pg_dump when backing up large tables. Users with both
2040  * slow & fast (multi-processor) machines experienced performance
2041  * degradation when doing a backup.
2042  *
2043  * Initial attempts based on sleeping for a number of ms for each ms
2044  * of work were deemed too complex, then a simple 'sleep in each loop'
2045  * implementation was suggested. The latter failed because the loop
2046  * was too tight. Finally, the following was implemented:
2047  *
2048  * If throttle is non-zero, then
2049  * See how long since the last sleep.
2050  * Work out how long to sleep (based on ratio).
2051  * If sleep is more than 100ms, then
2052  * sleep
2053  * reset timer
2054  * EndIf
2055  * EndIf
2056  *
2057  * where the throttle value was the number of ms to sleep per ms of
2058  * work. The calculation was done in each loop.
2059  *
2060  * Most of the hard work is done in the backend, and this solution
2061  * still did not work particularly well: on slow machines, the ratio
2062  * was 50:1, and on medium paced machines, 1:1, and on fast
2063  * multi-processor machines, it had little or no effect, for reasons
2064  * that were unclear.
2065  *
2066  * Further discussion ensued, and the proposal was dropped.
2067  *
2068  * For those people who want this feature, it can be implemented using
2069  * gettimeofday in each loop, calculating the time since last sleep,
2070  * multiplying that by the sleep ratio, then if the result is more
2071  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2072  * function to sleep for a subsecond period ie.
2073  *
2074  * select(0, NULL, NULL, NULL, &tvi);
2075  *
2076  * This will return after the interval specified in the structure tvi.
2077  * Finally, call gettimeofday again to save the 'last sleep time'.
2078  * ----------
2079  */
2080  }
2081  archprintf(fout, "\\.\n\n\n");
2082 
2083  if (ret == -2)
2084  {
2085  /* copy data transfer failed */
2086  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2087  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2088  pg_log_error_detail("Command was: %s", q->data);
2089  exit_nicely(1);
2090  }
2091 
2092  /* Check command status and return to normal libpq state */
2093  res = PQgetResult(conn);
2095  {
2096  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2097  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2098  pg_log_error_detail("Command was: %s", q->data);
2099  exit_nicely(1);
2100  }
2101  PQclear(res);
2102 
2103  /* Do this to ensure we've pumped libpq back to idle state */
2104  if (PQgetResult(conn) != NULL)
2105  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2106  classname);
2107 
2108  destroyPQExpBuffer(q);
2109  return 1;
2110 }
2111 
2112 /*
2113  * Dump table data using INSERT commands.
2114  *
2115  * Caution: when we restore from an archive file direct to database, the
2116  * INSERT commands emitted by this function have to be parsed by
2117  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2118  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2119  */
2120 static int
2121 dumpTableData_insert(Archive *fout, const void *dcontext)
2122 {
2123  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2124  TableInfo *tbinfo = tdinfo->tdtable;
2125  DumpOptions *dopt = fout->dopt;
2127  PQExpBuffer insertStmt = NULL;
2128  char *attgenerated;
2129  PGresult *res;
2130  int nfields,
2131  i;
2132  int rows_per_statement = dopt->dump_inserts;
2133  int rows_this_statement = 0;
2134 
2135  /*
2136  * If we're going to emit INSERTs with column names, the most efficient
2137  * way to deal with generated columns is to exclude them entirely. For
2138  * INSERTs without column names, we have to emit DEFAULT rather than the
2139  * actual column value --- but we can save a few cycles by fetching nulls
2140  * rather than the uninteresting-to-us value.
2141  */
2142  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2143  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2144  nfields = 0;
2145  for (i = 0; i < tbinfo->numatts; i++)
2146  {
2147  if (tbinfo->attisdropped[i])
2148  continue;
2149  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2150  continue;
2151  if (nfields > 0)
2152  appendPQExpBufferStr(q, ", ");
2153  if (tbinfo->attgenerated[i])
2154  appendPQExpBufferStr(q, "NULL");
2155  else
2156  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2157  attgenerated[nfields] = tbinfo->attgenerated[i];
2158  nfields++;
2159  }
2160  /* Servers before 9.4 will complain about zero-column SELECT */
2161  if (nfields == 0)
2162  appendPQExpBufferStr(q, "NULL");
2163  appendPQExpBuffer(q, " FROM ONLY %s",
2164  fmtQualifiedDumpable(tbinfo));
2165  if (tdinfo->filtercond)
2166  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2167 
2168  ExecuteSqlStatement(fout, q->data);
2169 
2170  while (1)
2171  {
2172  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2173  PGRES_TUPLES_OK);
2174 
2175  /* cross-check field count, allowing for dummy NULL if any */
2176  if (nfields != PQnfields(res) &&
2177  !(nfields == 0 && PQnfields(res) == 1))
2178  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2179  tbinfo->dobj.name);
2180 
2181  /*
2182  * First time through, we build as much of the INSERT statement as
2183  * possible in "insertStmt", which we can then just print for each
2184  * statement. If the table happens to have zero dumpable columns then
2185  * this will be a complete statement, otherwise it will end in
2186  * "VALUES" and be ready to have the row's column values printed.
2187  */
2188  if (insertStmt == NULL)
2189  {
2190  TableInfo *targettab;
2191 
2192  insertStmt = createPQExpBuffer();
2193 
2194  /*
2195  * When load-via-partition-root is set, get the root table name
2196  * for the partition table, so that we can reload data through the
2197  * root table.
2198  */
2199  if (dopt->load_via_partition_root && tbinfo->ispartition)
2200  targettab = getRootTableInfo(tbinfo);
2201  else
2202  targettab = tbinfo;
2203 
2204  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2205  fmtQualifiedDumpable(targettab));
2206 
2207  /* corner case for zero-column table */
2208  if (nfields == 0)
2209  {
2210  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2211  }
2212  else
2213  {
2214  /* append the list of column names if required */
2215  if (dopt->column_inserts)
2216  {
2217  appendPQExpBufferChar(insertStmt, '(');
2218  for (int field = 0; field < nfields; field++)
2219  {
2220  if (field > 0)
2221  appendPQExpBufferStr(insertStmt, ", ");
2222  appendPQExpBufferStr(insertStmt,
2223  fmtId(PQfname(res, field)));
2224  }
2225  appendPQExpBufferStr(insertStmt, ") ");
2226  }
2227 
2228  if (tbinfo->needs_override)
2229  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2230 
2231  appendPQExpBufferStr(insertStmt, "VALUES");
2232  }
2233  }
2234 
2235  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2236  {
2237  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2238  if (rows_this_statement == 0)
2239  archputs(insertStmt->data, fout);
2240 
2241  /*
2242  * If it is zero-column table then we've already written the
2243  * complete statement, which will mean we've disobeyed
2244  * --rows-per-insert when it's set greater than 1. We do support
2245  * a way to make this multi-row with: SELECT UNION ALL SELECT
2246  * UNION ALL ... but that's non-standard so we should avoid it
2247  * given that using INSERTs is mostly only ever needed for
2248  * cross-database exports.
2249  */
2250  if (nfields == 0)
2251  continue;
2252 
2253  /* Emit a row heading */
2254  if (rows_per_statement == 1)
2255  archputs(" (", fout);
2256  else if (rows_this_statement > 0)
2257  archputs(",\n\t(", fout);
2258  else
2259  archputs("\n\t(", fout);
2260 
2261  for (int field = 0; field < nfields; field++)
2262  {
2263  if (field > 0)
2264  archputs(", ", fout);
2265  if (attgenerated[field])
2266  {
2267  archputs("DEFAULT", fout);
2268  continue;
2269  }
2270  if (PQgetisnull(res, tuple, field))
2271  {
2272  archputs("NULL", fout);
2273  continue;
2274  }
2275 
2276  /* XXX This code is partially duplicated in ruleutils.c */
2277  switch (PQftype(res, field))
2278  {
2279  case INT2OID:
2280  case INT4OID:
2281  case INT8OID:
2282  case OIDOID:
2283  case FLOAT4OID:
2284  case FLOAT8OID:
2285  case NUMERICOID:
2286  {
2287  /*
2288  * These types are printed without quotes unless
2289  * they contain values that aren't accepted by the
2290  * scanner unquoted (e.g., 'NaN'). Note that
2291  * strtod() and friends might accept NaN, so we
2292  * can't use that to test.
2293  *
2294  * In reality we only need to defend against
2295  * infinity and NaN, so we need not get too crazy
2296  * about pattern matching here.
2297  */
2298  const char *s = PQgetvalue(res, tuple, field);
2299 
2300  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2301  archputs(s, fout);
2302  else
2303  archprintf(fout, "'%s'", s);
2304  }
2305  break;
2306 
2307  case BITOID:
2308  case VARBITOID:
2309  archprintf(fout, "B'%s'",
2310  PQgetvalue(res, tuple, field));
2311  break;
2312 
2313  case BOOLOID:
2314  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2315  archputs("true", fout);
2316  else
2317  archputs("false", fout);
2318  break;
2319 
2320  default:
2321  /* All other types are printed as string literals. */
2322  resetPQExpBuffer(q);
2324  PQgetvalue(res, tuple, field),
2325  fout);
2326  archputs(q->data, fout);
2327  break;
2328  }
2329  }
2330 
2331  /* Terminate the row ... */
2332  archputs(")", fout);
2333 
2334  /* ... and the statement, if the target no. of rows is reached */
2335  if (++rows_this_statement >= rows_per_statement)
2336  {
2337  if (dopt->do_nothing)
2338  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2339  else
2340  archputs(";\n", fout);
2341  /* Reset the row counter */
2342  rows_this_statement = 0;
2343  }
2344  }
2345 
2346  if (PQntuples(res) <= 0)
2347  {
2348  PQclear(res);
2349  break;
2350  }
2351  PQclear(res);
2352  }
2353 
2354  /* Terminate any statements that didn't make the row count. */
2355  if (rows_this_statement > 0)
2356  {
2357  if (dopt->do_nothing)
2358  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2359  else
2360  archputs(";\n", fout);
2361  }
2362 
2363  archputs("\n\n", fout);
2364 
2365  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2366 
2367  destroyPQExpBuffer(q);
2368  if (insertStmt != NULL)
2369  destroyPQExpBuffer(insertStmt);
2370  free(attgenerated);
2371 
2372  return 1;
2373 }
2374 
2375 /*
2376  * getRootTableInfo:
2377  * get the root TableInfo for the given partition table.
2378  */
2379 static TableInfo *
2381 {
2382  TableInfo *parentTbinfo;
2383 
2384  Assert(tbinfo->ispartition);
2385  Assert(tbinfo->numParents == 1);
2386 
2387  parentTbinfo = tbinfo->parents[0];
2388  while (parentTbinfo->ispartition)
2389  {
2390  Assert(parentTbinfo->numParents == 1);
2391  parentTbinfo = parentTbinfo->parents[0];
2392  }
2393 
2394  return parentTbinfo;
2395 }
2396 
2397 /*
2398  * dumpTableData -
2399  * dump the contents of a single table
2400  *
2401  * Actually, this just makes an ArchiveEntry for the table contents.
2402  */
2403 static void
2404 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2405 {
2406  DumpOptions *dopt = fout->dopt;
2407  TableInfo *tbinfo = tdinfo->tdtable;
2408  PQExpBuffer copyBuf = createPQExpBuffer();
2409  PQExpBuffer clistBuf = createPQExpBuffer();
2410  DataDumperPtr dumpFn;
2411  char *copyStmt;
2412  const char *copyFrom;
2413 
2414  /* We had better have loaded per-column details about this table */
2415  Assert(tbinfo->interesting);
2416 
2417  if (dopt->dump_inserts == 0)
2418  {
2419  /* Dump/restore using COPY */
2420  dumpFn = dumpTableData_copy;
2421 
2422  /*
2423  * When load-via-partition-root is set, get the root table name for
2424  * the partition table, so that we can reload data through the root
2425  * table.
2426  */
2427  if (dopt->load_via_partition_root && tbinfo->ispartition)
2428  {
2429  TableInfo *parentTbinfo;
2430 
2431  parentTbinfo = getRootTableInfo(tbinfo);
2432  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2433  }
2434  else
2435  copyFrom = fmtQualifiedDumpable(tbinfo);
2436 
2437  /* must use 2 steps here 'cause fmtId is nonreentrant */
2438  appendPQExpBuffer(copyBuf, "COPY %s ",
2439  copyFrom);
2440  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2441  fmtCopyColumnList(tbinfo, clistBuf));
2442  copyStmt = copyBuf->data;
2443  }
2444  else
2445  {
2446  /* Restore using INSERT */
2447  dumpFn = dumpTableData_insert;
2448  copyStmt = NULL;
2449  }
2450 
2451  /*
2452  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2453  * dependency on its table as "special" and pass it to ArchiveEntry now.
2454  * See comments for BuildArchiveDependencies.
2455  */
2456  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2457  {
2458  TocEntry *te;
2459 
2460  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2461  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2462  .namespace = tbinfo->dobj.namespace->dobj.name,
2463  .owner = tbinfo->rolname,
2464  .description = "TABLE DATA",
2465  .section = SECTION_DATA,
2466  .copyStmt = copyStmt,
2467  .deps = &(tbinfo->dobj.dumpId),
2468  .nDeps = 1,
2469  .dumpFn = dumpFn,
2470  .dumpArg = tdinfo));
2471 
2472  /*
2473  * Set the TocEntry's dataLength in case we are doing a parallel dump
2474  * and want to order dump jobs by table size. We choose to measure
2475  * dataLength in table pages (including TOAST pages) during dump, so
2476  * no scaling is needed.
2477  *
2478  * However, relpages is declared as "integer" in pg_class, and hence
2479  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2480  * Cast so that we get the right interpretation of table sizes
2481  * exceeding INT_MAX pages.
2482  */
2483  te->dataLength = (BlockNumber) tbinfo->relpages;
2484  te->dataLength += (BlockNumber) tbinfo->toastpages;
2485 
2486  /*
2487  * If pgoff_t is only 32 bits wide, the above refinement is useless,
2488  * and instead we'd better worry about integer overflow. Clamp to
2489  * INT_MAX if the correct result exceeds that.
2490  */
2491  if (sizeof(te->dataLength) == 4 &&
2492  (tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2493  te->dataLength < 0))
2494  te->dataLength = INT_MAX;
2495  }
2496 
2497  destroyPQExpBuffer(copyBuf);
2498  destroyPQExpBuffer(clistBuf);
2499 }
2500 
2501 /*
2502  * refreshMatViewData -
2503  * load or refresh the contents of a single materialized view
2504  *
2505  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2506  * statement.
2507  */
2508 static void
2510 {
2511  TableInfo *tbinfo = tdinfo->tdtable;
2512  PQExpBuffer q;
2513 
2514  /* If the materialized view is not flagged as populated, skip this. */
2515  if (!tbinfo->relispopulated)
2516  return;
2517 
2518  q = createPQExpBuffer();
2519 
2520  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2521  fmtQualifiedDumpable(tbinfo));
2522 
2523  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2524  ArchiveEntry(fout,
2525  tdinfo->dobj.catId, /* catalog ID */
2526  tdinfo->dobj.dumpId, /* dump ID */
2527  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2528  .namespace = tbinfo->dobj.namespace->dobj.name,
2529  .owner = tbinfo->rolname,
2530  .description = "MATERIALIZED VIEW DATA",
2531  .section = SECTION_POST_DATA,
2532  .createStmt = q->data,
2533  .deps = tdinfo->dobj.dependencies,
2534  .nDeps = tdinfo->dobj.nDeps));
2535 
2536  destroyPQExpBuffer(q);
2537 }
2538 
2539 /*
2540  * getTableData -
2541  * set up dumpable objects representing the contents of tables
2542  */
2543 static void
2544 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2545 {
2546  int i;
2547 
2548  for (i = 0; i < numTables; i++)
2549  {
2550  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2551  (!relkind || tblinfo[i].relkind == relkind))
2552  makeTableDataInfo(dopt, &(tblinfo[i]));
2553  }
2554 }
2555 
2556 /*
2557  * Make a dumpable object for the data of this specific table
2558  *
2559  * Note: we make a TableDataInfo if and only if we are going to dump the
2560  * table data; the "dump" field in such objects isn't very interesting.
2561  */
2562 static void
2564 {
2565  TableDataInfo *tdinfo;
2566 
2567  /*
2568  * Nothing to do if we already decided to dump the table. This will
2569  * happen for "config" tables.
2570  */
2571  if (tbinfo->dataObj != NULL)
2572  return;
2573 
2574  /* Skip VIEWs (no data to dump) */
2575  if (tbinfo->relkind == RELKIND_VIEW)
2576  return;
2577  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2578  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2581  tbinfo->foreign_server)))
2582  return;
2583  /* Skip partitioned tables (data in partitions) */
2584  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2585  return;
2586 
2587  /* Don't dump data in unlogged tables, if so requested */
2588  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2589  dopt->no_unlogged_table_data)
2590  return;
2591 
2592  /* Check that the data is not explicitly excluded */
2594  tbinfo->dobj.catId.oid))
2595  return;
2596 
2597  /* OK, let's dump it */
2598  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2599 
2600  if (tbinfo->relkind == RELKIND_MATVIEW)
2601  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2602  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2603  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2604  else
2605  tdinfo->dobj.objType = DO_TABLE_DATA;
2606 
2607  /*
2608  * Note: use tableoid 0 so that this object won't be mistaken for
2609  * something that pg_depend entries apply to.
2610  */
2611  tdinfo->dobj.catId.tableoid = 0;
2612  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2613  AssignDumpId(&tdinfo->dobj);
2614  tdinfo->dobj.name = tbinfo->dobj.name;
2615  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2616  tdinfo->tdtable = tbinfo;
2617  tdinfo->filtercond = NULL; /* might get set later */
2618  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2619 
2620  /* A TableDataInfo contains data, of course */
2621  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2622 
2623  tbinfo->dataObj = tdinfo;
2624 
2625  /* Make sure that we'll collect per-column info for this table. */
2626  tbinfo->interesting = true;
2627 }
2628 
2629 /*
2630  * The refresh for a materialized view must be dependent on the refresh for
2631  * any materialized view that this one is dependent on.
2632  *
2633  * This must be called after all the objects are created, but before they are
2634  * sorted.
2635  */
2636 static void
2638 {
2639  PQExpBuffer query;
2640  PGresult *res;
2641  int ntups,
2642  i;
2643  int i_classid,
2644  i_objid,
2645  i_refobjid;
2646 
2647  /* No Mat Views before 9.3. */
2648  if (fout->remoteVersion < 90300)
2649  return;
2650 
2651  query = createPQExpBuffer();
2652 
2653  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2654  "( "
2655  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2656  "FROM pg_depend d1 "
2657  "JOIN pg_class c1 ON c1.oid = d1.objid "
2658  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2659  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2660  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2661  "AND d2.objid = r1.oid "
2662  "AND d2.refobjid <> d1.objid "
2663  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2664  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2665  CppAsString2(RELKIND_VIEW) ") "
2666  "WHERE d1.classid = 'pg_class'::regclass "
2667  "UNION "
2668  "SELECT w.objid, d3.refobjid, c3.relkind "
2669  "FROM w "
2670  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2671  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2672  "AND d3.objid = r3.oid "
2673  "AND d3.refobjid <> w.refobjid "
2674  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2675  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2676  CppAsString2(RELKIND_VIEW) ") "
2677  ") "
2678  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2679  "FROM w "
2680  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2681 
2682  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2683 
2684  ntups = PQntuples(res);
2685 
2686  i_classid = PQfnumber(res, "classid");
2687  i_objid = PQfnumber(res, "objid");
2688  i_refobjid = PQfnumber(res, "refobjid");
2689 
2690  for (i = 0; i < ntups; i++)
2691  {
2692  CatalogId objId;
2693  CatalogId refobjId;
2694  DumpableObject *dobj;
2695  DumpableObject *refdobj;
2696  TableInfo *tbinfo;
2697  TableInfo *reftbinfo;
2698 
2699  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2700  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2701  refobjId.tableoid = objId.tableoid;
2702  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2703 
2704  dobj = findObjectByCatalogId(objId);
2705  if (dobj == NULL)
2706  continue;
2707 
2708  Assert(dobj->objType == DO_TABLE);
2709  tbinfo = (TableInfo *) dobj;
2710  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2711  dobj = (DumpableObject *) tbinfo->dataObj;
2712  if (dobj == NULL)
2713  continue;
2714  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2715 
2716  refdobj = findObjectByCatalogId(refobjId);
2717  if (refdobj == NULL)
2718  continue;
2719 
2720  Assert(refdobj->objType == DO_TABLE);
2721  reftbinfo = (TableInfo *) refdobj;
2722  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2723  refdobj = (DumpableObject *) reftbinfo->dataObj;
2724  if (refdobj == NULL)
2725  continue;
2726  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2727 
2728  addObjectDependency(dobj, refdobj->dumpId);
2729 
2730  if (!reftbinfo->relispopulated)
2731  tbinfo->relispopulated = false;
2732  }
2733 
2734  PQclear(res);
2735 
2736  destroyPQExpBuffer(query);
2737 }
2738 
2739 /*
2740  * getTableDataFKConstraints -
2741  * add dump-order dependencies reflecting foreign key constraints
2742  *
2743  * This code is executed only in a data-only dump --- in schema+data dumps
2744  * we handle foreign key issues by not creating the FK constraints until
2745  * after the data is loaded. In a data-only dump, however, we want to
2746  * order the table data objects in such a way that a table's referenced
2747  * tables are restored first. (In the presence of circular references or
2748  * self-references this may be impossible; we'll detect and complain about
2749  * that during the dependency sorting step.)
2750  */
2751 static void
2753 {
2754  DumpableObject **dobjs;
2755  int numObjs;
2756  int i;
2757 
2758  /* Search through all the dumpable objects for FK constraints */
2759  getDumpableObjects(&dobjs, &numObjs);
2760  for (i = 0; i < numObjs; i++)
2761  {
2762  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2763  {
2764  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2765  TableInfo *ftable;
2766 
2767  /* Not interesting unless both tables are to be dumped */
2768  if (cinfo->contable == NULL ||
2769  cinfo->contable->dataObj == NULL)
2770  continue;
2771  ftable = findTableByOid(cinfo->confrelid);
2772  if (ftable == NULL ||
2773  ftable->dataObj == NULL)
2774  continue;
2775 
2776  /*
2777  * Okay, make referencing table's TABLE_DATA object depend on the
2778  * referenced table's TABLE_DATA object.
2779  */
2781  ftable->dataObj->dobj.dumpId);
2782  }
2783  }
2784  free(dobjs);
2785 }
2786 
2787 
2788 /*
2789  * dumpDatabase:
2790  * dump the database definition
2791  */
2792 static void
2794 {
2795  DumpOptions *dopt = fout->dopt;
2796  PQExpBuffer dbQry = createPQExpBuffer();
2797  PQExpBuffer delQry = createPQExpBuffer();
2798  PQExpBuffer creaQry = createPQExpBuffer();
2799  PQExpBuffer labelq = createPQExpBuffer();
2800  PGconn *conn = GetConnection(fout);
2801  PGresult *res;
2802  int i_tableoid,
2803  i_oid,
2804  i_datname,
2805  i_datdba,
2806  i_encoding,
2807  i_datlocprovider,
2808  i_collate,
2809  i_ctype,
2810  i_daticulocale,
2811  i_frozenxid,
2812  i_minmxid,
2813  i_datacl,
2814  i_acldefault,
2815  i_datistemplate,
2816  i_datconnlimit,
2817  i_datcollversion,
2818  i_tablespace;
2819  CatalogId dbCatId;
2820  DumpId dbDumpId;
2821  DumpableAcl dbdacl;
2822  const char *datname,
2823  *dba,
2824  *encoding,
2825  *datlocprovider,
2826  *collate,
2827  *ctype,
2828  *iculocale,
2829  *datistemplate,
2830  *datconnlimit,
2831  *tablespace;
2832  uint32 frozenxid,
2833  minmxid;
2834  char *qdatname;
2835 
2836  pg_log_info("saving database definition");
2837 
2838  /*
2839  * Fetch the database-level properties for this database.
2840  */
2841  appendPQExpBuffer(dbQry, "SELECT tableoid, oid, datname, "
2842  "datdba, "
2843  "pg_encoding_to_char(encoding) AS encoding, "
2844  "datcollate, datctype, datfrozenxid, "
2845  "datacl, acldefault('d', datdba) AS acldefault, "
2846  "datistemplate, datconnlimit, ");
2847  if (fout->remoteVersion >= 90300)
2848  appendPQExpBuffer(dbQry, "datminmxid, ");
2849  else
2850  appendPQExpBuffer(dbQry, "0 AS datminmxid, ");
2851  if (fout->remoteVersion >= 150000)
2852  appendPQExpBuffer(dbQry, "datlocprovider, daticulocale, datcollversion, ");
2853  else
2854  appendPQExpBuffer(dbQry, "'c' AS datlocprovider, NULL AS daticulocale, NULL AS datcollversion, ");
2855  appendPQExpBuffer(dbQry,
2856  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2857  "shobj_description(oid, 'pg_database') AS description "
2858  "FROM pg_database "
2859  "WHERE datname = current_database()");
2860 
2861  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
2862 
2863  i_tableoid = PQfnumber(res, "tableoid");
2864  i_oid = PQfnumber(res, "oid");
2865  i_datname = PQfnumber(res, "datname");
2866  i_datdba = PQfnumber(res, "datdba");
2867  i_encoding = PQfnumber(res, "encoding");
2868  i_datlocprovider = PQfnumber(res, "datlocprovider");
2869  i_collate = PQfnumber(res, "datcollate");
2870  i_ctype = PQfnumber(res, "datctype");
2871  i_daticulocale = PQfnumber(res, "daticulocale");
2872  i_frozenxid = PQfnumber(res, "datfrozenxid");
2873  i_minmxid = PQfnumber(res, "datminmxid");
2874  i_datacl = PQfnumber(res, "datacl");
2875  i_acldefault = PQfnumber(res, "acldefault");
2876  i_datistemplate = PQfnumber(res, "datistemplate");
2877  i_datconnlimit = PQfnumber(res, "datconnlimit");
2878  i_datcollversion = PQfnumber(res, "datcollversion");
2879  i_tablespace = PQfnumber(res, "tablespace");
2880 
2881  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
2882  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
2883  datname = PQgetvalue(res, 0, i_datname);
2884  dba = getRoleName(PQgetvalue(res, 0, i_datdba));
2885  encoding = PQgetvalue(res, 0, i_encoding);
2886  datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
2887  collate = PQgetvalue(res, 0, i_collate);
2888  ctype = PQgetvalue(res, 0, i_ctype);
2889  if (!PQgetisnull(res, 0, i_daticulocale))
2890  iculocale = PQgetvalue(res, 0, i_daticulocale);
2891  else
2892  iculocale = NULL;
2893  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
2894  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
2895  dbdacl.acl = PQgetvalue(res, 0, i_datacl);
2896  dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
2897  datistemplate = PQgetvalue(res, 0, i_datistemplate);
2898  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
2899  tablespace = PQgetvalue(res, 0, i_tablespace);
2900 
2901  qdatname = pg_strdup(fmtId(datname));
2902 
2903  /*
2904  * Prepare the CREATE DATABASE command. We must specify OID (if we want
2905  * to preserve that), as well as the encoding, locale, and tablespace
2906  * since those can't be altered later. Other DB properties are left to
2907  * the DATABASE PROPERTIES entry, so that they can be applied after
2908  * reconnecting to the target DB.
2909  */
2910  if (dopt->binary_upgrade)
2911  {
2912  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0 OID = %u",
2913  qdatname, dbCatId.oid);
2914  }
2915  else
2916  {
2917  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
2918  qdatname);
2919  }
2920  if (strlen(encoding) > 0)
2921  {
2922  appendPQExpBufferStr(creaQry, " ENCODING = ");
2923  appendStringLiteralAH(creaQry, encoding, fout);
2924  }
2925 
2926  appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
2927  if (datlocprovider[0] == 'c')
2928  appendPQExpBufferStr(creaQry, "libc");
2929  else if (datlocprovider[0] == 'i')
2930  appendPQExpBufferStr(creaQry, "icu");
2931  else
2932  pg_fatal("unrecognized locale provider: %s",
2933  datlocprovider);
2934 
2935  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
2936  {
2937  appendPQExpBufferStr(creaQry, " LOCALE = ");
2938  appendStringLiteralAH(creaQry, collate, fout);
2939  }
2940  else
2941  {
2942  if (strlen(collate) > 0)
2943  {
2944  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
2945  appendStringLiteralAH(creaQry, collate, fout);
2946  }
2947  if (strlen(ctype) > 0)
2948  {
2949  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
2950  appendStringLiteralAH(creaQry, ctype, fout);
2951  }
2952  }
2953  if (iculocale)
2954  {
2955  appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
2956  appendStringLiteralAH(creaQry, iculocale, fout);
2957  }
2958 
2959  /*
2960  * For binary upgrade, carry over the collation version. For normal
2961  * dump/restore, omit the version, so that it is computed upon restore.
2962  */
2963  if (dopt->binary_upgrade)
2964  {
2965  if (!PQgetisnull(res, 0, i_datcollversion))
2966  {
2967  appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
2968  appendStringLiteralAH(creaQry,
2969  PQgetvalue(res, 0, i_datcollversion),
2970  fout);
2971  }
2972  }
2973 
2974  /*
2975  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
2976  * thing; the decision whether to specify a tablespace should be left till
2977  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
2978  * label the DATABASE entry with the tablespace and let the normal
2979  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
2980  * attention to default_tablespace, so that won't work.
2981  */
2982  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
2983  !dopt->outputNoTablespaces)
2984  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
2985  fmtId(tablespace));
2986  appendPQExpBufferStr(creaQry, ";\n");
2987 
2988  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
2989  qdatname);
2990 
2991  dbDumpId = createDumpId();
2992 
2993  ArchiveEntry(fout,
2994  dbCatId, /* catalog ID */
2995  dbDumpId, /* dump ID */
2996  ARCHIVE_OPTS(.tag = datname,
2997  .owner = dba,
2998  .description = "DATABASE",
2999  .section = SECTION_PRE_DATA,
3000  .createStmt = creaQry->data,
3001  .dropStmt = delQry->data));
3002 
3003  /* Compute correct tag for archive entry */
3004  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3005 
3006  /* Dump DB comment if any */
3007  {
3008  /*
3009  * 8.2 and up keep comments on shared objects in a shared table, so we
3010  * cannot use the dumpComment() code used for other database objects.
3011  * Be careful that the ArchiveEntry parameters match that function.
3012  */
3013  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3014 
3015  if (comment && *comment && !dopt->no_comments)
3016  {
3017  resetPQExpBuffer(dbQry);
3018 
3019  /*
3020  * Generates warning when loaded into a differently-named
3021  * database.
3022  */
3023  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3024  appendStringLiteralAH(dbQry, comment, fout);
3025  appendPQExpBufferStr(dbQry, ";\n");
3026 
3028  ARCHIVE_OPTS(.tag = labelq->data,
3029  .owner = dba,
3030  .description = "COMMENT",
3031  .section = SECTION_NONE,
3032  .createStmt = dbQry->data,
3033  .deps = &dbDumpId,
3034  .nDeps = 1));
3035  }
3036  }
3037 
3038  /* Dump DB security label, if enabled */
3039  if (!dopt->no_security_labels)
3040  {
3041  PGresult *shres;
3042  PQExpBuffer seclabelQry;
3043 
3044  seclabelQry = createPQExpBuffer();
3045 
3046  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3047  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3048  resetPQExpBuffer(seclabelQry);
3049  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3050  if (seclabelQry->len > 0)
3052  ARCHIVE_OPTS(.tag = labelq->data,
3053  .owner = dba,
3054  .description = "SECURITY LABEL",
3055  .section = SECTION_NONE,
3056  .createStmt = seclabelQry->data,
3057  .deps = &dbDumpId,
3058  .nDeps = 1));
3059  destroyPQExpBuffer(seclabelQry);
3060  PQclear(shres);
3061  }
3062 
3063  /*
3064  * Dump ACL if any. Note that we do not support initial privileges
3065  * (pg_init_privs) on databases.
3066  */
3067  dbdacl.privtype = 0;
3068  dbdacl.initprivs = NULL;
3069 
3070  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3071  qdatname, NULL, NULL,
3072  dba, &dbdacl);
3073 
3074  /*
3075  * Now construct a DATABASE PROPERTIES archive entry to restore any
3076  * non-default database-level properties. (The reason this must be
3077  * separate is that we cannot put any additional commands into the TOC
3078  * entry that has CREATE DATABASE. pg_restore would execute such a group
3079  * in an implicit transaction block, and the backend won't allow CREATE
3080  * DATABASE in that context.)
3081  */
3082  resetPQExpBuffer(creaQry);
3083  resetPQExpBuffer(delQry);
3084 
3085  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3086  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3087  qdatname, datconnlimit);
3088 
3089  if (strcmp(datistemplate, "t") == 0)
3090  {
3091  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3092  qdatname);
3093 
3094  /*
3095  * The backend won't accept DROP DATABASE on a template database. We
3096  * can deal with that by removing the template marking before the DROP
3097  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3098  * since no such command is currently supported, fake it with a direct
3099  * UPDATE on pg_database.
3100  */
3101  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3102  "SET datistemplate = false WHERE datname = ");
3103  appendStringLiteralAH(delQry, datname, fout);
3104  appendPQExpBufferStr(delQry, ";\n");
3105  }
3106 
3107  /* Add database-specific SET options */
3108  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3109 
3110  /*
3111  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3112  * entry, too, for lack of a better place.
3113  */
3114  if (dopt->binary_upgrade)
3115  {
3116  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3117  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3118  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3119  "WHERE datname = ",
3120  frozenxid, minmxid);
3121  appendStringLiteralAH(creaQry, datname, fout);
3122  appendPQExpBufferStr(creaQry, ";\n");
3123  }
3124 
3125  if (creaQry->len > 0)
3127  ARCHIVE_OPTS(.tag = datname,
3128  .owner = dba,
3129  .description = "DATABASE PROPERTIES",
3130  .section = SECTION_PRE_DATA,
3131  .createStmt = creaQry->data,
3132  .dropStmt = delQry->data,
3133  .deps = &dbDumpId));
3134 
3135  /*
3136  * pg_largeobject comes from the old system intact, so set its
3137  * relfrozenxids and relminmxids.
3138  */
3139  if (dopt->binary_upgrade)
3140  {
3141  PGresult *lo_res;
3142  PQExpBuffer loFrozenQry = createPQExpBuffer();
3143  PQExpBuffer loOutQry = createPQExpBuffer();
3144  int i_relfrozenxid,
3145  i_relminmxid;
3146 
3147  /*
3148  * pg_largeobject
3149  */
3150  if (fout->remoteVersion >= 90300)
3151  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid\n"
3152  "FROM pg_catalog.pg_class\n"
3153  "WHERE oid = %u;\n",
3154  LargeObjectRelationId);
3155  else
3156  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid\n"
3157  "FROM pg_catalog.pg_class\n"
3158  "WHERE oid = %u;\n",
3159  LargeObjectRelationId);
3160 
3161  lo_res = ExecuteSqlQueryForSingleRow(fout, loFrozenQry->data);
3162 
3163  i_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3164  i_relminmxid = PQfnumber(lo_res, "relminmxid");
3165 
3166  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3167  appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
3168  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3169  "WHERE oid = %u;\n",
3170  atooid(PQgetvalue(lo_res, 0, i_relfrozenxid)),
3171  atooid(PQgetvalue(lo_res, 0, i_relminmxid)),
3172  LargeObjectRelationId);
3174  ARCHIVE_OPTS(.tag = "pg_largeobject",
3175  .description = "pg_largeobject",
3176  .section = SECTION_PRE_DATA,
3177  .createStmt = loOutQry->data));
3178 
3179  PQclear(lo_res);
3180 
3181  destroyPQExpBuffer(loFrozenQry);
3182  destroyPQExpBuffer(loOutQry);
3183  }
3184 
3185  PQclear(res);
3186 
3187  free(qdatname);
3188  destroyPQExpBuffer(dbQry);
3189  destroyPQExpBuffer(delQry);
3190  destroyPQExpBuffer(creaQry);
3191  destroyPQExpBuffer(labelq);
3192 }
3193 
3194 /*
3195  * Collect any database-specific or role-and-database-specific SET options
3196  * for this database, and append them to outbuf.
3197  */
3198 static void
3200  const char *dbname, Oid dboid)
3201 {
3202  PGconn *conn = GetConnection(AH);
3204  PGresult *res;
3205 
3206  /* First collect database-specific options */
3207  printfPQExpBuffer(buf, "SELECT unnest(setconfig) FROM pg_db_role_setting "
3208  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3209  dboid);
3210 
3211  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3212 
3213  for (int i = 0; i < PQntuples(res); i++)
3215  "DATABASE", dbname, NULL, NULL,
3216  outbuf);
3217 
3218  PQclear(res);
3219 
3220  /* Now look for role-and-database-specific options */
3221  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
3222  "FROM pg_db_role_setting s, pg_roles r "
3223  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3224  dboid);
3225 
3226  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3227 
3228  for (int i = 0; i < PQntuples(res); i++)
3230  "ROLE", PQgetvalue(res, i, 0),
3231  "DATABASE", dbname,
3232  outbuf);
3233 
3234  PQclear(res);
3235 
3237 }
3238 
3239 /*
3240  * dumpEncoding: put the correct encoding into the archive
3241  */
3242 static void
3244 {
3245  const char *encname = pg_encoding_to_char(AH->encoding);
3247 
3248  pg_log_info("saving encoding = %s", encname);
3249 
3250  appendPQExpBufferStr(qry, "SET client_encoding = ");
3251  appendStringLiteralAH(qry, encname, AH);
3252  appendPQExpBufferStr(qry, ";\n");
3253 
3255  ARCHIVE_OPTS(.tag = "ENCODING",
3256  .description = "ENCODING",
3257  .section = SECTION_PRE_DATA,
3258  .createStmt = qry->data));
3259 
3260  destroyPQExpBuffer(qry);
3261 }
3262 
3263 
3264 /*
3265  * dumpStdStrings: put the correct escape string behavior into the archive
3266  */
3267 static void
3269 {
3270  const char *stdstrings = AH->std_strings ? "on" : "off";
3272 
3273  pg_log_info("saving standard_conforming_strings = %s",
3274  stdstrings);
3275 
3276  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3277  stdstrings);
3278 
3280  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3281  .description = "STDSTRINGS",
3282  .section = SECTION_PRE_DATA,
3283  .createStmt = qry->data));
3284 
3285  destroyPQExpBuffer(qry);
3286 }
3287 
3288 /*
3289  * dumpSearchPath: record the active search_path in the archive
3290  */
3291 static void
3293 {
3295  PQExpBuffer path = createPQExpBuffer();
3296  PGresult *res;
3297  char **schemanames = NULL;
3298  int nschemanames = 0;
3299  int i;
3300 
3301  /*
3302  * We use the result of current_schemas(), not the search_path GUC,
3303  * because that might contain wildcards such as "$user", which won't
3304  * necessarily have the same value during restore. Also, this way avoids
3305  * listing schemas that may appear in search_path but not actually exist,
3306  * which seems like a prudent exclusion.
3307  */
3309  "SELECT pg_catalog.current_schemas(false)");
3310 
3311  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3312  pg_fatal("could not parse result of current_schemas()");
3313 
3314  /*
3315  * We use set_config(), not a simple "SET search_path" command, because
3316  * the latter has less-clean behavior if the search path is empty. While
3317  * that's likely to get fixed at some point, it seems like a good idea to
3318  * be as backwards-compatible as possible in what we put into archives.
3319  */
3320  for (i = 0; i < nschemanames; i++)
3321  {
3322  if (i > 0)
3323  appendPQExpBufferStr(path, ", ");
3324  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3325  }
3326 
3327  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3328  appendStringLiteralAH(qry, path->data, AH);
3329  appendPQExpBufferStr(qry, ", false);\n");
3330 
3331  pg_log_info("saving search_path = %s", path->data);
3332 
3334  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3335  .description = "SEARCHPATH",
3336  .section = SECTION_PRE_DATA,
3337  .createStmt = qry->data));
3338 
3339  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3340  AH->searchpath = pg_strdup(qry->data);
3341 
3342  if (schemanames)
3343  free(schemanames);
3344  PQclear(res);
3345  destroyPQExpBuffer(qry);
3346  destroyPQExpBuffer(path);
3347 }
3348 
3349 
3350 /*
3351  * getBlobs:
3352  * Collect schema-level data about large objects
3353  */
3354 static void
3356 {
3357  DumpOptions *dopt = fout->dopt;
3358  PQExpBuffer blobQry = createPQExpBuffer();
3359  BlobInfo *binfo;
3360  DumpableObject *bdata;
3361  PGresult *res;
3362  int ntups;
3363  int i;
3364  int i_oid;
3365  int i_lomowner;
3366  int i_lomacl;
3367  int i_acldefault;
3368 
3369  pg_log_info("reading large objects");
3370 
3371  /* Fetch BLOB OIDs, and owner/ACL data */
3372  appendPQExpBuffer(blobQry,
3373  "SELECT oid, lomowner, lomacl, "
3374  "acldefault('L', lomowner) AS acldefault "
3375  "FROM pg_largeobject_metadata");
3376 
3377  res = ExecuteSqlQuery(fout, blobQry->data, PGRES_TUPLES_OK);
3378 
3379  i_oid = PQfnumber(res, "oid");
3380  i_lomowner = PQfnumber(res, "lomowner");
3381  i_lomacl = PQfnumber(res, "lomacl");
3382  i_acldefault = PQfnumber(res, "acldefault");
3383 
3384  ntups = PQntuples(res);
3385 
3386  /*
3387  * Each large object has its own BLOB archive entry.
3388  */
3389  binfo = (BlobInfo *) pg_malloc(ntups * sizeof(BlobInfo));
3390 
3391  for (i = 0; i < ntups; i++)
3392  {
3393  binfo[i].dobj.objType = DO_BLOB;
3394  binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
3395  binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3396  AssignDumpId(&binfo[i].dobj);
3397 
3398  binfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
3399  binfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
3400  binfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3401  binfo[i].dacl.privtype = 0;
3402  binfo[i].dacl.initprivs = NULL;
3403  binfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));
3404 
3405  /* Blobs have data */
3407 
3408  /* Mark whether blob has an ACL */
3409  if (!PQgetisnull(res, i, i_lomacl))
3410  binfo[i].dobj.components |= DUMP_COMPONENT_ACL;
3411 
3412  /*
3413  * In binary-upgrade mode for blobs, we do *not* dump out the blob
3414  * data, as it will be copied by pg_upgrade, which simply copies the
3415  * pg_largeobject table. We *do* however dump out anything but the
3416  * data, as pg_upgrade copies just pg_largeobject, but not
3417  * pg_largeobject_metadata, after the dump is restored.
3418  */
3419  if (dopt->binary_upgrade)
3420  binfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
3421  }
3422 
3423  /*
3424  * If we have any large objects, a "BLOBS" archive entry is needed. This
3425  * is just a placeholder for sorting; it carries no data now.
3426  */
3427  if (ntups > 0)
3428  {
3429  bdata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3430  bdata->objType = DO_BLOB_DATA;
3431  bdata->catId = nilCatalogId;
3432  AssignDumpId(bdata);
3433  bdata->name = pg_strdup("BLOBS");
3434  bdata->components |= DUMP_COMPONENT_DATA;
3435  }
3436 
3437  PQclear(res);
3438  destroyPQExpBuffer(blobQry);
3439 }
3440 
3441 /*
3442  * dumpBlob
3443  *
3444  * dump the definition (metadata) of the given large object
3445  */
3446 static void
3447 dumpBlob(Archive *fout, const BlobInfo *binfo)
3448 {
3449  PQExpBuffer cquery = createPQExpBuffer();
3450  PQExpBuffer dquery = createPQExpBuffer();
3451 
3452  appendPQExpBuffer(cquery,
3453  "SELECT pg_catalog.lo_create('%s');\n",
3454  binfo->dobj.name);
3455 
3456  appendPQExpBuffer(dquery,
3457  "SELECT pg_catalog.lo_unlink('%s');\n",
3458  binfo->dobj.name);
3459 
3460  if (binfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3461  ArchiveEntry(fout, binfo->dobj.catId, binfo->dobj.dumpId,
3462  ARCHIVE_OPTS(.tag = binfo->dobj.name,
3463  .owner = binfo->rolname,
3464  .description = "BLOB",
3465  .section = SECTION_PRE_DATA,
3466  .createStmt = cquery->data,
3467  .dropStmt = dquery->data));
3468 
3469  /* Dump comment if any */
3470  if (binfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3471  dumpComment(fout, "LARGE OBJECT", binfo->dobj.name,
3472  NULL, binfo->rolname,
3473  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3474 
3475  /* Dump security label if any */
3476  if (binfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3477  dumpSecLabel(fout, "LARGE OBJECT", binfo->dobj.name,
3478  NULL, binfo->rolname,
3479  binfo->dobj.catId, 0, binfo->dobj.dumpId);
3480 
3481  /* Dump ACL if any */
3482  if (binfo->dobj.dump & DUMP_COMPONENT_ACL)
3483  dumpACL(fout, binfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3484  binfo->dobj.name, NULL,
3485  NULL, binfo->rolname, &binfo->dacl);
3486 
3487  destroyPQExpBuffer(cquery);
3488  destroyPQExpBuffer(dquery);
3489 }
3490 
3491 /*
3492  * dumpBlobs:
3493  * dump the data contents of all large objects
3494  */
3495 static int
3496 dumpBlobs(Archive *fout, const void *arg)
3497 {
3498  const char *blobQry;
3499  const char *blobFetchQry;
3500  PGconn *conn = GetConnection(fout);
3501  PGresult *res;
3502  char buf[LOBBUFSIZE];
3503  int ntups;
3504  int i;
3505  int cnt;
3506 
3507  pg_log_info("saving large objects");
3508 
3509  /*
3510  * Currently, we re-fetch all BLOB OIDs using a cursor. Consider scanning
3511  * the already-in-memory dumpable objects instead...
3512  */
3513  blobQry =
3514  "DECLARE bloboid CURSOR FOR "
3515  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3516 
3517  ExecuteSqlStatement(fout, blobQry);
3518 
3519  /* Command to fetch from cursor */
3520  blobFetchQry = "FETCH 1000 IN bloboid";
3521 
3522  do
3523  {
3524  /* Do a fetch */
3525  res = ExecuteSqlQuery(fout, blobFetchQry, PGRES_TUPLES_OK);
3526 
3527  /* Process the tuples, if any */
3528  ntups = PQntuples(res);
3529  for (i = 0; i < ntups; i++)
3530  {
3531  Oid blobOid;
3532  int loFd;
3533 
3534  blobOid = atooid(PQgetvalue(res, i, 0));
3535  /* Open the BLOB */
3536  loFd = lo_open(conn, blobOid, INV_READ);
3537  if (loFd == -1)
3538  pg_fatal("could not open large object %u: %s",
3539  blobOid, PQerrorMessage(conn));
3540 
3541  StartBlob(fout, blobOid);
3542 
3543  /* Now read it in chunks, sending data to archive */
3544  do
3545  {
3546  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3547  if (cnt < 0)
3548  pg_fatal("error reading large object %u: %s",
3549  blobOid, PQerrorMessage(conn));
3550 
3551  WriteData(fout, buf, cnt);
3552  } while (cnt > 0);
3553 
3554  lo_close(conn, loFd);
3555 
3556  EndBlob(fout, blobOid);
3557  }
3558 
3559  PQclear(res);
3560  } while (ntups > 0);
3561 
3562  return 1;
3563 }
3564 
3565 /*
3566  * getPolicies
3567  * get information about all RLS policies on dumpable tables.
3568  */
3569 void
3570 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3571 {
3572  PQExpBuffer query;
3573  PQExpBuffer tbloids;
3574  PGresult *res;
3575  PolicyInfo *polinfo;
3576  int i_oid;
3577  int i_tableoid;
3578  int i_polrelid;
3579  int i_polname;
3580  int i_polcmd;
3581  int i_polpermissive;
3582  int i_polroles;
3583  int i_polqual;
3584  int i_polwithcheck;
3585  int i,
3586  j,
3587  ntups;
3588 
3589  /* No policies before 9.5 */
3590  if (fout->remoteVersion < 90500)
3591  return;
3592 
3593  query = createPQExpBuffer();
3594  tbloids = createPQExpBuffer();
3595 
3596  /*
3597  * Identify tables of interest, and check which ones have RLS enabled.
3598  */
3599  appendPQExpBufferChar(tbloids, '{');
3600  for (i = 0; i < numTables; i++)
3601  {
3602  TableInfo *tbinfo = &tblinfo[i];
3603 
3604  /* Ignore row security on tables not to be dumped */
3605  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3606  continue;
3607 
3608  /* It can't have RLS or policies if it's not a table */
3609  if (tbinfo->relkind != RELKIND_RELATION &&
3610  tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
3611  continue;
3612 
3613  /* Add it to the list of table OIDs to be probed below */
3614  if (tbloids->len > 1) /* do we have more than the '{'? */
3615  appendPQExpBufferChar(tbloids, ',');
3616  appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
3617 
3618  /* Is RLS enabled? (That's separate from whether it has policies) */
3619  if (tbinfo->rowsec)
3620  {
3622 
3623  /*
3624  * We represent RLS being enabled on a table by creating a
3625  * PolicyInfo object with null polname.
3626  *
3627  * Note: use tableoid 0 so that this object won't be mistaken for
3628  * something that pg_depend entries apply to.
3629  */
3630  polinfo = pg_malloc(sizeof(PolicyInfo));
3631  polinfo->dobj.objType = DO_POLICY;
3632  polinfo->dobj.catId.tableoid = 0;
3633  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3634  AssignDumpId(&polinfo->dobj);
3635  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3636  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3637  polinfo->poltable = tbinfo;
3638  polinfo->polname = NULL;
3639  polinfo->polcmd = '\0';
3640  polinfo->polpermissive = 0;
3641  polinfo->polroles = NULL;
3642  polinfo->polqual = NULL;
3643  polinfo->polwithcheck = NULL;
3644  }
3645  }
3646  appendPQExpBufferChar(tbloids, '}');
3647 
3648  /*
3649  * Now, read all RLS policies belonging to the tables of interest, and
3650  * create PolicyInfo objects for them. (Note that we must filter the
3651  * results server-side not locally, because we dare not apply pg_get_expr
3652  * to tables we don't have lock on.)
3653  */
3654  pg_log_info("reading row-level security policies");
3655 
3656  printfPQExpBuffer(query,
3657  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3658  if (fout->remoteVersion >= 100000)
3659  appendPQExpBuffer(query, "pol.polpermissive, ");
3660  else
3661  appendPQExpBuffer(query, "'t' as polpermissive, ");
3662  appendPQExpBuffer(query,
3663  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3664  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3665  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3666  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3667  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
3668  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
3669  tbloids->data);
3670 
3671  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3672 
3673  ntups = PQntuples(res);
3674  if (ntups > 0)
3675  {
3676  i_oid = PQfnumber(res, "oid");
3677  i_tableoid = PQfnumber(res, "tableoid");
3678  i_polrelid = PQfnumber(res, "polrelid");
3679  i_polname = PQfnumber(res, "polname");
3680  i_polcmd = PQfnumber(res, "polcmd");
3681  i_polpermissive = PQfnumber(res, "polpermissive");
3682  i_polroles = PQfnumber(res, "polroles");
3683  i_polqual = PQfnumber(res, "polqual");
3684  i_polwithcheck = PQfnumber(res, "polwithcheck");
3685 
3686  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
3687 
3688  for (j = 0; j < ntups; j++)
3689  {
3690  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
3691  TableInfo *tbinfo = findTableByOid(polrelid);
3692 
3694 
3695  polinfo[j].dobj.objType = DO_POLICY;
3696  polinfo[j].dobj.catId.tableoid =
3697  atooid(PQgetvalue(res, j, i_tableoid));
3698  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
3699  AssignDumpId(&polinfo[j].dobj);
3700  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
3701  polinfo[j].poltable = tbinfo;
3702  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
3703  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
3704 
3705  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
3706  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
3707 
3708  if (PQgetisnull(res, j, i_polroles))
3709  polinfo[j].polroles = NULL;
3710  else
3711  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
3712 
3713  if (PQgetisnull(res, j, i_polqual))
3714  polinfo[j].polqual = NULL;
3715  else
3716  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
3717 
3718  if (PQgetisnull(res, j, i_polwithcheck))
3719  polinfo[j].polwithcheck = NULL;
3720  else
3721  polinfo[j].polwithcheck
3722  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
3723  }
3724  }
3725 
3726  PQclear(res);
3727 
3728  destroyPQExpBuffer(query);
3729  destroyPQExpBuffer(tbloids);
3730 }
3731 
3732 /*
3733  * dumpPolicy
3734  * dump the definition of the given policy
3735  */
3736 static void
3737 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
3738 {
3739  DumpOptions *dopt = fout->dopt;
3740  TableInfo *tbinfo = polinfo->poltable;
3741  PQExpBuffer query;
3742  PQExpBuffer delqry;
3743  PQExpBuffer polprefix;
3744  char *qtabname;
3745  const char *cmd;
3746  char *tag;
3747 
3748  /* Do nothing in data-only dump */
3749  if (dopt->dataOnly)
3750  return;
3751 
3752  /*
3753  * If polname is NULL, then this record is just indicating that ROW LEVEL
3754  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
3755  * ROW LEVEL SECURITY.
3756  */
3757  if (polinfo->polname == NULL)
3758  {
3759  query = createPQExpBuffer();
3760 
3761  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
3762  fmtQualifiedDumpable(tbinfo));
3763 
3764  /*
3765  * We must emit the ROW SECURITY object's dependency on its table
3766  * explicitly, because it will not match anything in pg_depend (unlike
3767  * the case for other PolicyInfo objects).
3768  */
3769  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3770  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3771  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
3772  .namespace = polinfo->dobj.namespace->dobj.name,
3773  .owner = tbinfo->rolname,
3774  .description = "ROW SECURITY",
3775  .section = SECTION_POST_DATA,
3776  .createStmt = query->data,
3777  .deps = &(tbinfo->dobj.dumpId),
3778  .nDeps = 1));
3779 
3780  destroyPQExpBuffer(query);
3781  return;
3782  }
3783 
3784  if (polinfo->polcmd == '*')
3785  cmd = "";
3786  else if (polinfo->polcmd == 'r')
3787  cmd = " FOR SELECT";
3788  else if (polinfo->polcmd == 'a')
3789  cmd = " FOR INSERT";
3790  else if (polinfo->polcmd == 'w')
3791  cmd = " FOR UPDATE";
3792  else if (polinfo->polcmd == 'd')
3793  cmd = " FOR DELETE";
3794  else
3795  pg_fatal("unexpected policy command type: %c",
3796  polinfo->polcmd);
3797 
3798  query = createPQExpBuffer();
3799  delqry = createPQExpBuffer();
3800  polprefix = createPQExpBuffer();
3801 
3802  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
3803 
3804  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
3805 
3806  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
3807  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
3808 
3809  if (polinfo->polroles != NULL)
3810  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
3811 
3812  if (polinfo->polqual != NULL)
3813  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
3814 
3815  if (polinfo->polwithcheck != NULL)
3816  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
3817 
3818  appendPQExpBufferStr(query, ";\n");
3819 
3820  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
3821  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
3822 
3823  appendPQExpBuffer(polprefix, "POLICY %s ON",
3824  fmtId(polinfo->polname));
3825 
3826  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
3827 
3828  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3829  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3830  ARCHIVE_OPTS(.tag = tag,
3831  .namespace = polinfo->dobj.namespace->dobj.name,
3832  .owner = tbinfo->rolname,
3833  .description = "POLICY",
3834  .section = SECTION_POST_DATA,
3835  .createStmt = query->data,
3836  .dropStmt = delqry->data));
3837 
3838  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3839  dumpComment(fout, polprefix->data, qtabname,
3840  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
3841  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
3842 
3843  free(tag);
3844  destroyPQExpBuffer(query);
3845  destroyPQExpBuffer(delqry);
3846  destroyPQExpBuffer(polprefix);
3847  free(qtabname);
3848 }
3849 
3850 /*
3851  * getPublications
3852  * get information about publications
3853  */
3855 getPublications(Archive *fout, int *numPublications)
3856 {
3857  DumpOptions *dopt = fout->dopt;
3858  PQExpBuffer query;
3859  PGresult *res;
3860  PublicationInfo *pubinfo;
3861  int i_tableoid;
3862  int i_oid;
3863  int i_pubname;
3864  int i_pubowner;
3865  int i_puballtables;
3866  int i_pubinsert;
3867  int i_pubupdate;
3868  int i_pubdelete;
3869  int i_pubtruncate;
3870  int i_pubviaroot;
3871  int i,
3872  ntups;
3873 
3874  if (dopt->no_publications || fout->remoteVersion < 100000)
3875  {
3876  *numPublications = 0;
3877  return NULL;
3878  }
3879 
3880  query = createPQExpBuffer();
3881 
3882  resetPQExpBuffer(query);
3883 
3884  /* Get the publications. */
3885  if (fout->remoteVersion >= 130000)
3886  appendPQExpBuffer(query,
3887  "SELECT p.tableoid, p.oid, p.pubname, "
3888  "p.pubowner, "
3889  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
3890  "FROM pg_publication p");
3891  else if (fout->remoteVersion >= 110000)
3892  appendPQExpBuffer(query,
3893  "SELECT p.tableoid, p.oid, p.pubname, "
3894  "p.pubowner, "
3895  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
3896  "FROM pg_publication p");
3897  else
3898  appendPQExpBuffer(query,
3899  "SELECT p.tableoid, p.oid, p.pubname, "
3900  "p.pubowner, "
3901  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
3902  "FROM pg_publication p");
3903 
3904  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3905 
3906  ntups = PQntuples(res);
3907 
3908  i_tableoid = PQfnumber(res, "tableoid");
3909  i_oid = PQfnumber(res, "oid");
3910  i_pubname = PQfnumber(res, "pubname");
3911  i_pubowner = PQfnumber(res, "pubowner");
3912  i_puballtables = PQfnumber(res, "puballtables");
3913  i_pubinsert = PQfnumber(res, "pubinsert");
3914  i_pubupdate = PQfnumber(res, "pubupdate");
3915  i_pubdelete = PQfnumber(res, "pubdelete");
3916  i_pubtruncate = PQfnumber(res, "pubtruncate");
3917  i_pubviaroot = PQfnumber(res, "pubviaroot");
3918 
3919  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
3920 
3921  for (i = 0; i < ntups; i++)
3922  {
3923  pubinfo[i].dobj.objType = DO_PUBLICATION;
3924  pubinfo[i].dobj.catId.tableoid =
3925  atooid(PQgetvalue(res, i, i_tableoid));
3926  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3927  AssignDumpId(&pubinfo[i].dobj);
3928  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
3929  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
3930  pubinfo[i].puballtables =
3931  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
3932  pubinfo[i].pubinsert =
3933  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
3934  pubinfo[i].pubupdate =
3935  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
3936  pubinfo[i].pubdelete =
3937  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
3938  pubinfo[i].pubtruncate =
3939  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
3940  pubinfo[i].pubviaroot =
3941  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
3942 
3943  /* Decide whether we want to dump it */
3944  selectDumpableObject(&(pubinfo[i].dobj), fout);
3945  }
3946  PQclear(res);
3947 
3948  destroyPQExpBuffer(query);
3949 
3950  *numPublications = ntups;
3951  return pubinfo;
3952 }
3953 
3954 /*
3955  * dumpPublication
3956  * dump the definition of the given publication
3957  */
3958 static void
3960 {
3961  DumpOptions *dopt = fout->dopt;
3962  PQExpBuffer delq;
3963  PQExpBuffer query;
3964  char *qpubname;
3965  bool first = true;
3966 
3967  /* Do nothing in data-only dump */
3968  if (dopt->dataOnly)
3969  return;
3970 
3971  delq = createPQExpBuffer();
3972  query = createPQExpBuffer();
3973 
3974  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
3975 
3976  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
3977  qpubname);
3978 
3979  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
3980  qpubname);
3981 
3982  if (pubinfo->puballtables)
3983  appendPQExpBufferStr(query, " FOR ALL TABLES");
3984 
3985  appendPQExpBufferStr(query, " WITH (publish = '");
3986  if (pubinfo->pubinsert)
3987  {
3988  appendPQExpBufferStr(query, "insert");
3989  first = false;
3990  }
3991 
3992  if (pubinfo->pubupdate)
3993  {
3994  if (!first)
3995  appendPQExpBufferStr(query, ", ");
3996 
3997  appendPQExpBufferStr(query, "update");
3998  first = false;
3999  }
4000 
4001  if (pubinfo->pubdelete)
4002  {
4003  if (!first)
4004  appendPQExpBufferStr(query, ", ");
4005 
4006  appendPQExpBufferStr(query, "delete");
4007  first = false;
4008  }
4009 
4010  if (pubinfo->pubtruncate)
4011  {
4012  if (!first)
4013  appendPQExpBufferStr(query, ", ");
4014 
4015  appendPQExpBufferStr(query, "truncate");
4016  first = false;
4017  }
4018 
4019  appendPQExpBufferStr(query, "'");
4020 
4021  if (pubinfo->pubviaroot)
4022  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4023 
4024  appendPQExpBufferStr(query, ");\n");
4025 
4026  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4027  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4028  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4029  .owner = pubinfo->rolname,
4030  .description = "PUBLICATION",
4031  .section = SECTION_POST_DATA,
4032  .createStmt = query->data,
4033  .dropStmt = delq->data));
4034 
4035  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4036  dumpComment(fout, "PUBLICATION", qpubname,
4037  NULL, pubinfo->rolname,
4038  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4039 
4040  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4041  dumpSecLabel(fout, "PUBLICATION", qpubname,
4042  NULL, pubinfo->rolname,
4043  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4044 
4045  destroyPQExpBuffer(delq);
4046  destroyPQExpBuffer(query);
4047  free(qpubname);
4048 }
4049 
4050 /*
4051  * getPublicationNamespaces
4052  * get information about publication membership for dumpable schemas.
4053  */
4054 void
4056 {
4057  PQExpBuffer query;
4058  PGresult *res;
4059  PublicationSchemaInfo *pubsinfo;
4060  DumpOptions *dopt = fout->dopt;
4061  int i_tableoid;
4062  int i_oid;
4063  int i_pnpubid;
4064  int i_pnnspid;
4065  int i,
4066  j,
4067  ntups;
4068 
4069  if (dopt->no_publications || fout->remoteVersion < 150000)
4070  return;
4071 
4072  query = createPQExpBuffer();
4073 
4074  /* Collect all publication membership info. */
4075  appendPQExpBufferStr(query,
4076  "SELECT tableoid, oid, pnpubid, pnnspid "
4077  "FROM pg_catalog.pg_publication_namespace");
4078  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4079 
4080  ntups = PQntuples(res);
4081 
4082  i_tableoid = PQfnumber(res, "tableoid");
4083  i_oid = PQfnumber(res, "oid");
4084  i_pnpubid = PQfnumber(res, "pnpubid");
4085  i_pnnspid = PQfnumber(res, "pnnspid");
4086 
4087  /* this allocation may be more than we need */
4088  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4089  j = 0;
4090 
4091  for (i = 0; i < ntups; i++)
4092  {
4093  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4094  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4095  PublicationInfo *pubinfo;
4096  NamespaceInfo *nspinfo;
4097 
4098  /*
4099  * Ignore any entries for which we aren't interested in either the
4100  * publication or the rel.
4101  */
4102  pubinfo = findPublicationByOid(pnpubid);
4103  if (pubinfo == NULL)
4104  continue;
4105  nspinfo = findNamespaceByOid(pnnspid);
4106  if (nspinfo == NULL)
4107  continue;
4108 
4109  /*
4110  * We always dump publication namespaces unless the corresponding
4111  * namespace is excluded from the dump.
4112  */
4113  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4114  continue;
4115 
4116  /* OK, make a DumpableObject for this relationship */
4118  pubsinfo[j].dobj.catId.tableoid =
4119  atooid(PQgetvalue(res, i, i_tableoid));
4120  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4121  AssignDumpId(&pubsinfo[j].dobj);
4122  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4123  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4124  pubsinfo[j].publication = pubinfo;
4125  pubsinfo[j].pubschema = nspinfo;
4126 
4127  /* Decide whether we want to dump it */
4128  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4129 
4130  j++;
4131  }
4132 
4133  PQclear(res);
4134  destroyPQExpBuffer(query);
4135 }
4136 
4137 /*
4138  * getPublicationTables
4139  * get information about publication membership for dumpable tables.
4140  */
void
getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
{
	PQExpBuffer query;
	PGresult   *res;
	PublicationRelInfo *pubrinfo;
	DumpOptions *dopt = fout->dopt;
	int			i_tableoid;
	int			i_oid;
	int			i_prpubid;
	int			i_prrelid;
	int			i_prrelqual;
	int			i_prattrs;
	int			i,
				j,
				ntups;

	/*
	 * NOTE: tblinfo/numTables are not referenced in this body; tables are
	 * looked up via findTableByOid() instead.  The parameters are presumably
	 * retained for call-site compatibility.
	 */

	/* Publications (and pg_publication_rel) only exist from v10 on. */
	if (dopt->no_publications || fout->remoteVersion < 100000)
		return;

	query = createPQExpBuffer();

	/*
	 * Collect all publication membership info.  From v15 on, a membership
	 * row can also carry a row filter (prqual) and a column list (prattrs);
	 * the prattrs subquery maps the stored attnums back to column names.
	 * Older servers return NULL for both.
	 */
	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
							 "(CASE\n"
							 " WHEN pr.prattrs IS NOT NULL THEN\n"
							 " (SELECT array_agg(attname)\n"
							 " FROM\n"
							 " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
							 " pg_catalog.pg_attribute\n"
							 " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
							 " ELSE NULL END) prattrs "
							 "FROM pg_catalog.pg_publication_rel pr");
	else
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "NULL AS prrelqual, NULL AS prattrs "
							 "FROM pg_catalog.pg_publication_rel");
	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_prpubid = PQfnumber(res, "prpubid");
	i_prrelid = PQfnumber(res, "prrelid");
	i_prrelqual = PQfnumber(res, "prrelqual");
	i_prattrs = PQfnumber(res, "prattrs");

	/* this allocation may be more than we need */
	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
	j = 0;						/* counts rows actually kept */

	for (i = 0; i < ntups; i++)
	{
		Oid			prpubid = atooid(PQgetvalue(res, i, i_prpubid));
		Oid			prrelid = atooid(PQgetvalue(res, i, i_prrelid));
		PublicationInfo *pubinfo;
		TableInfo  *tbinfo;

		/*
		 * Ignore any entries for which we aren't interested in either the
		 * publication or the rel.
		 */
		pubinfo = findPublicationByOid(prpubid);
		if (pubinfo == NULL)
			continue;
		tbinfo = findTableByOid(prrelid);
		if (tbinfo == NULL)
			continue;

		/*
		 * Ignore publication membership of tables whose definitions are not
		 * to be dumped.
		 */
		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
			continue;

		/* OK, make a DumpableObject for this relationship */
		pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
		pubrinfo[j].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&pubrinfo[j].dobj);
		pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
		pubrinfo[j].dobj.name = tbinfo->dobj.name;
		pubrinfo[j].publication = pubinfo;
		pubrinfo[j].pubtable = tbinfo;
		if (PQgetisnull(res, i, i_prrelqual))
			pubrinfo[j].pubrelqual = NULL;
		else
			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));

		if (!PQgetisnull(res, i, i_prattrs))
		{
			char	  **attnames;
			int			nattnames;
			PQExpBuffer attribs;

			/* Turn the array of column names into a quoted SQL list. */
			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
							  &attnames, &nattnames))
				pg_fatal("could not parse %s array", "prattrs");
			attribs = createPQExpBuffer();
			for (int k = 0; k < nattnames; k++)
			{
				if (k > 0)
					appendPQExpBufferStr(attribs, ", ");

				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
			}
			/* keep only the string; the PQExpBuffer shell is abandoned */
			pubrinfo[j].pubrattrs = attribs->data;
		}
		else
			pubrinfo[j].pubrattrs = NULL;

		/* Decide whether we want to dump it */
		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);

		j++;
	}

	PQclear(res);
	destroyPQExpBuffer(query);
}
4268 
4269 /*
4270  * dumpPublicationNamespace
4271  * dump the definition of the given publication schema mapping.
4272  */
4273 static void
4275 {
4276  DumpOptions *dopt = fout->dopt;
4277  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4278  PublicationInfo *pubinfo = pubsinfo->publication;
4279  PQExpBuffer query;
4280  char *tag;
4281 
4282  /* Do nothing in data-only dump */
4283  if (dopt->dataOnly)
4284  return;
4285 
4286  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4287 
4288  query = createPQExpBuffer();
4289 
4290  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4291  appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4292 
4293  /*
4294  * There is no point in creating drop query as the drop is done by schema
4295  * drop.
4296  */
4297  if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4298  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4299  ARCHIVE_OPTS(.tag = tag,
4300  .namespace = schemainfo->dobj.name,
4301  .owner = pubinfo->rolname,
4302  .description = "PUBLICATION TABLES IN SCHEMA",
4303  .section = SECTION_POST_DATA,
4304  .createStmt = query->data));
4305 
4306  /* These objects can't currently have comments or seclabels */
4307 
4308  free(tag);
4309  destroyPQExpBuffer(query);
4310 }
4311 
4312 /*
4313  * dumpPublicationTable
4314  * dump the definition of the given publication table mapping
4315  */
4316 static void
4318 {
4319  DumpOptions *dopt = fout->dopt;
4320  PublicationInfo *pubinfo = pubrinfo->publication;
4321  TableInfo *tbinfo = pubrinfo->pubtable;
4322  PQExpBuffer query;
4323  char *tag;
4324 
4325  /* Do nothing in data-only dump */
4326  if (dopt->dataOnly)
4327  return;
4328 
4329  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4330 
4331  query = createPQExpBuffer();
4332 
4333  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4334  fmtId(pubinfo->dobj.name));
4335  appendPQExpBuffer(query, " %s",
4336  fmtQualifiedDumpable(tbinfo));
4337 
4338  if (pubrinfo->pubrattrs)
4339  appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
4340 
4341  if (pubrinfo->pubrelqual)
4342  {
4343  /*
4344  * It's necessary to add parentheses around the expression because
4345  * pg_get_expr won't supply the parentheses for things like WHERE
4346  * TRUE.
4347  */
4348  appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
4349  }
4350  appendPQExpBufferStr(query, ";\n");
4351 
4352  /*
4353  * There is no point in creating a drop query as the drop is done by table
4354  * drop. (If you think to change this, see also _printTocEntry().)
4355  * Although this object doesn't really have ownership as such, set the
4356  * owner field anyway to ensure that the command is run by the correct
4357  * role at restore time.
4358  */
4359  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4360  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4361  ARCHIVE_OPTS(.tag = tag,
4362  .namespace = tbinfo->dobj.namespace->dobj.name,
4363  .owner = pubinfo->rolname,
4364  .description = "PUBLICATION TABLE",
4365  .section = SECTION_POST_DATA,
4366  .createStmt = query->data));
4367 
4368  /* These objects can't currently have comments or seclabels */
4369 
4370  free(tag);
4371  destroyPQExpBuffer(query);
4372 }
4373 
4374 /*
4375  * Is the currently connected user a superuser?
4376  */
4377 static bool
4379 {
4380  ArchiveHandle *AH = (ArchiveHandle *) fout;
4381  const char *val;
4382 
4383  val = PQparameterStatus(AH->connection, "is_superuser");
4384 
4385  if (val && strcmp(val, "on") == 0)
4386  return true;
4387 
4388  return false;
4389 }
4390 
4391 /*
4392  * getSubscriptions
4393  * get information about subscriptions
4394  */
4395 void
4397 {
4398  DumpOptions *dopt = fout->dopt;
4399  PQExpBuffer query;
4400  PGresult *res;
4401  SubscriptionInfo *subinfo;
4402  int i_tableoid;
4403  int i_oid;
4404  int i_subname;
4405  int i_subowner;
4406  int i_substream;
4407  int i_subtwophasestate;
4408  int i_subdisableonerr;
4409  int i_subconninfo;
4410  int i_subslotname;
4411  int i_subsynccommit;
4412  int i_subpublications;
4413  int i_subbinary;
4414  int i,
4415  ntups;
4416 
4417  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4418  return;
4419 
4420  if (!is_superuser(fout))
4421  {
4422  int n;
4423 
4424  res = ExecuteSqlQuery(fout,
4425  "SELECT count(*) FROM pg_subscription "
4426  "WHERE subdbid = (SELECT oid FROM pg_database"
4427  " WHERE datname = current_database())",
4428  PGRES_TUPLES_OK);
4429  n = atoi(PQgetvalue(res, 0, 0));
4430  if (n > 0)
4431  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4432  PQclear(res);
4433  return;
4434  }
4435 
4436  query = createPQExpBuffer();
4437 
4438  /* Get the subscriptions in current database. */
4439  appendPQExpBuffer(query,
4440  "SELECT s.tableoid, s.oid, s.subname,\n"
4441  " s.subowner,\n"
4442  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4443  " s.subpublications,\n");
4444 
4445  if (fout->remoteVersion >= 140000)
4446  appendPQExpBufferStr(query, " s.subbinary,\n");
4447  else
4448  appendPQExpBufferStr(query, " false AS subbinary,\n");
4449 
4450  if (fout->remoteVersion >= 140000)
4451  appendPQExpBufferStr(query, " s.substream,\n");
4452  else
4453  appendPQExpBufferStr(query, " false AS substream,\n");
4454 
4455  if (fout->remoteVersion >= 150000)
4456  appendPQExpBufferStr(query,
4457  " s.subtwophasestate,\n"
4458  " s.subdisableonerr\n");
4459  else
4460  appendPQExpBuffer(query,
4461  " '%c' AS subtwophasestate,\n"
4462  " false AS subdisableonerr\n",
4464 
4465  appendPQExpBufferStr(query,
4466  "FROM pg_subscription s\n"
4467  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4468  " WHERE datname = current_database())");
4469 
4470  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4471 
4472  ntups = PQntuples(res);
4473 
4474  /*
4475  * Get subscription fields. We don't include subskiplsn in the dump as
4476  * after restoring the dump this value may no longer be relevant.
4477  */
4478  i_tableoid = PQfnumber(res, "tableoid");
4479  i_oid = PQfnumber(res, "oid");
4480  i_subname = PQfnumber(res, "subname");
4481  i_subowner = PQfnumber(res, "subowner");
4482  i_subconninfo = PQfnumber(res, "subconninfo");
4483  i_subslotname = PQfnumber(res, "subslotname");
4484  i_subsynccommit = PQfnumber(res, "subsynccommit");
4485  i_subpublications = PQfnumber(res, "subpublications");
4486  i_subbinary = PQfnumber(res, "subbinary");
4487  i_substream = PQfnumber(res, "substream");
4488  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4489  i_subdisableonerr = PQfnumber(res, "subdisableonerr");
4490 
4491  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4492 
4493  for (i = 0; i < ntups; i++)
4494  {
4495  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4496  subinfo[i].dobj.catId.tableoid =
4497  atooid(PQgetvalue(res, i, i_tableoid));
4498  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4499  AssignDumpId(&subinfo[i].dobj);
4500  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4501  subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
4502  subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
4503  if (PQgetisnull(res, i, i_subslotname))
4504  subinfo[i].subslotname = NULL;
4505  else
4506  subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
4507  subinfo[i].subsynccommit =
4508  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4509  subinfo[i].subpublications =
4510  pg_strdup(PQgetvalue(res, i, i_subpublications));
4511  subinfo[i].subbinary =
4512  pg_strdup(PQgetvalue(res, i, i_subbinary));
4513  subinfo[i].substream =
4514  pg_strdup(PQgetvalue(res, i, i_substream));
4515  subinfo[i].subtwophasestate =
4516  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4517  subinfo[i].subdisableonerr =
4518  pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
4519 
4520  /* Decide whether we want to dump it */
4521  selectDumpableObject(&(subinfo[i].dobj), fout);
4522  }
4523  PQclear(res);
4524 
4525  destroyPQExpBuffer(query);
4526 }
4527 
4528 /*
4529  * dumpSubscription
4530  * dump the definition of the given subscription
4531  */
4532 static void
4534 {
4535  DumpOptions *dopt = fout->dopt;
4536  PQExpBuffer delq;
4537  PQExpBuffer query;
4538  PQExpBuffer publications;
4539  char *qsubname;
4540  char **pubnames = NULL;
4541  int npubnames = 0;
4542  int i;
4543  char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
4544 
4545  /* Do nothing in data-only dump */
4546  if (dopt->dataOnly)
4547  return;
4548 
4549  delq = createPQExpBuffer();
4550  query = createPQExpBuffer();
4551 
4552  qsubname = pg_strdup(fmtId(subinfo->dobj.name));
4553 
4554  appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
4555  qsubname);
4556 
4557  appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
4558  qsubname);
4559  appendStringLiteralAH(query, subinfo->subconninfo, fout);
4560 
4561  /* Build list of quoted publications and append them to query. */
4562  if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
4563  pg_fatal("could not parse %s array", "subpublications");
4564 
4565  publications = createPQExpBuffer();
4566  for (i = 0; i < npubnames; i++)
4567  {
4568  if (i > 0)
4569  appendPQExpBufferStr(publications, ", ");
4570 
4571  appendPQExpBufferStr(publications, fmtId(pubnames[i]));
4572  }
4573 
4574  appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
4575  if (subinfo->subslotname)
4576  appendStringLiteralAH(query, subinfo->subslotname, fout);
4577  else
4578  appendPQExpBufferStr(query, "NONE");
4579 
4580  if (strcmp(subinfo->subbinary, "t") == 0)
4581  appendPQExpBufferStr(query, ", binary = true");
4582 
4583  if (strcmp(subinfo->substream, "f") != 0)
4584  appendPQExpBufferStr(query, ", streaming = on");
4585 
4586  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
4587  appendPQExpBufferStr(query, ", two_phase = on");
4588 
4589  if (strcmp(subinfo->subdisableonerr, "t") == 0)
4590  appendPQExpBufferStr(query, ", disable_on_error = true");
4591 
4592  if (strcmp(subinfo->subsynccommit, "off") != 0)
4593  appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
4594 
4595  appendPQExpBufferStr(query, ");\n");
4596 
4597  if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4598  ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
4599  ARCHIVE_OPTS(.tag = subinfo->dobj.name,
4600  .owner = subinfo->rolname,
4601  .description = "SUBSCRIPTION",
4602  .section = SECTION_POST_DATA,
4603  .createStmt = query->data,
4604  .dropStmt = delq->data));
4605 
4606  if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4607  dumpComment(fout, "SUBSCRIPTION", qsubname,
4608  NULL, subinfo->rolname,
4609  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4610 
4611  if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4612  dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
4613  NULL, subinfo->rolname,
4614  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4615 
4616  destroyPQExpBuffer(publications);
4617  if (pubnames)
4618  free(pubnames);
4619 
4620  destroyPQExpBuffer(delq);
4621  destroyPQExpBuffer(query);
4622  free(qsubname);
4623 }
4624 
4625 /*
4626  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4627  * the object needs.
4628  */
4629 static void
4631  PQExpBuffer create,
4632  const DumpableObject *dobj,
4633  const char *catalog,
4634  const char *keyword,
4635  const char *objname)
4636 {
4637  if (dobj->depends_on_ext)
4638  {
4639  char *nm;
4640  PGresult *res;
4641  PQExpBuffer query;
4642  int ntups;
4643  int i_extname;
4644  int i;
4645 
4646  /* dodge fmtId() non-reentrancy */
4647  nm = pg_strdup(objname);
4648 
4649  query = createPQExpBuffer();
4650  appendPQExpBuffer(query,
4651  "SELECT e.extname "
4652  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
4653  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
4654  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
4655  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
4656  catalog,
4657  dobj->catId.oid);
4658  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4659  ntups = PQntuples(res);
4660  i_extname = PQfnumber(res, "extname");
4661  for (i = 0; i < ntups; i++)
4662  {
4663  appendPQExpBuffer(create, "ALTER %s %s DEPENDS ON EXTENSION %s;\n",
4664  keyword, nm,
4665  fmtId(PQgetvalue(res, i, i_extname)));
4666  }
4667 
4668  PQclear(res);
4669  destroyPQExpBuffer(query);
4670  pg_free(nm);
4671  }
4672 }
4673 
4674 static Oid
4676 {
4677  /*
4678  * If the old version didn't assign an array type, but the new version
4679  * does, we must select an unused type OID to assign. This currently only
4680  * happens for domains, when upgrading pre-v11 to v11 and up.
4681  *
4682  * Note: local state here is kind of ugly, but we must have some, since we
4683  * mustn't choose the same unused OID more than once.
4684  */
4685  static Oid next_possible_free_oid = FirstNormalObjectId;
4686  PGresult *res;
4687  bool is_dup;
4688 
4689  do
4690  {
4691  ++next_possible_free_oid;
4692  printfPQExpBuffer(upgrade_query,
4693  "SELECT EXISTS(SELECT 1 "
4694  "FROM pg_catalog.pg_type "
4695  "WHERE oid = '%u'::pg_catalog.oid);",
4696  next_possible_free_oid);
4697  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4698  is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
4699  PQclear(res);
4700  } while (is_dup);
4701 
4702  return next_possible_free_oid;
4703 }
4704 
4705 static void
4707  PQExpBuffer upgrade_buffer,
4708  Oid pg_type_oid,
4709  bool force_array_type,
4710  bool include_multirange_type)
4711 {
4712  PQExpBuffer upgrade_query = createPQExpBuffer();
4713  PGresult *res;
4714  Oid pg_type_array_oid;
4715  Oid pg_type_multirange_oid;
4716  Oid pg_type_multirange_array_oid;
4717 
4718  appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
4719  appendPQExpBuffer(upgrade_buffer,
4720  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4721  pg_type_oid);
4722 
4723  appendPQExpBuffer(upgrade_query,
4724  "SELECT typarray "
4725  "FROM pg_catalog.pg_type "
4726  "WHERE oid = '%u'::pg_catalog.oid;",
4727  pg_type_oid);
4728 
4729  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4730 
4731  pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4732 
4733  PQclear(res);
4734 
4735  if (!OidIsValid(pg_type_array_oid) && force_array_type)
4736  pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4737 
4738  if (OidIsValid(pg_type_array_oid))
4739  {
4740  appendPQExpBufferStr(upgrade_buffer,
4741  "\n-- For binary upgrade, must preserve pg_type array oid\n");
4742  appendPQExpBuffer(upgrade_buffer,
4743  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4744  pg_type_array_oid);
4745  }
4746 
4747  /*
4748  * Pre-set the multirange type oid and its own array type oid.
4749  */
4750  if (include_multirange_type)
4751  {
4752  if (fout->remoteVersion >= 140000)
4753  {
4754  printfPQExpBuffer(upgrade_query,
4755  "SELECT t.oid, t.typarray "
4756  "FROM pg_catalog.pg_type t "
4757  "JOIN pg_catalog.pg_range r "
4758  "ON t.oid = r.rngmultitypid "
4759  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
4760  pg_type_oid);
4761 
4762  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4763 
4764  pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
4765  pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4766 
4767  PQclear(res);
4768  }
4769  else
4770  {
4771  pg_type_multirange_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4772  pg_type_multirange_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4773  }
4774 
4775  appendPQExpBufferStr(upgrade_buffer,
4776  "\n-- For binary upgrade, must preserve multirange pg_type oid\n");
4777  appendPQExpBuffer(upgrade_buffer,
4778  "SELECT pg_catalog.binary_upgrade_set_next_multirange_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4779  pg_type_multirange_oid);
4780  appendPQExpBufferStr(upgrade_buffer,
4781  "\n-- For binary upgrade, must preserve multirange pg_type array oid\n");
4782  appendPQExpBuffer(upgrade_buffer,
4783  "SELECT pg_catalog.binary_upgrade_set_next_multirange_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4784  pg_type_multirange_array_oid);
4785  }
4786 
4787  destroyPQExpBuffer(upgrade_query);
4788 }
4789 
4790 static void
4792  PQExpBuffer upgrade_buffer,
4793  const TableInfo *tbinfo)
4794 {
4795  Oid pg_type_oid = tbinfo->reltype;
4796 
4797  if (OidIsValid(pg_type_oid))
4798  binary_upgrade_set_type_oids_by_type_oid(fout, upgrade_buffer,
4799  pg_type_oid, false, false);
4800 }
4801 
4802 static void
4804  PQExpBuffer upgrade_buffer, Oid pg_class_oid,
4805  bool is_index)
4806 {
4807  PQExpBuffer upgrade_query = createPQExpBuffer();
4808  PGresult *upgrade_res;
4809  Oid relfilenode;
4810  Oid toast_oid;
4811  Oid toast_relfilenode;
4812  char relkind;
4813  Oid toast_index_oid;
4814  Oid toast_index_relfilenode;
4815 
4816  /*
4817  * Preserve the OID and relfilenode of the table, table's index, table's
4818  * toast table and toast table's index if any.
4819  *
4820  * One complexity is that the current table definition might not require
4821  * the creation of a TOAST table, but the old database might have a TOAST
4822  * table that was created earlier, before some wide columns were dropped.
4823  * By setting the TOAST oid we force creation of the TOAST heap and index
4824  * by the new backend, so we can copy the files during binary upgrade
4825  * without worrying about this case.
4826  */
4827  appendPQExpBuffer(upgrade_query,
4828  "SELECT c.relkind, c.relfilenode, c.reltoastrelid, ct.relfilenode AS toast_relfilenode, i.indexrelid, cti.relfilenode AS toast_index_relfilenode "
4829  "FROM pg_catalog.pg_class c LEFT JOIN "
4830  "pg_catalog.pg_index i ON (c.reltoastrelid = i.indrelid AND i.indisvalid) "
4831  "LEFT JOIN pg_catalog.pg_class ct ON (c.reltoastrelid = ct.oid) "
4832  "LEFT JOIN pg_catalog.pg_class AS cti ON (i.indexrelid = cti.oid) "
4833  "WHERE c.oid = '%u'::pg_catalog.oid;",
4834  pg_class_oid);
4835 
4836  upgrade_res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4837 
4838  relkind = *PQgetvalue(upgrade_res, 0, PQfnumber(upgrade_res, "relkind"));
4839 
4840  relfilenode = atooid(PQgetvalue(upgrade_res, 0,
4841  PQfnumber(upgrade_res, "relfilenode")));
4842  toast_oid = atooid(PQgetvalue(upgrade_res, 0,
4843  PQfnumber(upgrade_res, "reltoastrelid")));
4844  toast_relfilenode = atooid(PQgetvalue(upgrade_res, 0,
4845  PQfnumber(upgrade_res, "toast_relfilenode")));
4846  toast_index_oid = atooid(PQgetvalue(upgrade_res, 0,
4847  PQfnumber(upgrade_res, "indexrelid")));
4848  toast_index_relfilenode = atooid(PQgetvalue(upgrade_res, 0,
4849  PQfnumber(upgrade_res, "toast_index_relfilenode")));
4850 
4851  appendPQExpBufferStr(upgrade_buffer,
4852  "\n-- For binary upgrade, must preserve pg_class oids and relfilenodes\n");
4853 
4854  if (!is_index)
4855  {
4856  appendPQExpBuffer(upgrade_buffer,
4857  "SELECT pg_catalog.binary_upgrade_set_next_heap_pg_class_oid('%u'::pg_catalog.oid);\n",
4858  pg_class_oid);
4859 
4860  /*
4861  * Not every relation has storage. Also, in a pre-v12 database,
4862  * partitioned tables have a relfilenode, which should not be
4863  * preserved when upgrading.
4864  */
4865  if (OidIsValid(relfilenode) && relkind != RELKIND_PARTITIONED_TABLE)
4866  appendPQExpBuffer(upgrade_buffer,
4867  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
4868  relfilenode);
4869 
4870  /*
4871  * In a pre-v12 database, partitioned tables might be marked as having
4872  * toast tables, but we should ignore them if so.
4873  */
4874  if (OidIsValid(toast_oid) &&
4875  relkind != RELKIND_PARTITIONED_TABLE)
4876  {
4877  appendPQExpBuffer(upgrade_buffer,
4878  "SELECT pg_catalog.binary_upgrade_set_next_toast_pg_class_oid('%u'::pg_catalog.oid);\n",
4879  toast_oid);
4880  appendPQExpBuffer(upgrade_buffer,
4881  "SELECT pg_catalog.binary_upgrade_set_next_toast_relfilenode('%u'::pg_catalog.oid);\n",
4882  toast_relfilenode);
4883 
4884  /* every toast table has an index */
4885  appendPQExpBuffer(upgrade_buffer,
4886  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4887  toast_index_oid);
4888  appendPQExpBuffer(upgrade_buffer,
4889  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
4890  toast_index_relfilenode);
4891  }
4892 
4893  PQclear(upgrade_res);
4894  }
4895  else
4896  {
4897  /* Preserve the OID and relfilenode of the index */
4898  appendPQExpBuffer(upgrade_buffer,
4899  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4900  pg_class_oid);
4901  appendPQExpBuffer(upgrade_buffer,
4902  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
4903  relfilenode);
4904  }
4905 
4906  appendPQExpBufferChar(upgrade_buffer, '\n');
4907 
4908  destroyPQExpBuffer(upgrade_query);
4909 }
4910 
4911 /*
4912  * If the DumpableObject is a member of an extension, add a suitable
4913  * ALTER EXTENSION ADD command to the creation commands in upgrade_buffer.
4914  *
4915  * For somewhat historical reasons, objname should already be quoted,
4916  * but not objnamespace (if any).
4917  */
4918 static void
4920  const DumpableObject *dobj,
4921  const char *objtype,
4922  const char *objname,
4923  const char *objnamespace)
4924 {
4925  DumpableObject *extobj = NULL;
4926  int i;
4927 
4928  if (!dobj->ext_member)
4929  return;
4930 
4931  /*
4932  * Find the parent extension. We could avoid this search if we wanted to
4933  * add a link field to DumpableObject, but the space costs of that would
4934  * be considerable. We assume that member objects could only have a
4935  * direct dependency on their own extension, not any others.
4936  */
4937  for (i = 0; i < dobj->nDeps; i++)
4938  {
4939  extobj = findObjectByDumpId(dobj->dependencies[i]);
4940  if (extobj && extobj->objType == DO_EXTENSION)
4941  break;
4942  extobj = NULL;
4943  }
4944  if (extobj == NULL)
4945  pg_fatal("could not find parent extension for %s %s",
4946  objtype, objname);
4947 
4948  appendPQExpBufferStr(upgrade_buffer,
4949  "\n-- For binary upgrade, handle extension membership the hard way\n");
4950  appendPQExpBuffer(upgrade_buffer, "ALTER EXTENSION %s ADD %s ",
4951  fmtId(extobj->name),
4952  objtype);
4953  if (objnamespace && *objnamespace)
4954  appendPQExpBuffer(upgrade_buffer, "%s.", fmtId(objnamespace));
4955  appendPQExpBuffer(upgrade_buffer, "%s;\n", objname);
4956 }
4957 
4958 /*
4959  * getNamespaces:
4960  * read all namespaces in the system catalogs and return them in the
4961  * NamespaceInfo* structure
4962  *
4963  * numNamespaces is set to the number of namespaces read in
4964  */
4965 NamespaceInfo *
4966 getNamespaces(Archive *fout, int *numNamespaces)
4967 {
4968  PGresult *res;
4969  int ntups;
4970  int i;
4971  PQExpBuffer query;
4972  NamespaceInfo *nsinfo;
4973  int i_tableoid;
4974  int i_oid;
4975  int i_nspname;
4976  int i_nspowner;
4977  int i_nspacl;
4978  int i_acldefault;
4979 
4980  query = createPQExpBuffer();
4981 
4982  /*
4983  * we fetch all namespaces including system ones, so that every object we
4984  * read in can be linked to a containing namespace.
4985  */
4986  appendPQExpBuffer(query, "SELECT n.tableoid, n.oid, n.nspname, "
4987  "n.nspowner, "
4988  "n.nspacl, "
4989  "acldefault('n', n.nspowner) AS acldefault "
4990  "FROM pg_namespace n");
4991 
4992  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4993 
4994  ntups = PQntuples(res);
4995 
4996  nsinfo = (NamespaceInfo *) pg_malloc(ntups * sizeof(NamespaceInfo));
4997 
4998  i_tableoid = PQfnumber(res, "tableoid");
4999  i_oid = PQfnumber(res, "oid");
5000  i_nspname = PQfnumber(res, "nspname");
5001  i_nspowner = PQfnumber(res, "nspowner");
5002  i_nspacl = PQfnumber(res, "nspacl");
5003  i_acldefault = PQfnumber(res, "acldefault");
5004 
5005  for (i = 0; i < ntups; i++)
5006  {
5007  const char *nspowner;
5008 
5009  nsinfo[i].dobj.objType = DO_NAMESPACE;
5010  nsinfo[i].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid));
5011  nsinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));