PostgreSQL Source Code  git master
pg_dump.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get a 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "common/relpath.h"
59 #include "compress_io.h"
60 #include "dumputils.h"
61 #include "fe_utils/option_utils.h"
62 #include "fe_utils/string_utils.h"
63 #include "getopt_long.h"
64 #include "libpq/libpq-fs.h"
65 #include "parallel.h"
66 #include "pg_backup_db.h"
67 #include "pg_backup_utils.h"
68 #include "pg_dump.h"
69 #include "storage/block.h"
70 
71 typedef struct
72 {
73  Oid roleoid; /* role's OID */
74  const char *rolename; /* role's name */
75 } RoleNameItem;
76 
77 typedef struct
78 {
79  const char *descr; /* comment for an object */
80  Oid classoid; /* object class (catalog OID) */
81  Oid objoid; /* object OID */
82  int objsubid; /* subobject (table column #) */
83 } CommentItem;
84 
85 typedef struct
86 {
87  const char *provider; /* label provider of this security label */
88  const char *label; /* security label for an object */
89  Oid classoid; /* object class (catalog OID) */
90  Oid objoid; /* object OID */
91  int objsubid; /* subobject (table column #) */
92 } SecLabelItem;
93 
94 typedef enum OidOptions
95 {
98  zeroAsNone = 4
100 
101 /* global decls */
102 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
103 
104 static Oid g_last_builtin_oid; /* value of the last builtin oid */
105 
106 /* The specified names/patterns should to match at least one entity */
107 static int strict_names = 0;
108 
110 
111 /*
112  * Object inclusion/exclusion lists
113  *
114  * The string lists record the patterns given by command-line switches,
115  * which we then convert to lists of OIDs of matching objects.
116  */
118 static SimpleOidList schema_include_oids = {NULL, NULL};
120 static SimpleOidList schema_exclude_oids = {NULL, NULL};
121 
124 static SimpleOidList table_include_oids = {NULL, NULL};
127 static SimpleOidList table_exclude_oids = {NULL, NULL};
130 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
131 
134 
136 static SimpleOidList extension_include_oids = {NULL, NULL};
137 
138 static const CatalogId nilCatalogId = {0, 0};
139 
140 /* override for standard extra_float_digits setting */
141 static bool have_extra_float_digits = false;
143 
144 /* sorted table of role names */
145 static RoleNameItem *rolenames = NULL;
146 static int nrolenames = 0;
147 
148 /* sorted table of comments */
149 static CommentItem *comments = NULL;
150 static int ncomments = 0;
151 
152 /* sorted table of security labels */
153 static SecLabelItem *seclabels = NULL;
154 static int nseclabels = 0;
155 
156 /*
157  * The default number of rows per INSERT when
158  * --inserts is specified without --rows-per-insert
159  */
160 #define DUMP_DEFAULT_ROWS_PER_INSERT 1
161 
162 /*
163  * Macro for producing quoted, schema-qualified name of a dumpable object.
164  */
165 #define fmtQualifiedDumpable(obj) \
166  fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
167  (obj)->dobj.name)
168 
169 static void help(const char *progname);
170 static void setup_connection(Archive *AH,
171  const char *dumpencoding, const char *dumpsnapshot,
172  char *use_role);
174 static void expand_schema_name_patterns(Archive *fout,
175  SimpleStringList *patterns,
176  SimpleOidList *oids,
177  bool strict_names);
178 static void expand_extension_name_patterns(Archive *fout,
179  SimpleStringList *patterns,
180  SimpleOidList *oids,
181  bool strict_names);
183  SimpleStringList *patterns,
184  SimpleOidList *oids);
185 static void expand_table_name_patterns(Archive *fout,
186  SimpleStringList *patterns,
187  SimpleOidList *oids,
188  bool strict_names,
189  bool with_child_tables);
190 static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
191  const char *pattern);
192 
193 static NamespaceInfo *findNamespace(Oid nsoid);
194 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
195 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
196 static const char *getRoleName(const char *roleoid_str);
197 static void collectRoleNames(Archive *fout);
198 static void getAdditionalACLs(Archive *fout);
199 static void dumpCommentExtended(Archive *fout, const char *type,
200  const char *name, const char *namespace,
201  const char *owner, CatalogId catalogId,
202  int subid, DumpId dumpId,
203  const char *initdb_comment);
204 static inline void dumpComment(Archive *fout, const char *type,
205  const char *name, const char *namespace,
206  const char *owner, CatalogId catalogId,
207  int subid, DumpId dumpId);
208 static int findComments(Oid classoid, Oid objoid, CommentItem **items);
209 static void collectComments(Archive *fout);
210 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
211  const char *namespace, const char *owner,
212  CatalogId catalogId, int subid, DumpId dumpId);
213 static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
214 static void collectSecLabels(Archive *fout);
215 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
216 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
217 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
218 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
219 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
220 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
221 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
222 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
223 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
224 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
225 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
226  PGresult *res);
227 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
228 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
229 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
230 static void dumpCast(Archive *fout, const CastInfo *cast);
231 static void dumpTransform(Archive *fout, const TransformInfo *transform);
232 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
233 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
234 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
235 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
236 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
237 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
238 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
239 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
240 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
241 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
242 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
243 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
244 static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
245 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
246 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
247 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
248 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
249 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
250 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
251 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
252 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
253 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
254 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
255 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
256 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
257 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
258 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
259 static void dumpUserMappings(Archive *fout,
260  const char *servername, const char *namespace,
261  const char *owner, CatalogId catalogId, DumpId dumpId);
262 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
263 
264 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
265  const char *type, const char *name, const char *subname,
266  const char *nspname, const char *owner,
267  const DumpableAcl *dacl);
268 
269 static void getDependencies(Archive *fout);
270 static void BuildArchiveDependencies(Archive *fout);
271 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
272  DumpId **dependencies, int *nDeps, int *allocDeps);
273 
275 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
276  DumpableObject *boundaryObjs);
277 
278 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
279 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
280 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
281 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
282 static void buildMatViewRefreshDependencies(Archive *fout);
283 static void getTableDataFKConstraints(void);
284 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
285  bool is_agg);
286 static char *format_function_signature(Archive *fout,
287  const FuncInfo *finfo, bool honor_quotes);
288 static char *convertRegProcReference(const char *proc);
289 static char *getFormattedOperatorName(const char *oproid);
290 static char *convertTSFunction(Archive *fout, Oid funcOid);
291 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
292 static void getLOs(Archive *fout);
293 static void dumpLO(Archive *fout, const LoInfo *loinfo);
294 static int dumpLOs(Archive *fout, const void *arg);
295 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
296 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
297 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
298 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
299 static void dumpDatabase(Archive *fout);
300 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
301  const char *dbname, Oid dboid);
302 static void dumpEncoding(Archive *AH);
303 static void dumpStdStrings(Archive *AH);
304 static void dumpSearchPath(Archive *AH);
306  PQExpBuffer upgrade_buffer,
307  Oid pg_type_oid,
308  bool force_array_type,
309  bool include_multirange_type);
311  PQExpBuffer upgrade_buffer,
312  const TableInfo *tbinfo);
313 static void binary_upgrade_set_pg_class_oids(Archive *fout,
314  PQExpBuffer upgrade_buffer,
315  Oid pg_class_oid, bool is_index);
316 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
317  const DumpableObject *dobj,
318  const char *objtype,
319  const char *objname,
320  const char *objnamespace);
321 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
322 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
323 static bool nonemptyReloptions(const char *reloptions);
324 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
325  const char *prefix, Archive *fout);
326 static char *get_synchronized_snapshot(Archive *fout);
327 static void setupDumpWorker(Archive *AH);
328 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
329 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
330 
331 
332 int
333 main(int argc, char **argv)
334 {
335  int c;
336  const char *filename = NULL;
337  const char *format = "p";
338  TableInfo *tblinfo;
339  int numTables;
340  DumpableObject **dobjs;
341  int numObjs;
342  DumpableObject *boundaryObjs;
343  int i;
344  int optindex;
345  RestoreOptions *ropt;
346  Archive *fout; /* the script file */
347  bool g_verbose = false;
348  const char *dumpencoding = NULL;
349  const char *dumpsnapshot = NULL;
350  char *use_role = NULL;
351  int numWorkers = 1;
352  int plainText = 0;
353  ArchiveFormat archiveFormat = archUnknown;
354  ArchiveMode archiveMode;
355  pg_compress_specification compression_spec = {0};
356  char *compression_detail = NULL;
357  char *compression_algorithm_str = "none";
358  char *error_detail = NULL;
359  bool user_compression_defined = false;
361 
362  static DumpOptions dopt;
363 
364  static struct option long_options[] = {
365  {"data-only", no_argument, NULL, 'a'},
366  {"blobs", no_argument, NULL, 'b'},
367  {"large-objects", no_argument, NULL, 'b'},
368  {"no-blobs", no_argument, NULL, 'B'},
369  {"no-large-objects", no_argument, NULL, 'B'},
370  {"clean", no_argument, NULL, 'c'},
371  {"create", no_argument, NULL, 'C'},
372  {"dbname", required_argument, NULL, 'd'},
373  {"extension", required_argument, NULL, 'e'},
374  {"file", required_argument, NULL, 'f'},
375  {"format", required_argument, NULL, 'F'},
376  {"host", required_argument, NULL, 'h'},
377  {"jobs", 1, NULL, 'j'},
378  {"no-reconnect", no_argument, NULL, 'R'},
379  {"no-owner", no_argument, NULL, 'O'},
380  {"port", required_argument, NULL, 'p'},
381  {"schema", required_argument, NULL, 'n'},
382  {"exclude-schema", required_argument, NULL, 'N'},
383  {"schema-only", no_argument, NULL, 's'},
384  {"superuser", required_argument, NULL, 'S'},
385  {"table", required_argument, NULL, 't'},
386  {"exclude-table", required_argument, NULL, 'T'},
387  {"no-password", no_argument, NULL, 'w'},
388  {"password", no_argument, NULL, 'W'},
389  {"username", required_argument, NULL, 'U'},
390  {"verbose", no_argument, NULL, 'v'},
391  {"no-privileges", no_argument, NULL, 'x'},
392  {"no-acl", no_argument, NULL, 'x'},
393  {"compress", required_argument, NULL, 'Z'},
394  {"encoding", required_argument, NULL, 'E'},
395  {"help", no_argument, NULL, '?'},
396  {"version", no_argument, NULL, 'V'},
397 
398  /*
399  * the following options don't have an equivalent short option letter
400  */
401  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
402  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
403  {"column-inserts", no_argument, &dopt.column_inserts, 1},
404  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
405  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
406  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
407  {"exclude-table-data", required_argument, NULL, 4},
408  {"extra-float-digits", required_argument, NULL, 8},
409  {"if-exists", no_argument, &dopt.if_exists, 1},
410  {"inserts", no_argument, NULL, 9},
411  {"lock-wait-timeout", required_argument, NULL, 2},
412  {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
413  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
414  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
415  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
416  {"role", required_argument, NULL, 3},
417  {"section", required_argument, NULL, 5},
418  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
419  {"snapshot", required_argument, NULL, 6},
420  {"strict-names", no_argument, &strict_names, 1},
421  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
422  {"no-comments", no_argument, &dopt.no_comments, 1},
423  {"no-publications", no_argument, &dopt.no_publications, 1},
424  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
425  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
426  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
427  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
428  {"no-sync", no_argument, NULL, 7},
429  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
430  {"rows-per-insert", required_argument, NULL, 10},
431  {"include-foreign-data", required_argument, NULL, 11},
432  {"table-and-children", required_argument, NULL, 12},
433  {"exclude-table-and-children", required_argument, NULL, 13},
434  {"exclude-table-data-and-children", required_argument, NULL, 14},
435  {"sync-method", required_argument, NULL, 15},
436 
437  {NULL, 0, NULL, 0}
438  };
439 
440  pg_logging_init(argv[0]);
442  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
443 
444  /*
445  * Initialize what we need for parallel execution, especially for thread
446  * support on Windows.
447  */
449 
450  progname = get_progname(argv[0]);
451 
452  if (argc > 1)
453  {
454  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
455  {
456  help(progname);
457  exit_nicely(0);
458  }
459  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
460  {
461  puts("pg_dump (PostgreSQL) " PG_VERSION);
462  exit_nicely(0);
463  }
464  }
465 
466  InitDumpOptions(&dopt);
467 
468  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
469  long_options, &optindex)) != -1)
470  {
471  switch (c)
472  {
473  case 'a': /* Dump data only */
474  dopt.dataOnly = true;
475  break;
476 
477  case 'b': /* Dump LOs */
478  dopt.outputLOs = true;
479  break;
480 
481  case 'B': /* Don't dump LOs */
482  dopt.dontOutputLOs = true;
483  break;
484 
485  case 'c': /* clean (i.e., drop) schema prior to create */
486  dopt.outputClean = 1;
487  break;
488 
489  case 'C': /* Create DB */
490  dopt.outputCreateDB = 1;
491  break;
492 
493  case 'd': /* database name */
494  dopt.cparams.dbname = pg_strdup(optarg);
495  break;
496 
497  case 'e': /* include extension(s) */
499  dopt.include_everything = false;
500  break;
501 
502  case 'E': /* Dump encoding */
503  dumpencoding = pg_strdup(optarg);
504  break;
505 
506  case 'f':
508  break;
509 
510  case 'F':
512  break;
513 
514  case 'h': /* server host */
515  dopt.cparams.pghost = pg_strdup(optarg);
516  break;
517 
518  case 'j': /* number of dump jobs */
519  if (!option_parse_int(optarg, "-j/--jobs", 1,
520  PG_MAX_JOBS,
521  &numWorkers))
522  exit_nicely(1);
523  break;
524 
525  case 'n': /* include schema(s) */
527  dopt.include_everything = false;
528  break;
529 
530  case 'N': /* exclude schema(s) */
532  break;
533 
534  case 'O': /* Don't reconnect to match owner */
535  dopt.outputNoOwner = 1;
536  break;
537 
538  case 'p': /* server port */
539  dopt.cparams.pgport = pg_strdup(optarg);
540  break;
541 
542  case 'R':
543  /* no-op, still accepted for backwards compatibility */
544  break;
545 
546  case 's': /* dump schema only */
547  dopt.schemaOnly = true;
548  break;
549 
550  case 'S': /* Username for superuser in plain text output */
552  break;
553 
554  case 't': /* include table(s) */
556  dopt.include_everything = false;
557  break;
558 
559  case 'T': /* exclude table(s) */
561  break;
562 
563  case 'U':
565  break;
566 
567  case 'v': /* verbose */
568  g_verbose = true;
570  break;
571 
572  case 'w':
574  break;
575 
576  case 'W':
578  break;
579 
580  case 'x': /* skip ACL dump */
581  dopt.aclsSkip = true;
582  break;
583 
584  case 'Z': /* Compression */
585  parse_compress_options(optarg, &compression_algorithm_str,
586  &compression_detail);
587  user_compression_defined = true;
588  break;
589 
590  case 0:
591  /* This covers the long options. */
592  break;
593 
594  case 2: /* lock-wait-timeout */
596  break;
597 
598  case 3: /* SET ROLE */
599  use_role = pg_strdup(optarg);
600  break;
601 
602  case 4: /* exclude table(s) data */
604  break;
605 
606  case 5: /* section */
608  break;
609 
610  case 6: /* snapshot */
611  dumpsnapshot = pg_strdup(optarg);
612  break;
613 
614  case 7: /* no-sync */
615  dosync = false;
616  break;
617 
618  case 8:
620  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
622  exit_nicely(1);
623  break;
624 
625  case 9: /* inserts */
626 
627  /*
628  * dump_inserts also stores --rows-per-insert, careful not to
629  * overwrite that.
630  */
631  if (dopt.dump_inserts == 0)
633  break;
634 
635  case 10: /* rows per insert */
636  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
637  &dopt.dump_inserts))
638  exit_nicely(1);
639  break;
640 
641  case 11: /* include foreign data */
643  optarg);
644  break;
645 
646  case 12: /* include table(s) and their children */
648  optarg);
649  dopt.include_everything = false;
650  break;
651 
652  case 13: /* exclude table(s) and their children */
654  optarg);
655  break;
656 
657  case 14: /* exclude data of table(s) and children */
659  optarg);
660  break;
661 
662  case 15:
664  exit_nicely(1);
665  break;
666 
667  default:
668  /* getopt_long already emitted a complaint */
669  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
670  exit_nicely(1);
671  }
672  }
673 
674  /*
675  * Non-option argument specifies database name as long as it wasn't
676  * already specified with -d / --dbname
677  */
678  if (optind < argc && dopt.cparams.dbname == NULL)
679  dopt.cparams.dbname = argv[optind++];
680 
681  /* Complain if any arguments remain */
682  if (optind < argc)
683  {
684  pg_log_error("too many command-line arguments (first is \"%s\")",
685  argv[optind]);
686  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
687  exit_nicely(1);
688  }
689 
690  /* --column-inserts implies --inserts */
691  if (dopt.column_inserts && dopt.dump_inserts == 0)
693 
694  /*
695  * Binary upgrade mode implies dumping sequence data even in schema-only
696  * mode. This is not exposed as a separate option, but kept separate
697  * internally for clarity.
698  */
699  if (dopt.binary_upgrade)
700  dopt.sequence_data = 1;
701 
702  if (dopt.dataOnly && dopt.schemaOnly)
703  pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
704 
706  pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
707 
708  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
709  pg_fatal("option --include-foreign-data is not supported with parallel backup");
710 
711  if (dopt.dataOnly && dopt.outputClean)
712  pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
713 
714  if (dopt.if_exists && !dopt.outputClean)
715  pg_fatal("option --if-exists requires option -c/--clean");
716 
717  /*
718  * --inserts are already implied above if --column-inserts or
719  * --rows-per-insert were specified.
720  */
721  if (dopt.do_nothing && dopt.dump_inserts == 0)
722  pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
723 
724  /* Identify archive format to emit */
725  archiveFormat = parseArchiveFormat(format, &archiveMode);
726 
727  /* archiveFormat specific setup */
728  if (archiveFormat == archNull)
729  plainText = 1;
730 
731  /*
732  * Custom and directory formats are compressed by default with gzip when
733  * available, not the others. If gzip is not available, no compression is
734  * done by default.
735  */
736  if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
737  !user_compression_defined)
738  {
739 #ifdef HAVE_LIBZ
740  compression_algorithm_str = "gzip";
741 #else
742  compression_algorithm_str = "none";
743 #endif
744  }
745 
746  /*
747  * Compression options
748  */
749  if (!parse_compress_algorithm(compression_algorithm_str,
751  pg_fatal("unrecognized compression algorithm: \"%s\"",
752  compression_algorithm_str);
753 
755  &compression_spec);
756  error_detail = validate_compress_specification(&compression_spec);
757  if (error_detail != NULL)
758  pg_fatal("invalid compression specification: %s",
759  error_detail);
760 
761  error_detail = supports_compression(compression_spec);
762  if (error_detail != NULL)
763  pg_fatal("%s", error_detail);
764 
765  /*
766  * Disable support for zstd workers for now - these are based on
767  * threading, and it's unclear how it interacts with parallel dumps on
768  * platforms where that relies on threads too (e.g. Windows).
769  */
770  if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
771  pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
772  "workers");
773 
774  /*
775  * If emitting an archive format, we always want to emit a DATABASE item,
776  * in case --create is specified at pg_restore time.
777  */
778  if (!plainText)
779  dopt.outputCreateDB = 1;
780 
781  /* Parallel backup only in the directory archive format so far */
782  if (archiveFormat != archDirectory && numWorkers > 1)
783  pg_fatal("parallel backup only supported by the directory format");
784 
785  /* Open the output file */
786  fout = CreateArchive(filename, archiveFormat, compression_spec,
787  dosync, archiveMode, setupDumpWorker, sync_method);
788 
789  /* Make dump options accessible right away */
790  SetArchiveOptions(fout, &dopt, NULL);
791 
792  /* Register the cleanup hook */
793  on_exit_close_archive(fout);
794 
795  /* Let the archiver know how noisy to be */
796  fout->verbose = g_verbose;
797 
798 
799  /*
800  * We allow the server to be back to 9.2, and up to any minor release of
801  * our own major version. (See also version check in pg_dumpall.c.)
802  */
803  fout->minRemoteVersion = 90200;
804  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
805 
806  fout->numWorkers = numWorkers;
807 
808  /*
809  * Open the database using the Archiver, so it knows about it. Errors mean
810  * death.
811  */
812  ConnectDatabase(fout, &dopt.cparams, false);
813  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
814 
815  /*
816  * On hot standbys, never try to dump unlogged table data, since it will
817  * just throw an error.
818  */
819  if (fout->isStandby)
820  dopt.no_unlogged_table_data = true;
821 
822  /*
823  * Find the last built-in OID, if needed (prior to 8.1)
824  *
825  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
826  */
828 
829  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
830 
831  /* Expand schema selection patterns into OID lists */
832  if (schema_include_patterns.head != NULL)
833  {
836  strict_names);
837  if (schema_include_oids.head == NULL)
838  pg_fatal("no matching schemas were found");
839  }
842  false);
843  /* non-matching exclusion patterns aren't an error */
844 
845  /* Expand table selection patterns into OID lists */
848  strict_names, false);
851  strict_names, true);
852  if ((table_include_patterns.head != NULL ||
854  table_include_oids.head == NULL)
855  pg_fatal("no matching tables were found");
856 
859  false, false);
862  false, true);
863 
866  false, false);
869  false, true);
870 
873 
874  /* non-matching exclusion patterns aren't an error */
875 
876  /* Expand extension selection patterns into OID lists */
877  if (extension_include_patterns.head != NULL)
878  {
881  strict_names);
882  if (extension_include_oids.head == NULL)
883  pg_fatal("no matching extensions were found");
884  }
885 
886  /*
887  * Dumping LOs is the default for dumps where an inclusion switch is not
888  * used (an "include everything" dump). -B can be used to exclude LOs
889  * from those dumps. -b can be used to include LOs even when an inclusion
890  * switch is used.
891  *
892  * -s means "schema only" and LOs are data, not schema, so we never
893  * include LOs when -s is used.
894  */
895  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputLOs)
896  dopt.outputLOs = true;
897 
898  /*
899  * Collect role names so we can map object owner OIDs to names.
900  */
901  collectRoleNames(fout);
902 
903  /*
904  * Now scan the database and create DumpableObject structs for all the
905  * objects we intend to dump.
906  */
907  tblinfo = getSchemaData(fout, &numTables);
908 
909  if (!dopt.schemaOnly)
910  {
911  getTableData(&dopt, tblinfo, numTables, 0);
913  if (dopt.dataOnly)
915  }
916 
917  if (dopt.schemaOnly && dopt.sequence_data)
918  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
919 
920  /*
921  * In binary-upgrade mode, we do not have to worry about the actual LO
922  * data or the associated metadata that resides in the pg_largeobject and
923  * pg_largeobject_metadata tables, respectively.
924  *
925  * However, we do need to collect LO information as there may be comments
926  * or other information on LOs that we do need to dump out.
927  */
928  if (dopt.outputLOs || dopt.binary_upgrade)
929  getLOs(fout);
930 
931  /*
932  * Collect dependency data to assist in ordering the objects.
933  */
934  getDependencies(fout);
935 
936  /*
937  * Collect ACLs, comments, and security labels, if wanted.
938  */
939  if (!dopt.aclsSkip)
940  getAdditionalACLs(fout);
941  if (!dopt.no_comments)
942  collectComments(fout);
943  if (!dopt.no_security_labels)
944  collectSecLabels(fout);
945 
946  /* Lastly, create dummy objects to represent the section boundaries */
947  boundaryObjs = createBoundaryObjects();
948 
949  /* Get pointers to all the known DumpableObjects */
950  getDumpableObjects(&dobjs, &numObjs);
951 
952  /*
953  * Add dummy dependencies to enforce the dump section ordering.
954  */
955  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
956 
957  /*
958  * Sort the objects into a safe dump order (no forward references).
959  *
960  * We rely on dependency information to help us determine a safe order, so
961  * the initial sort is mostly for cosmetic purposes: we sort by name to
962  * ensure that logically identical schemas will dump identically.
963  */
964  sortDumpableObjectsByTypeName(dobjs, numObjs);
965 
966  sortDumpableObjects(dobjs, numObjs,
967  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
968 
969  /*
970  * Create archive TOC entries for all the objects to be dumped, in a safe
971  * order.
972  */
973 
974  /*
975  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
976  */
977  dumpEncoding(fout);
978  dumpStdStrings(fout);
979  dumpSearchPath(fout);
980 
981  /* The database items are always next, unless we don't want them at all */
982  if (dopt.outputCreateDB)
983  dumpDatabase(fout);
984 
985  /* Now the rearrangeable objects. */
986  for (i = 0; i < numObjs; i++)
987  dumpDumpableObject(fout, dobjs[i]);
988 
989  /*
990  * Set up options info to ensure we dump what we want.
991  */
992  ropt = NewRestoreOptions();
993  ropt->filename = filename;
994 
995  /* if you change this list, see dumpOptionsFromRestoreOptions */
996  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
997  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
998  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
999  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
1001  ropt->dropSchema = dopt.outputClean;
1002  ropt->dataOnly = dopt.dataOnly;
1003  ropt->schemaOnly = dopt.schemaOnly;
1004  ropt->if_exists = dopt.if_exists;
1005  ropt->column_inserts = dopt.column_inserts;
1006  ropt->dumpSections = dopt.dumpSections;
1007  ropt->aclsSkip = dopt.aclsSkip;
1008  ropt->superuser = dopt.outputSuperuser;
1009  ropt->createDB = dopt.outputCreateDB;
1010  ropt->noOwner = dopt.outputNoOwner;
1011  ropt->noTableAm = dopt.outputNoTableAm;
1012  ropt->noTablespace = dopt.outputNoTablespaces;
1013  ropt->disable_triggers = dopt.disable_triggers;
1014  ropt->use_setsessauth = dopt.use_setsessauth;
1016  ropt->dump_inserts = dopt.dump_inserts;
1017  ropt->no_comments = dopt.no_comments;
1018  ropt->no_publications = dopt.no_publications;
1020  ropt->no_subscriptions = dopt.no_subscriptions;
1021  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
1024  ropt->sequence_data = dopt.sequence_data;
1025  ropt->binary_upgrade = dopt.binary_upgrade;
1026 
1027  ropt->compression_spec = compression_spec;
1028 
1029  ropt->suppressDumpWarnings = true; /* We've already shown them */
1030 
1031  SetArchiveOptions(fout, &dopt, ropt);
1032 
1033  /* Mark which entries should be output */
1035 
1036  /*
1037  * The archive's TOC entries are now marked as to which ones will actually
1038  * be output, so we can set up their dependency lists properly. This isn't
1039  * necessary for plain-text output, though.
1040  */
1041  if (!plainText)
1043 
1044  /*
1045  * And finally we can do the actual output.
1046  *
1047  * Note: for non-plain-text output formats, the output file is written
1048  * inside CloseArchive(). This is, um, bizarre; but not worth changing
1049  * right now.
1050  */
1051  if (plainText)
1052  RestoreArchive(fout);
1053 
1054  CloseArchive(fout);
1055 
1056  exit_nicely(0);
1057 }
1058 
1059 
1060 static void
1061 help(const char *progname)
1062 {
1063  printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
1064  printf(_("Usage:\n"));
1065  printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
1066 
1067  printf(_("\nGeneral options:\n"));
1068  printf(_(" -f, --file=FILENAME output file or directory name\n"));
1069  printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
1070  " plain text (default))\n"));
1071  printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
1072  printf(_(" -v, --verbose verbose mode\n"));
1073  printf(_(" -V, --version output version information, then exit\n"));
1074  printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
1075  " compress as specified\n"));
1076  printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
1077  printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
1078  printf(_(" --sync-method=METHOD set method for syncing files to disk\n"));
1079  printf(_(" -?, --help show this help, then exit\n"));
1080 
1081  printf(_("\nOptions controlling the output content:\n"));
1082  printf(_(" -a, --data-only dump only the data, not the schema\n"));
1083  printf(_(" -b, --large-objects include large objects in dump\n"));
1084  printf(_(" --blobs (same as --large-objects, deprecated)\n"));
1085  printf(_(" -B, --no-large-objects exclude large objects in dump\n"));
1086  printf(_(" --no-blobs (same as --no-large-objects, deprecated)\n"));
1087  printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
1088  printf(_(" -C, --create include commands to create database in dump\n"));
1089  printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
1090  printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
1091  printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
1092  printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
1093  printf(_(" -O, --no-owner skip restoration of object ownership in\n"
1094  " plain-text format\n"));
1095  printf(_(" -s, --schema-only dump only the schema, no data\n"));
1096  printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
1097  printf(_(" -t, --table=PATTERN dump only the specified table(s)\n"));
1098  printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
1099  printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
1100  printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
1101  printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
1102  printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
1103  printf(_(" --disable-triggers disable triggers during data-only restore\n"));
1104  printf(_(" --enable-row-security enable row security (dump only content user has\n"
1105  " access to)\n"));
1106  printf(_(" --exclude-table-and-children=PATTERN\n"
1107  " do NOT dump the specified table(s), including\n"
1108  " child and partition tables\n"));
1109  printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
1110  printf(_(" --exclude-table-data-and-children=PATTERN\n"
1111  " do NOT dump data for the specified table(s),\n"
1112  " including child and partition tables\n"));
1113  printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
1114  printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
1115  printf(_(" --include-foreign-data=PATTERN\n"
1116  " include data of foreign tables on foreign\n"
1117  " servers matching PATTERN\n"));
1118  printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
1119  printf(_(" --load-via-partition-root load partitions via the root table\n"));
1120  printf(_(" --no-comments do not dump comments\n"));
1121  printf(_(" --no-publications do not dump publications\n"));
1122  printf(_(" --no-security-labels do not dump security label assignments\n"));
1123  printf(_(" --no-subscriptions do not dump subscriptions\n"));
1124  printf(_(" --no-table-access-method do not dump table access methods\n"));
1125  printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
1126  printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
1127  printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
1128  printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
1129  printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
1130  printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
1131  printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
1132  printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
1133  printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
1134  printf(_(" --strict-names require table and/or schema include patterns to\n"
1135  " match at least one entity each\n"));
1136  printf(_(" --table-and-children=PATTERN dump only the specified table(s), including\n"
1137  " child and partition tables\n"));
1138  printf(_(" --use-set-session-authorization\n"
1139  " use SET SESSION AUTHORIZATION commands instead of\n"
1140  " ALTER OWNER commands to set ownership\n"));
1141 
1142  printf(_("\nConnection options:\n"));
1143  printf(_(" -d, --dbname=DBNAME database to dump\n"));
1144  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
1145  printf(_(" -p, --port=PORT database server port number\n"));
1146  printf(_(" -U, --username=NAME connect as specified database user\n"));
1147  printf(_(" -w, --no-password never prompt for password\n"));
1148  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1149  printf(_(" --role=ROLENAME do SET ROLE before dump\n"));
1150 
1151  printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
1152  "variable value is used.\n\n"));
1153  printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1154  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
1155 }
1156 
/*
 * setup_connection
 *		Configure a database connection for dumping.
 *
 * Sets client encoding and role, normalizes datestyle/intervalstyle/
 * float-output GUCs for portability, disables timeouts and synchronized
 * scans, and starts the transaction-snapshot-mode transaction.  Called
 * both for the leader connection and (with NULL dumpencoding/dumpsnapshot/
 * use_role) for parallel worker connections, which re-use values cached
 * in AH.
 *
 * NOTE(review): several lines of this function were elided in this
 * excerpt; elisions are marked below.
 */
static void
setup_connection(Archive *AH, const char *dumpencoding,
				 const char *dumpsnapshot, char *use_role)
{
	DumpOptions *dopt = AH->dopt;
	PGconn	   *conn = GetConnection(AH);
	const char *std_strings;

	/* [one statement elided in this excerpt] */

	/*
	 * Set the client encoding if requested.
	 */
	if (dumpencoding)
	{
		if (PQsetClientEncoding(conn, dumpencoding) < 0)
			pg_fatal("invalid client encoding \"%s\" specified",
					 dumpencoding);
	}

	/*
	 * Get the active encoding and the standard_conforming_strings setting, so
	 * we know how to escape strings.
	 */
	/* [encoding-capture statement elided in this excerpt] */

	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);

	/*
	 * Set the role if requested.  In a parallel dump worker, we'll be passed
	 * use_role == NULL, but AH->use_role is already set (if user specified it
	 * originally) and we should use that.
	 */
	if (!use_role && AH->use_role)
		use_role = AH->use_role;

	/* Set the role if requested */
	if (use_role)
	{
		PQExpBuffer query = createPQExpBuffer();

		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);

		/* save it for possible later use by parallel workers */
		if (!AH->use_role)
			AH->use_role = pg_strdup(use_role);
	}

	/* Set the datestyle to ISO to ensure the dump's portability */
	ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");

	/* Likewise, avoid using sql_standard intervalstyle */
	ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");

	/*
	 * Use an explicitly specified extra_float_digits if it has been provided.
	 * Otherwise, set extra_float_digits so that we can dump float data
	 * exactly (given correctly implemented float I/O code, anyway).
	 */
	/* [if-condition line elided in this excerpt] */
	{
		/* [buffer declaration elided in this excerpt] */

		appendPQExpBuffer(q, "SET extra_float_digits TO %d",
		/* [argument line elided in this excerpt] */
		ExecuteSqlStatement(AH, q->data);
		destroyPQExpBuffer(q);
	}
	else
		ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");

	/*
	 * Disable synchronized scanning, to prevent unpredictable changes in row
	 * ordering across a dump and reload.
	 */
	ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");

	/*
	 * Disable timeouts if supported.
	 */
	ExecuteSqlStatement(AH, "SET statement_timeout = 0");
	if (AH->remoteVersion >= 90300)
		ExecuteSqlStatement(AH, "SET lock_timeout = 0");
	if (AH->remoteVersion >= 90600)
		ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");

	/*
	 * Quote all identifiers, if requested.
	 */
	/* [condition line elided in this excerpt] */
		ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");

	/*
	 * Adjust row-security mode, if supported.
	 */
	if (AH->remoteVersion >= 90500)
	{
		if (dopt->enable_row_security)
			ExecuteSqlStatement(AH, "SET row_security = on");
		else
			ExecuteSqlStatement(AH, "SET row_security = off");
	}

	/*
	 * Initialize prepared-query state to "nothing prepared". We do this here
	 * so that a parallel dump worker will have its own state.
	 */
	AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));

	/*
	 * Start transaction-snapshot mode transaction to dump consistent data.
	 */
	ExecuteSqlStatement(AH, "BEGIN");

	/*
	 * To support the combination of serializable_deferrable with the jobs
	 * option we use REPEATABLE READ for the worker connections that are
	 * passed a snapshot. As long as the snapshot is acquired in a
	 * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
	 * REPEATABLE READ transaction provides the appropriate integrity
	 * guarantees. This is a kluge, but safe for back-patching.
	 */
	if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
		/* [statement call line elided in this excerpt] */
						"SET TRANSACTION ISOLATION LEVEL "
						"SERIALIZABLE, READ ONLY, DEFERRABLE");
	else
		/* [statement call line elided in this excerpt] */
						"SET TRANSACTION ISOLATION LEVEL "
						"REPEATABLE READ, READ ONLY");

	/*
	 * If user specified a snapshot to use, select that. In a parallel dump
	 * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
	 * is already set (if the server can handle it) and we should use that.
	 */
	if (dumpsnapshot)
		AH->sync_snapshot_id = pg_strdup(dumpsnapshot);

	if (AH->sync_snapshot_id)
	{
		PQExpBuffer query = createPQExpBuffer();

		appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
		/* [snapshot-id literal append elided in this excerpt] */
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);
	}
	else if (AH->numWorkers > 1)
	{
		/* Parallel dump needs an exportable snapshot; check server support. */
		if (AH->isStandby && AH->remoteVersion < 100000)
			pg_fatal("parallel dumps from standby servers are not supported by this server version");
		/* [snapshot-export statement elided in this excerpt] */
	}
}
1315 
/*
 * Set up connection for a parallel worker process.
 *
 * Simply re-runs setup_connection; the leader's role and snapshot are
 * inherited via AH (see comment below), so only the encoding needs to be
 * passed explicitly.
 */
static void
/* [signature line elided in this excerpt; presumably takes Archive *AH] */
{
	/*
	 * We want to re-select all the same values the leader connection is
	 * using. We'll have inherited directly-usable values in
	 * AH->sync_snapshot_id and AH->use_role, but we need to translate the
	 * inherited encoding value back to a string to pass to setup_connection.
	 */
	setup_connection(AH,
					 /* [encoding argument elided in this excerpt] */
					 NULL,
					 NULL);
}
1331 
/*
 * Export the current transaction's snapshot via pg_export_snapshot(),
 * returning the snapshot identifier as a malloc'd string.  The caller
 * owns (and should eventually free) the result.
 */
static char *
/* [signature line elided in this excerpt] */
{
	char	   *query = "SELECT pg_catalog.pg_export_snapshot()";
	char	   *result;
	PGresult   *res;

	res = ExecuteSqlQueryForSingleRow(fout, query);
	result = pg_strdup(PQgetvalue(res, 0, 0));
	PQclear(res);

	return result;
}
1345 
/*
 * Translate the -F/--format option string into an ArchiveFormat code,
 * also setting *mode (write by default; append for the undocumented
 * "a"/"append" form used by pg_dumpall).  Dies on an unrecognized
 * format string.  Matching is case-insensitive.
 */
static ArchiveFormat
/* [signature line elided in this excerpt] */
{
	ArchiveFormat archiveFormat;

	*mode = archModeWrite;

	if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
	{
		/* This is used by pg_dumpall, and is not documented */
		archiveFormat = archNull;
		*mode = archModeAppend;
	}
	else if (pg_strcasecmp(format, "c") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "custom") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "d") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "directory") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "p") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "plain") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "t") == 0)
		archiveFormat = archTar;
	else if (pg_strcasecmp(format, "tar") == 0)
		archiveFormat = archTar;
	else
		pg_fatal("invalid output format \"%s\" specified", format);
	return archiveFormat;
}
1379 
/*
 * Find the OIDs of all schemas matching the given list of patterns,
 * and append them to the given OID list.
 *
 * If strict_names is set, each pattern must match at least one schema
 * or we die.  A pattern with one dot qualifies a database name, which
 * must match the connected database (cross-db refs are rejected).
 */
static void
/* [first signature line elided in this excerpt] */
				SimpleStringList *patterns,
				SimpleOidList *oids,
				bool strict_names)
{
	PQExpBuffer query;
	PGresult   *res;
	SimpleStringListCell *cell;
	int			i;

	/* No patterns given: no schema filtering was requested. */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list; we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		PQExpBufferData dbbuf;
		int			dotcnt;

		appendPQExpBufferStr(query,
							 "SELECT oid FROM pg_catalog.pg_namespace n\n");
		initPQExpBuffer(&dbbuf);
		/* Convert the shell-style pattern into a WHERE clause on nspname. */
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
							  &dotcnt);
		if (dotcnt > 1)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);
		else if (dotcnt == 1)
			/* one dot means a database qualifier; must match our db */
			prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
		termPQExpBuffer(&dbbuf);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* [OID-append statement elided in this excerpt] */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1438 
/*
 * Find the OIDs of all extensions matching the given list of patterns,
 * and append them to the given OID list.
 *
 * Extension names cannot be schema- or database-qualified, so any dot
 * in the pattern is an error.  With strict_names, each pattern must
 * match at least one extension or we die.
 */
static void
/* [first signature line elided in this excerpt] */
				SimpleStringList *patterns,
				SimpleOidList *oids,
				bool strict_names)
{
	PQExpBuffer query;
	PGresult   *res;
	SimpleStringListCell *cell;
	int			i;

	/* No patterns given: no extension filtering was requested. */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list; we don't care.
	 */
	for (cell = patterns->head; cell; cell = cell->next)
	{
		int			dotcnt;

		appendPQExpBufferStr(query,
							 "SELECT oid FROM pg_catalog.pg_extension e\n");
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "e.extname", NULL, NULL, NULL,
							  &dotcnt);
		if (dotcnt > 0)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* [OID-append statement elided in this excerpt] */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1491 
/*
 * Find the OIDs of all foreign servers matching the given list of patterns,
 * and append them to the given OID list.
 *
 * Unlike the schema/table variants, a non-matching pattern is always
 * fatal here (there is no strict_names flag).  Server names cannot be
 * qualified, so any dot in the pattern is an error.
 */
static void
/* [first signature line elided in this excerpt] */
				SimpleStringList *patterns,
				SimpleOidList *oids)
{
	PQExpBuffer query;
	PGresult   *res;
	SimpleStringListCell *cell;
	int			i;

	/* No patterns given: no foreign-server filtering was requested. */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list; we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		int			dotcnt;

		appendPQExpBufferStr(query,
							 "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "s.srvname", NULL, NULL, NULL,
							  &dotcnt);
		if (dotcnt > 0)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		if (PQntuples(res) == 0)
			pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
			/* [OID-append statement elided in this excerpt] */

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1542 
/*
 * Find the OIDs of all tables matching the given list of patterns,
 * and append them to the given OID list. See also expand_dbname_patterns()
 * in pg_dumpall.c
 *
 * Matches relations of relkind table/sequence/view/matview/foreign/
 * partitioned.  If with_child_tables is set, the match is extended to
 * all inheritance/partition descendants via a recursive CTE.  A two-dot
 * pattern carries a database qualifier that must match the connected
 * database.  With strict_names, each pattern must match something.
 */
static void
/* [first signature line elided in this excerpt] */
				SimpleStringList *patterns, SimpleOidList *oids,
				bool strict_names, bool with_child_tables)
{
	PQExpBuffer query;
	PGresult   *res;
	SimpleStringListCell *cell;
	int			i;

	/* No patterns given: no table filtering was requested. */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * this might sometimes result in duplicate entries in the OID list, but
	 * we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		PQExpBufferData dbbuf;
		int			dotcnt;

		/*
		 * Query must remain ABSOLUTELY devoid of unqualified names. This
		 * would be unnecessary given a pg_table_is_visible() variant taking a
		 * search_path argument.
		 *
		 * For with_child_tables, we start with the basic query's results and
		 * recursively search the inheritance tree to add child tables.
		 */
		if (with_child_tables)
		{
			appendPQExpBuffer(query, "WITH RECURSIVE partition_tree (relid) AS (\n");
		}

		appendPQExpBuffer(query,
						  "SELECT c.oid"
						  "\nFROM pg_catalog.pg_class c"
						  "\n LEFT JOIN pg_catalog.pg_namespace n"
						  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
						  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
						  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
						  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
						  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
						  RELKIND_PARTITIONED_TABLE);
		initPQExpBuffer(&dbbuf);
		processSQLNamePattern(GetConnection(fout), query, cell->val, true,
							  false, "n.nspname", "c.relname", NULL,
							  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
							  &dotcnt);
		if (dotcnt > 2)
			pg_fatal("improper relation name (too many dotted names): %s",
					 cell->val);
		else if (dotcnt == 2)
			/* two dots means a database qualifier; must match our db */
			prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
		termPQExpBuffer(&dbbuf);

		if (with_child_tables)
		{
			/* Close the CTE: union in all inheritance descendants. */
			appendPQExpBuffer(query, "UNION"
							  "\nSELECT i.inhrelid"
							  "\nFROM partition_tree p"
							  "\n JOIN pg_catalog.pg_inherits i"
							  "\n ON p.relid OPERATOR(pg_catalog.=) i.inhparent"
							  "\n)"
							  "\nSELECT relid FROM partition_tree");
		}

		/* Query relies on fully qualified names; run without a search_path. */
		ExecuteSqlStatement(fout, "RESET search_path");
		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		/* [search_path-restore statement elided in this excerpt] */
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* [OID-append statement elided in this excerpt] */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1637 
1638 /*
1639  * Verifies that the connected database name matches the given database name,
1640  * and if not, dies with an error about the given pattern.
1641  *
1642  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1643  */
1644 static void
1645 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1646 {
1647  const char *db;
1648 
1649  db = PQdb(conn);
1650  if (db == NULL)
1651  pg_fatal("You are currently not connected to a database.");
1652 
1653  if (strcmp(db, dbname) != 0)
1654  pg_fatal("cross-database references are not implemented: %s",
1655  pattern);
1656 }
1657 
/*
 * checkExtensionMembership
 *		Determine whether object is an extension member, and if so,
 *		record an appropriate dependency and set the object's dump flag.
 *
 * It's important to call this for each object that could be an extension
 * member. Generally, we integrate this with determining the object's
 * to-be-dumped-ness, since extension membership overrides other rules for that.
 *
 * Returns true if object is an extension member, else false.
 */
static bool
/* [signature line elided in this excerpt] */
{
	ExtensionInfo *ext = findOwningExtension(dobj->catId);

	/* Not owned by any extension: caller applies the normal rules. */
	if (ext == NULL)
		return false;

	dobj->ext_member = true;

	/* Record dependency so that getDependencies needn't deal with that */
	addObjectDependency(dobj, ext->dobj.dumpId);

	/*
	 * In 9.6 and above, mark the member object to have any non-initial ACL,
	 * policies, and security labels dumped.
	 *
	 * Note that any initial ACLs (see pg_init_privs) will be removed when we
	 * extract the information about the object. We don't provide support for
	 * initial policies and security labels and it seems unlikely for those to
	 * ever exist, but we may have to revisit this later.
	 *
	 * Prior to 9.6, we do not include any extension member components.
	 *
	 * In binary upgrades, we still dump all components of the members
	 * individually, since the idea is to exactly reproduce the database
	 * contents rather than replace the extension contents with something
	 * different.
	 */
	if (fout->dopt->binary_upgrade)
		dobj->dump = ext->dobj.dump;
	else
	{
		if (fout->remoteVersion < 90600)
			dobj->dump = DUMP_COMPONENT_NONE;
		else
			dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
			/* [mask continuation elided in this excerpt] */
	}

	return true;
}
1712 
/*
 * selectDumpableNamespace: policy-setting subroutine
 *		Mark a namespace as to be dumped or not
 *
 * Sets nsinfo->dobj.dump (components of the schema itself) and
 * dump_contains (default for objects inside it), honoring the include/
 * exclude OID lists built from -n/-N/-t switches, with special cases for
 * pg_catalog, other system schemas, and the public schema.
 */
static void
/* [signature line elided in this excerpt] */
{
	/*
	 * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
	 * and (for --clean) a DROP SCHEMA statement. (In the absence of
	 * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
	 */
	nsinfo->create = true;

	/*
	 * If specific tables are being dumped, do not dump any complete
	 * namespaces. If specific namespaces are being dumped, dump just those
	 * namespaces. Otherwise, dump all non-system namespaces.
	 */
	if (table_include_oids.head != NULL)
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
	else if (schema_include_oids.head != NULL)
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
		/* [membership-test line elided in this excerpt] */
			nsinfo->dobj.catId.oid) ?
		/* [ternary result line elided in this excerpt] */
	else if (fout->remoteVersion >= 90600 &&
			 strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
	{
		/*
		 * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
		 * they are interesting (and not the original ACLs which were set at
		 * initdb time, see pg_init_privs).
		 */
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
	}
	else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
			 strcmp(nsinfo->dobj.name, "information_schema") == 0)
	{
		/* Other system schemas don't get dumped */
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
	}
	else if (strcmp(nsinfo->dobj.name, "public") == 0)
	{
		/*
		 * The public schema is a strange beast that sits in a sort of
		 * no-mans-land between being a system object and a user object.
		 * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
		 * a comment and an indication of ownership. If the owner is the
		 * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
		 * v15, the default owner was BOOTSTRAP_SUPERUSERID.
		 */
		nsinfo->create = false;
		nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
		if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
			nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
		/* [statement elided in this excerpt] */

		/*
		 * Also, make like it has a comment even if it doesn't; this is so
		 * that we'll emit a command to drop the comment, if appropriate.
		 * (Without this, we'd not call dumpCommentExtended for it.)
		 */
		/* [statement elided in this excerpt] */
	}
	else
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;

	/*
	 * In any case, a namespace can be excluded by an exclusion switch
	 */
	if (nsinfo->dobj.dump_contains &&
		/* [exclusion-list membership line elided in this excerpt] */
		nsinfo->dobj.catId.oid))
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;

	/*
	 * If the schema belongs to an extension, allow extension membership to
	 * override the dump decision for the schema itself. However, this does
	 * not change dump_contains, so this won't change what we do with objects
	 * within the schema. (If they belong to the extension, they'll get
	 * suppressed by it, otherwise not.)
	 */
	(void) checkExtensionMembership(&nsinfo->dobj, fout);
}
1798 
/*
 * selectDumpableTable: policy-setting subroutine
 *		Mark a table as to be dumped or not
 *
 * Extension membership wins; otherwise the -t include list, then the
 * parent namespace's dump_contains, and finally any -T exclusion.
 */
static void
/* [signature line elided in this excerpt] */
{
	if (checkExtensionMembership(&tbinfo->dobj, fout))
		return;					/* extension membership overrides all else */

	/*
	 * If specific tables are being dumped, dump just those tables; else, dump
	 * according to the parent namespace's dump flag.
	 */
	if (table_include_oids.head != NULL)
		/* [membership-test line elided in this excerpt] */
		tbinfo->dobj.catId.oid) ?
		/* [ternary result line elided in this excerpt] */
	else
		tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;

	/*
	 * In any case, a table can be excluded by an exclusion switch
	 */
	if (tbinfo->dobj.dump &&
		/* [exclusion-list membership line elided in this excerpt] */
		tbinfo->dobj.catId.oid))
		tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
}
1828 
/*
 * selectDumpableType: policy-setting subroutine
 *		Mark a type as to be dumped or not
 *
 * If it's a table's rowtype or an autogenerated array type, we also apply a
 * special type code to facilitate sorting into the desired order. (We don't
 * want to consider those to be ordinary types because that would bring tables
 * up into the datatype part of the dump order.) We still set the object's
 * dump flag; that's not going to cause the dummy type to be dumped, but we
 * need it so that casts involving such types will be dumped correctly -- see
 * dumpCast. This means the flag should be set the same as for the underlying
 * object (the table or base type).
 */
static void
/* [signature line elided in this excerpt] */
{
	/* skip complex types, except for standalone composite types */
	if (OidIsValid(tyinfo->typrelid) &&
		tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
	{
		TableInfo  *tytable = findTableByOid(tyinfo->typrelid);

		/* Rowtype: become a dummy type mirroring its table's dump flag. */
		tyinfo->dobj.objType = DO_DUMMY_TYPE;
		if (tytable != NULL)
			tyinfo->dobj.dump = tytable->dobj.dump;
		else
			tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
		return;
	}

	/* skip auto-generated array types */
	if (tyinfo->isArray || tyinfo->isMultirange)
	{
		tyinfo->dobj.objType = DO_DUMMY_TYPE;

		/*
		 * Fall through to set the dump flag; we assume that the subsequent
		 * rules will do the same thing as they would for the array's base
		 * type. (We cannot reliably look up the base type here, since
		 * getTypes may not have processed it yet.)
		 */
	}

	if (checkExtensionMembership(&tyinfo->dobj, fout))
		return;					/* extension membership overrides all else */

	/* Dump based on if the contents of the namespace are being dumped */
	tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
}
1878 
1879 /*
1880  * selectDumpableDefaultACL: policy-setting subroutine
1881  * Mark a default ACL as to be dumped or not
1882  *
1883  * For per-schema default ACLs, dump if the schema is to be dumped.
1884  * Otherwise dump if we are dumping "everything". Note that dataOnly
1885  * and aclsSkip are checked separately.
1886  */
1887 static void
1889 {
1890  /* Default ACLs can't be extension members */
1891 
1892  if (dinfo->dobj.namespace)
1893  /* default ACLs are considered part of the namespace */
1894  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1895  else
1896  dinfo->dobj.dump = dopt->include_everything ?
1898 }
1899 
1900 /*
1901  * selectDumpableCast: policy-setting subroutine
1902  * Mark a cast as to be dumped or not
1903  *
1904  * Casts do not belong to any particular namespace (since they haven't got
1905  * names), nor do they have identifiable owners. To distinguish user-defined
1906  * casts from built-in ones, we must resort to checking whether the cast's
1907  * OID is in the range reserved for initdb.
1908  */
1909 static void
1911 {
1912  if (checkExtensionMembership(&cast->dobj, fout))
1913  return; /* extension membership overrides all else */
1914 
1915  /*
1916  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1917  * support ACLs currently.
1918  */
1919  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1920  cast->dobj.dump = DUMP_COMPONENT_NONE;
1921  else
1922  cast->dobj.dump = fout->dopt->include_everything ?
1924 }
1925 
1926 /*
1927  * selectDumpableProcLang: policy-setting subroutine
1928  * Mark a procedural language as to be dumped or not
1929  *
1930  * Procedural languages do not belong to any particular namespace. To
1931  * identify built-in languages, we must resort to checking whether the
1932  * language's OID is in the range reserved for initdb.
1933  */
1934 static void
1936 {
1937  if (checkExtensionMembership(&plang->dobj, fout))
1938  return; /* extension membership overrides all else */
1939 
1940  /*
1941  * Only include procedural languages when we are dumping everything.
1942  *
1943  * For from-initdb procedural languages, only include ACLs, as we do for
1944  * the pg_catalog namespace. We need this because procedural languages do
1945  * not live in any namespace.
1946  */
1947  if (!fout->dopt->include_everything)
1948  plang->dobj.dump = DUMP_COMPONENT_NONE;
1949  else
1950  {
1951  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1952  plang->dobj.dump = fout->remoteVersion < 90600 ?
1954  else
1955  plang->dobj.dump = DUMP_COMPONENT_ALL;
1956  }
1957 }
1958 
1959 /*
1960  * selectDumpableAccessMethod: policy-setting subroutine
1961  * Mark an access method as to be dumped or not
1962  *
1963  * Access methods do not belong to any particular namespace. To identify
1964  * built-in access methods, we must resort to checking whether the
1965  * method's OID is in the range reserved for initdb.
1966  */
1967 static void
1969 {
1970  if (checkExtensionMembership(&method->dobj, fout))
1971  return; /* extension membership overrides all else */
1972 
1973  /*
1974  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1975  * they do not support ACLs currently.
1976  */
1977  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1978  method->dobj.dump = DUMP_COMPONENT_NONE;
1979  else
1980  method->dobj.dump = fout->dopt->include_everything ?
1982 }
1983 
1984 /*
1985  * selectDumpableExtension: policy-setting subroutine
1986  * Mark an extension as to be dumped or not
1987  *
1988  * Built-in extensions should be skipped except for checking ACLs, since we
1989  * assume those will already be installed in the target database. We identify
1990  * such extensions by their having OIDs in the range reserved for initdb.
1991  * We dump all user-added extensions by default. No extensions are dumped
1992  * if include_everything is false (i.e., a --schema or --table switch was
1993  * given), except if --extension specifies a list of extensions to dump.
1994  */
1995 static void
1997 {
1998  /*
1999  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
2000  * change permissions on their member objects, if they wish to, and have
2001  * those changes preserved.
2002  */
2003  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2004  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
2005  else
2006  {
2007  /* check if there is a list of extensions to dump */
2008  if (extension_include_oids.head != NULL)
2009  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2011  extinfo->dobj.catId.oid) ?
2013  else
2014  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2015  dopt->include_everything ?
2017  }
2018 }
2019 
2020 /*
2021  * selectDumpablePublicationObject: policy-setting subroutine
2022  * Mark a publication object as to be dumped or not
2023  *
2024  * A publication can have schemas and tables which have schemas, but those are
2025  * ignored in decision making, because publications are only dumped when we are
2026  * dumping everything.
2027  */
2028 static void
2030 {
2031  if (checkExtensionMembership(dobj, fout))
2032  return; /* extension membership overrides all else */
2033 
2034  dobj->dump = fout->dopt->include_everything ?
2036 }
2037 
2038 /*
2039  * selectDumpableObject: policy-setting subroutine
2040  * Mark a generic dumpable object as to be dumped or not
2041  *
2042  * Use this only for object types without a special-case routine above.
2043  */
2044 static void
2046 {
2047  if (checkExtensionMembership(dobj, fout))
2048  return; /* extension membership overrides all else */
2049 
2050  /*
2051  * Default policy is to dump if parent namespace is dumpable, or for
2052  * non-namespace-associated items, dump if we're dumping "everything".
2053  */
2054  if (dobj->namespace)
2055  dobj->dump = dobj->namespace->dobj.dump_contains;
2056  else
2057  dobj->dump = fout->dopt->include_everything ?
2059 }
2060 
2061 /*
2062  * Dump a table's contents for loading using the COPY command
2063  * - this routine is called by the Archiver when it wants the table
2064  * to be dumped.
2065  */
2066 static int
2067 dumpTableData_copy(Archive *fout, const void *dcontext)
2068 {
2069  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2070  TableInfo *tbinfo = tdinfo->tdtable;
2071  const char *classname = tbinfo->dobj.name;
2073 
2074  /*
2075  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2076  * which uses it already.
2077  */
2078  PQExpBuffer clistBuf = createPQExpBuffer();
2079  PGconn *conn = GetConnection(fout);
2080  PGresult *res;
2081  int ret;
2082  char *copybuf;
2083  const char *column_list;
2084 
2085  pg_log_info("dumping contents of table \"%s.%s\"",
2086  tbinfo->dobj.namespace->dobj.name, classname);
2087 
2088  /*
2089  * Specify the column list explicitly so that we have no possibility of
2090  * retrieving data in the wrong column order. (The default column
2091  * ordering of COPY will not be what we want in certain corner cases
2092  * involving ADD COLUMN and inheritance.)
2093  */
2094  column_list = fmtCopyColumnList(tbinfo, clistBuf);
2095 
2096  /*
2097  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2098  * a filter condition was specified. For other cases a simple COPY
2099  * suffices.
2100  */
2101  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2102  {
2103  appendPQExpBufferStr(q, "COPY (SELECT ");
2104  /* klugery to get rid of parens in column list */
2105  if (strlen(column_list) > 2)
2106  {
2107  appendPQExpBufferStr(q, column_list + 1);
2108  q->data[q->len - 1] = ' ';
2109  }
2110  else
2111  appendPQExpBufferStr(q, "* ");
2112 
2113  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2114  fmtQualifiedDumpable(tbinfo),
2115  tdinfo->filtercond ? tdinfo->filtercond : "");
2116  }
2117  else
2118  {
2119  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2120  fmtQualifiedDumpable(tbinfo),
2121  column_list);
2122  }
2123  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2124  PQclear(res);
2125  destroyPQExpBuffer(clistBuf);
2126 
2127  for (;;)
2128  {
2129  ret = PQgetCopyData(conn, &copybuf, 0);
2130 
2131  if (ret < 0)
2132  break; /* done or error */
2133 
2134  if (copybuf)
2135  {
2136  WriteData(fout, copybuf, ret);
2137  PQfreemem(copybuf);
2138  }
2139 
2140  /* ----------
2141  * THROTTLE:
2142  *
2143  * There was considerable discussion in late July, 2000 regarding
2144  * slowing down pg_dump when backing up large tables. Users with both
2145  * slow & fast (multi-processor) machines experienced performance
2146  * degradation when doing a backup.
2147  *
2148  * Initial attempts based on sleeping for a number of ms for each ms
2149  * of work were deemed too complex, then a simple 'sleep in each loop'
2150  * implementation was suggested. The latter failed because the loop
2151  * was too tight. Finally, the following was implemented:
2152  *
2153  * If throttle is non-zero, then
2154  * See how long since the last sleep.
2155  * Work out how long to sleep (based on ratio).
2156  * If sleep is more than 100ms, then
2157  * sleep
2158  * reset timer
2159  * EndIf
2160  * EndIf
2161  *
2162  * where the throttle value was the number of ms to sleep per ms of
2163  * work. The calculation was done in each loop.
2164  *
2165  * Most of the hard work is done in the backend, and this solution
2166  * still did not work particularly well: on slow machines, the ratio
2167  * was 50:1, and on medium paced machines, 1:1, and on fast
2168  * multi-processor machines, it had little or no effect, for reasons
2169  * that were unclear.
2170  *
2171  * Further discussion ensued, and the proposal was dropped.
2172  *
2173  * For those people who want this feature, it can be implemented using
2174  * gettimeofday in each loop, calculating the time since last sleep,
2175  * multiplying that by the sleep ratio, then if the result is more
2176  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2177  * function to sleep for a subsecond period ie.
2178  *
2179  * select(0, NULL, NULL, NULL, &tvi);
2180  *
2181  * This will return after the interval specified in the structure tvi.
2182  * Finally, call gettimeofday again to save the 'last sleep time'.
2183  * ----------
2184  */
2185  }
2186  archprintf(fout, "\\.\n\n\n");
2187 
2188  if (ret == -2)
2189  {
2190  /* copy data transfer failed */
2191  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2192  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2193  pg_log_error_detail("Command was: %s", q->data);
2194  exit_nicely(1);
2195  }
2196 
2197  /* Check command status and return to normal libpq state */
2198  res = PQgetResult(conn);
2200  {
2201  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2202  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2203  pg_log_error_detail("Command was: %s", q->data);
2204  exit_nicely(1);
2205  }
2206  PQclear(res);
2207 
2208  /* Do this to ensure we've pumped libpq back to idle state */
2209  if (PQgetResult(conn) != NULL)
2210  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2211  classname);
2212 
2213  destroyPQExpBuffer(q);
2214  return 1;
2215 }
2216 
2217 /*
2218  * Dump table data using INSERT commands.
2219  *
2220  * Caution: when we restore from an archive file direct to database, the
2221  * INSERT commands emitted by this function have to be parsed by
2222  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2223  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2224  */
2225 static int
2226 dumpTableData_insert(Archive *fout, const void *dcontext)
2227 {
2228  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2229  TableInfo *tbinfo = tdinfo->tdtable;
2230  DumpOptions *dopt = fout->dopt;
2232  PQExpBuffer insertStmt = NULL;
2233  char *attgenerated;
2234  PGresult *res;
2235  int nfields,
2236  i;
2237  int rows_per_statement = dopt->dump_inserts;
2238  int rows_this_statement = 0;
2239 
2240  /*
2241  * If we're going to emit INSERTs with column names, the most efficient
2242  * way to deal with generated columns is to exclude them entirely. For
2243  * INSERTs without column names, we have to emit DEFAULT rather than the
2244  * actual column value --- but we can save a few cycles by fetching nulls
2245  * rather than the uninteresting-to-us value.
2246  */
2247  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2248  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2249  nfields = 0;
2250  for (i = 0; i < tbinfo->numatts; i++)
2251  {
2252  if (tbinfo->attisdropped[i])
2253  continue;
2254  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2255  continue;
2256  if (nfields > 0)
2257  appendPQExpBufferStr(q, ", ");
2258  if (tbinfo->attgenerated[i])
2259  appendPQExpBufferStr(q, "NULL");
2260  else
2261  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2262  attgenerated[nfields] = tbinfo->attgenerated[i];
2263  nfields++;
2264  }
2265  /* Servers before 9.4 will complain about zero-column SELECT */
2266  if (nfields == 0)
2267  appendPQExpBufferStr(q, "NULL");
2268  appendPQExpBuffer(q, " FROM ONLY %s",
2269  fmtQualifiedDumpable(tbinfo));
2270  if (tdinfo->filtercond)
2271  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2272 
2273  ExecuteSqlStatement(fout, q->data);
2274 
2275  while (1)
2276  {
2277  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2278  PGRES_TUPLES_OK);
2279 
2280  /* cross-check field count, allowing for dummy NULL if any */
2281  if (nfields != PQnfields(res) &&
2282  !(nfields == 0 && PQnfields(res) == 1))
2283  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2284  tbinfo->dobj.name);
2285 
2286  /*
2287  * First time through, we build as much of the INSERT statement as
2288  * possible in "insertStmt", which we can then just print for each
2289  * statement. If the table happens to have zero dumpable columns then
2290  * this will be a complete statement, otherwise it will end in
2291  * "VALUES" and be ready to have the row's column values printed.
2292  */
2293  if (insertStmt == NULL)
2294  {
2295  TableInfo *targettab;
2296 
2297  insertStmt = createPQExpBuffer();
2298 
2299  /*
2300  * When load-via-partition-root is set or forced, get the root
2301  * table name for the partition table, so that we can reload data
2302  * through the root table.
2303  */
2304  if (tbinfo->ispartition &&
2305  (dopt->load_via_partition_root ||
2306  forcePartitionRootLoad(tbinfo)))
2307  targettab = getRootTableInfo(tbinfo);
2308  else
2309  targettab = tbinfo;
2310 
2311  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2312  fmtQualifiedDumpable(targettab));
2313 
2314  /* corner case for zero-column table */
2315  if (nfields == 0)
2316  {
2317  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2318  }
2319  else
2320  {
2321  /* append the list of column names if required */
2322  if (dopt->column_inserts)
2323  {
2324  appendPQExpBufferChar(insertStmt, '(');
2325  for (int field = 0; field < nfields; field++)
2326  {
2327  if (field > 0)
2328  appendPQExpBufferStr(insertStmt, ", ");
2329  appendPQExpBufferStr(insertStmt,
2330  fmtId(PQfname(res, field)));
2331  }
2332  appendPQExpBufferStr(insertStmt, ") ");
2333  }
2334 
2335  if (tbinfo->needs_override)
2336  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2337 
2338  appendPQExpBufferStr(insertStmt, "VALUES");
2339  }
2340  }
2341 
2342  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2343  {
2344  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2345  if (rows_this_statement == 0)
2346  archputs(insertStmt->data, fout);
2347 
2348  /*
2349  * If it is zero-column table then we've already written the
2350  * complete statement, which will mean we've disobeyed
2351  * --rows-per-insert when it's set greater than 1. We do support
2352  * a way to make this multi-row with: SELECT UNION ALL SELECT
2353  * UNION ALL ... but that's non-standard so we should avoid it
2354  * given that using INSERTs is mostly only ever needed for
2355  * cross-database exports.
2356  */
2357  if (nfields == 0)
2358  continue;
2359 
2360  /* Emit a row heading */
2361  if (rows_per_statement == 1)
2362  archputs(" (", fout);
2363  else if (rows_this_statement > 0)
2364  archputs(",\n\t(", fout);
2365  else
2366  archputs("\n\t(", fout);
2367 
2368  for (int field = 0; field < nfields; field++)
2369  {
2370  if (field > 0)
2371  archputs(", ", fout);
2372  if (attgenerated[field])
2373  {
2374  archputs("DEFAULT", fout);
2375  continue;
2376  }
2377  if (PQgetisnull(res, tuple, field))
2378  {
2379  archputs("NULL", fout);
2380  continue;
2381  }
2382 
2383  /* XXX This code is partially duplicated in ruleutils.c */
2384  switch (PQftype(res, field))
2385  {
2386  case INT2OID:
2387  case INT4OID:
2388  case INT8OID:
2389  case OIDOID:
2390  case FLOAT4OID:
2391  case FLOAT8OID:
2392  case NUMERICOID:
2393  {
2394  /*
2395  * These types are printed without quotes unless
2396  * they contain values that aren't accepted by the
2397  * scanner unquoted (e.g., 'NaN'). Note that
2398  * strtod() and friends might accept NaN, so we
2399  * can't use that to test.
2400  *
2401  * In reality we only need to defend against
2402  * infinity and NaN, so we need not get too crazy
2403  * about pattern matching here.
2404  */
2405  const char *s = PQgetvalue(res, tuple, field);
2406 
2407  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2408  archputs(s, fout);
2409  else
2410  archprintf(fout, "'%s'", s);
2411  }
2412  break;
2413 
2414  case BITOID:
2415  case VARBITOID:
2416  archprintf(fout, "B'%s'",
2417  PQgetvalue(res, tuple, field));
2418  break;
2419 
2420  case BOOLOID:
2421  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2422  archputs("true", fout);
2423  else
2424  archputs("false", fout);
2425  break;
2426 
2427  default:
2428  /* All other types are printed as string literals. */
2429  resetPQExpBuffer(q);
2431  PQgetvalue(res, tuple, field),
2432  fout);
2433  archputs(q->data, fout);
2434  break;
2435  }
2436  }
2437 
2438  /* Terminate the row ... */
2439  archputs(")", fout);
2440 
2441  /* ... and the statement, if the target no. of rows is reached */
2442  if (++rows_this_statement >= rows_per_statement)
2443  {
2444  if (dopt->do_nothing)
2445  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2446  else
2447  archputs(";\n", fout);
2448  /* Reset the row counter */
2449  rows_this_statement = 0;
2450  }
2451  }
2452 
2453  if (PQntuples(res) <= 0)
2454  {
2455  PQclear(res);
2456  break;
2457  }
2458  PQclear(res);
2459  }
2460 
2461  /* Terminate any statements that didn't make the row count. */
2462  if (rows_this_statement > 0)
2463  {
2464  if (dopt->do_nothing)
2465  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2466  else
2467  archputs(";\n", fout);
2468  }
2469 
2470  archputs("\n\n", fout);
2471 
2472  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2473 
2474  destroyPQExpBuffer(q);
2475  if (insertStmt != NULL)
2476  destroyPQExpBuffer(insertStmt);
2477  free(attgenerated);
2478 
2479  return 1;
2480 }
2481 
2482 /*
2483  * getRootTableInfo:
2484  * get the root TableInfo for the given partition table.
2485  */
2486 static TableInfo *
2488 {
2489  TableInfo *parentTbinfo;
2490 
2491  Assert(tbinfo->ispartition);
2492  Assert(tbinfo->numParents == 1);
2493 
2494  parentTbinfo = tbinfo->parents[0];
2495  while (parentTbinfo->ispartition)
2496  {
2497  Assert(parentTbinfo->numParents == 1);
2498  parentTbinfo = parentTbinfo->parents[0];
2499  }
2500 
2501  return parentTbinfo;
2502 }
2503 
2504 /*
2505  * forcePartitionRootLoad
2506  * Check if we must force load_via_partition_root for this partition.
2507  *
2508  * This is required if any level of ancestral partitioned table has an
2509  * unsafe partitioning scheme.
2510  */
2511 static bool
2513 {
2514  TableInfo *parentTbinfo;
2515 
2516  Assert(tbinfo->ispartition);
2517  Assert(tbinfo->numParents == 1);
2518 
2519  parentTbinfo = tbinfo->parents[0];
2520  if (parentTbinfo->unsafe_partitions)
2521  return true;
2522  while (parentTbinfo->ispartition)
2523  {
2524  Assert(parentTbinfo->numParents == 1);
2525  parentTbinfo = parentTbinfo->parents[0];
2526  if (parentTbinfo->unsafe_partitions)
2527  return true;
2528  }
2529 
2530  return false;
2531 }
2532 
/*
 * dumpTableData -
 *	  dump the contents of a single table
 *
 * Actually, this just makes an ArchiveEntry for the table contents.
 * The real data transfer happens later, when the archiver invokes the
 * registered dumpFn (dumpTableData_copy or dumpTableData_insert).
 */
static void
dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
{
	DumpOptions *dopt = fout->dopt;
	TableInfo  *tbinfo = tdinfo->tdtable;
	PQExpBuffer copyBuf = createPQExpBuffer();
	PQExpBuffer clistBuf = createPQExpBuffer();
	DataDumperPtr dumpFn;
	char	   *tdDefn = NULL;	/* TOC defn comment; only set for
								 * load-via-partition-root */
	char	   *copyStmt;
	const char *copyFrom;

	/* We had better have loaded per-column details about this table */
	Assert(tbinfo->interesting);

	/*
	 * When load-via-partition-root is set or forced, get the root table name
	 * for the partition table, so that we can reload data through the root
	 * table.  Then construct a comment to be inserted into the TOC entry's
	 * defn field, so that such cases can be identified reliably.
	 */
	if (tbinfo->ispartition &&
		(dopt->load_via_partition_root ||
		 forcePartitionRootLoad(tbinfo)))
	{
		TableInfo  *parentTbinfo;

		parentTbinfo = getRootTableInfo(tbinfo);
		copyFrom = fmtQualifiedDumpable(parentTbinfo);
		printfPQExpBuffer(copyBuf, "-- load via partition root %s",
						  copyFrom);
		tdDefn = pg_strdup(copyBuf->data);
	}
	else
		copyFrom = fmtQualifiedDumpable(tbinfo);

	if (dopt->dump_inserts == 0)
	{
		/* Dump/restore using COPY */
		dumpFn = dumpTableData_copy;
		/* must use 2 steps here 'cause fmtId is nonreentrant */
		printfPQExpBuffer(copyBuf, "COPY %s ",
						  copyFrom);
		appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
						  fmtCopyColumnList(tbinfo, clistBuf));
		copyStmt = copyBuf->data;
	}
	else
	{
		/* Restore using INSERT */
		dumpFn = dumpTableData_insert;
		copyStmt = NULL;		/* no COPY statement in the TOC for INSERTs */
	}

	/*
	 * Note: although the TableDataInfo is a full DumpableObject, we treat its
	 * dependency on its table as "special" and pass it to ArchiveEntry now.
	 * See comments for BuildArchiveDependencies.
	 */
	if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
	{
		TocEntry   *te;

		te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
						  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
									   .namespace = tbinfo->dobj.namespace->dobj.name,
									   .owner = tbinfo->rolname,
									   .description = "TABLE DATA",
									   .section = SECTION_DATA,
									   .createStmt = tdDefn,
									   .copyStmt = copyStmt,
									   .deps = &(tbinfo->dobj.dumpId),
									   .nDeps = 1,
									   .dumpFn = dumpFn,
									   .dumpArg = tdinfo));

		/*
		 * Set the TocEntry's dataLength in case we are doing a parallel dump
		 * and want to order dump jobs by table size.  We choose to measure
		 * dataLength in table pages (including TOAST pages) during dump, so
		 * no scaling is needed.
		 *
		 * However, relpages is declared as "integer" in pg_class, and hence
		 * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
		 * Cast so that we get the right interpretation of table sizes
		 * exceeding INT_MAX pages.
		 */
		te->dataLength = (BlockNumber) tbinfo->relpages;
		te->dataLength += (BlockNumber) tbinfo->toastpages;

		/*
		 * If pgoff_t is only 32 bits wide, the above refinement is useless,
		 * and instead we'd better worry about integer overflow.  Clamp to
		 * INT_MAX if the correct result exceeds that.
		 */
		if (sizeof(te->dataLength) == 4 &&
			(tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
			 te->dataLength < 0))
			te->dataLength = INT_MAX;
	}

	destroyPQExpBuffer(copyBuf);
	destroyPQExpBuffer(clistBuf);
}
2643 
2644 /*
2645  * refreshMatViewData -
2646  * load or refresh the contents of a single materialized view
2647  *
2648  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2649  * statement.
2650  */
2651 static void
2653 {
2654  TableInfo *tbinfo = tdinfo->tdtable;
2655  PQExpBuffer q;
2656 
2657  /* If the materialized view is not flagged as populated, skip this. */
2658  if (!tbinfo->relispopulated)
2659  return;
2660 
2661  q = createPQExpBuffer();
2662 
2663  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2664  fmtQualifiedDumpable(tbinfo));
2665 
2666  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2667  ArchiveEntry(fout,
2668  tdinfo->dobj.catId, /* catalog ID */
2669  tdinfo->dobj.dumpId, /* dump ID */
2670  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2671  .namespace = tbinfo->dobj.namespace->dobj.name,
2672  .owner = tbinfo->rolname,
2673  .description = "MATERIALIZED VIEW DATA",
2674  .section = SECTION_POST_DATA,
2675  .createStmt = q->data,
2676  .deps = tdinfo->dobj.dependencies,
2677  .nDeps = tdinfo->dobj.nDeps));
2678 
2679  destroyPQExpBuffer(q);
2680 }
2681 
2682 /*
2683  * getTableData -
2684  * set up dumpable objects representing the contents of tables
2685  */
2686 static void
2687 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2688 {
2689  int i;
2690 
2691  for (i = 0; i < numTables; i++)
2692  {
2693  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2694  (!relkind || tblinfo[i].relkind == relkind))
2695  makeTableDataInfo(dopt, &(tblinfo[i]));
2696  }
2697 }
2698 
2699 /*
2700  * Make a dumpable object for the data of this specific table
2701  *
2702  * Note: we make a TableDataInfo if and only if we are going to dump the
2703  * table data; the "dump" field in such objects isn't very interesting.
2704  */
2705 static void
2707 {
2708  TableDataInfo *tdinfo;
2709 
2710  /*
2711  * Nothing to do if we already decided to dump the table. This will
2712  * happen for "config" tables.
2713  */
2714  if (tbinfo->dataObj != NULL)
2715  return;
2716 
2717  /* Skip VIEWs (no data to dump) */
2718  if (tbinfo->relkind == RELKIND_VIEW)
2719  return;
2720  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2721  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2724  tbinfo->foreign_server)))
2725  return;
2726  /* Skip partitioned tables (data in partitions) */
2727  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2728  return;
2729 
2730  /* Don't dump data in unlogged tables, if so requested */
2731  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2732  dopt->no_unlogged_table_data)
2733  return;
2734 
2735  /* Check that the data is not explicitly excluded */
2737  tbinfo->dobj.catId.oid))
2738  return;
2739 
2740  /* OK, let's dump it */
2741  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2742 
2743  if (tbinfo->relkind == RELKIND_MATVIEW)
2744  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2745  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2746  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2747  else
2748  tdinfo->dobj.objType = DO_TABLE_DATA;
2749 
2750  /*
2751  * Note: use tableoid 0 so that this object won't be mistaken for
2752  * something that pg_depend entries apply to.
2753  */
2754  tdinfo->dobj.catId.tableoid = 0;
2755  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2756  AssignDumpId(&tdinfo->dobj);
2757  tdinfo->dobj.name = tbinfo->dobj.name;
2758  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2759  tdinfo->tdtable = tbinfo;
2760  tdinfo->filtercond = NULL; /* might get set later */
2761  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2762 
2763  /* A TableDataInfo contains data, of course */
2764  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2765 
2766  tbinfo->dataObj = tdinfo;
2767 
2768  /* Make sure that we'll collect per-column info for this table. */
2769  tbinfo->interesting = true;
2770 }
2771 
2772 /*
2773  * The refresh for a materialized view must be dependent on the refresh for
2774  * any materialized view that this one is dependent on.
2775  *
2776  * This must be called after all the objects are created, but before they are
2777  * sorted.
2778  */
2779 static void
2781 {
2782  PQExpBuffer query;
2783  PGresult *res;
2784  int ntups,
2785  i;
2786  int i_classid,
2787  i_objid,
2788  i_refobjid;
2789 
2790  /* No Mat Views before 9.3. */
2791  if (fout->remoteVersion < 90300)
2792  return;
2793 
2794  query = createPQExpBuffer();
2795 
2796  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2797  "( "
2798  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2799  "FROM pg_depend d1 "
2800  "JOIN pg_class c1 ON c1.oid = d1.objid "
2801  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2802  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2803  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2804  "AND d2.objid = r1.oid "
2805  "AND d2.refobjid <> d1.objid "
2806  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2807  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2808  CppAsString2(RELKIND_VIEW) ") "
2809  "WHERE d1.classid = 'pg_class'::regclass "
2810  "UNION "
2811  "SELECT w.objid, d3.refobjid, c3.relkind "
2812  "FROM w "
2813  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2814  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2815  "AND d3.objid = r3.oid "
2816  "AND d3.refobjid <> w.refobjid "
2817  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2818  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2819  CppAsString2(RELKIND_VIEW) ") "
2820  ") "
2821  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2822  "FROM w "
2823  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2824 
2825  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2826 
2827  ntups = PQntuples(res);
2828 
2829  i_classid = PQfnumber(res, "classid");
2830  i_objid = PQfnumber(res, "objid");
2831  i_refobjid = PQfnumber(res, "refobjid");
2832 
2833  for (i = 0; i < ntups; i++)
2834  {
2835  CatalogId objId;
2836  CatalogId refobjId;
2837  DumpableObject *dobj;
2838  DumpableObject *refdobj;
2839  TableInfo *tbinfo;
2840  TableInfo *reftbinfo;
2841 
2842  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2843  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2844  refobjId.tableoid = objId.tableoid;
2845  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2846 
2847  dobj = findObjectByCatalogId(objId);
2848  if (dobj == NULL)
2849  continue;
2850 
2851  Assert(dobj->objType == DO_TABLE);
2852  tbinfo = (TableInfo *) dobj;
2853  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2854  dobj = (DumpableObject *) tbinfo->dataObj;
2855  if (dobj == NULL)
2856  continue;
2857  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2858 
2859  refdobj = findObjectByCatalogId(refobjId);
2860  if (refdobj == NULL)
2861  continue;
2862 
2863  Assert(refdobj->objType == DO_TABLE);
2864  reftbinfo = (TableInfo *) refdobj;
2865  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2866  refdobj = (DumpableObject *) reftbinfo->dataObj;
2867  if (refdobj == NULL)
2868  continue;
2869  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2870 
2871  addObjectDependency(dobj, refdobj->dumpId);
2872 
2873  if (!reftbinfo->relispopulated)
2874  tbinfo->relispopulated = false;
2875  }
2876 
2877  PQclear(res);
2878 
2879  destroyPQExpBuffer(query);
2880 }
2881 
2882 /*
2883  * getTableDataFKConstraints -
2884  * add dump-order dependencies reflecting foreign key constraints
2885  *
2886  * This code is executed only in a data-only dump --- in schema+data dumps
2887  * we handle foreign key issues by not creating the FK constraints until
2888  * after the data is loaded. In a data-only dump, however, we want to
2889  * order the table data objects in such a way that a table's referenced
2890  * tables are restored first. (In the presence of circular references or
2891  * self-references this may be impossible; we'll detect and complain about
2892  * that during the dependency sorting step.)
2893  */
/*
 * Adds restore-order dependencies so that, in a data-only dump, a table's
 * TABLE_DATA object is restored after the TABLE_DATA of every table it
 * references via a foreign key (see the block comment above).
 */
static void
/* NOTE(review): the function-name/parameter line was lost from this excerpt;
 * the preceding file comment names it getTableDataFKConstraints — confirm
 * against the original file. */
{
	DumpableObject **dobjs;
	int			numObjs;
	int			i;

	/* Search through all the dumpable objects for FK constraints */
	getDumpableObjects(&dobjs, &numObjs);
	for (i = 0; i < numObjs; i++)
	{
		if (dobjs[i]->objType == DO_FK_CONSTRAINT)
		{
			ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
			TableInfo  *ftable;

			/* Not interesting unless both tables are to be dumped */
			if (cinfo->contable == NULL ||
				cinfo->contable->dataObj == NULL)
				continue;
			ftable = findTableByOid(cinfo->confrelid);
			if (ftable == NULL ||
				ftable->dataObj == NULL)
				continue;

			/*
			 * Okay, make referencing table's TABLE_DATA object depend on the
			 * referenced table's TABLE_DATA object.
			 */
			/* NOTE(review): the call head (presumably
			 * addObjectDependency(...)) is missing from this excerpt; only
			 * its trailing argument line survives below — confirm. */
							ftable->dataObj->dobj.dumpId);
		}
	}
	free(dobjs);
}
2929 
2930 
2931 /*
2932  * dumpDatabase:
2933  * dump the database definition
2934  */
/*
 * Dumps the CREATE DATABASE command, plus separate TOC entries for the DB
 * comment, security label, ACL, DATABASE PROPERTIES, and (in binary-upgrade
 * mode) pg_largeobject horizon/relfilenode preservation.
 */
static void
/* NOTE(review): the function-name/parameter line was lost from this excerpt;
 * the preceding file comment names it dumpDatabase — confirm against the
 * original file. */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer dbQry = createPQExpBuffer();
	PQExpBuffer delQry = createPQExpBuffer();
	PQExpBuffer creaQry = createPQExpBuffer();
	PQExpBuffer labelq = createPQExpBuffer();
	PGconn	   *conn = GetConnection(fout);
	PGresult   *res;
	int			i_tableoid,
				i_oid,
				i_datname,
				i_datdba,
				i_encoding,
				i_datlocprovider,
				i_collate,
				i_ctype,
				i_daticulocale,
				i_daticurules,
				i_frozenxid,
				i_minmxid,
				i_datacl,
				i_acldefault,
				i_datistemplate,
				i_datconnlimit,
				i_datcollversion,
				i_tablespace;
	CatalogId	dbCatId;
	DumpId		dbDumpId;
	DumpableAcl dbdacl;
	const char *datname,
			   *dba,
			   *encoding,
			   *datlocprovider,
			   *collate,
			   *ctype,
			   *iculocale,
			   *icurules,
			   *datistemplate,
			   *datconnlimit,
			   *tablespace;
	uint32		frozenxid,
				minmxid;
	char	   *qdatname;

	pg_log_info("saving database definition");

	/*
	 * Fetch the database-level properties for this database.  Columns that
	 * do not exist on older servers are substituted with constants so the
	 * rest of the code can treat all versions uniformly.
	 */
	appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
						 "datdba, "
						 "pg_encoding_to_char(encoding) AS encoding, "
						 "datcollate, datctype, datfrozenxid, "
						 "datacl, acldefault('d', datdba) AS acldefault, "
						 "datistemplate, datconnlimit, ");
	if (fout->remoteVersion >= 90300)
		appendPQExpBufferStr(dbQry, "datminmxid, ");
	else
		appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale, datcollversion, ");
	else
		appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS daticulocale, NULL AS datcollversion, ");
	if (fout->remoteVersion >= 160000)
		appendPQExpBufferStr(dbQry, "daticurules, ");
	else
		appendPQExpBufferStr(dbQry, "NULL AS daticurules, ");
	appendPQExpBufferStr(dbQry,
						 "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
						 "shobj_description(oid, 'pg_database') AS description "
						 "FROM pg_database "
						 "WHERE datname = current_database()");

	res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_datname = PQfnumber(res, "datname");
	i_datdba = PQfnumber(res, "datdba");
	i_encoding = PQfnumber(res, "encoding");
	i_datlocprovider = PQfnumber(res, "datlocprovider");
	i_collate = PQfnumber(res, "datcollate");
	i_ctype = PQfnumber(res, "datctype");
	i_daticulocale = PQfnumber(res, "daticulocale");
	i_daticurules = PQfnumber(res, "daticurules");
	i_frozenxid = PQfnumber(res, "datfrozenxid");
	i_minmxid = PQfnumber(res, "datminmxid");
	i_datacl = PQfnumber(res, "datacl");
	i_acldefault = PQfnumber(res, "acldefault");
	i_datistemplate = PQfnumber(res, "datistemplate");
	i_datconnlimit = PQfnumber(res, "datconnlimit");
	i_datcollversion = PQfnumber(res, "datcollversion");
	i_tablespace = PQfnumber(res, "tablespace");

	dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
	dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
	datname = PQgetvalue(res, 0, i_datname);
	dba = getRoleName(PQgetvalue(res, 0, i_datdba));
	encoding = PQgetvalue(res, 0, i_encoding);
	datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
	collate = PQgetvalue(res, 0, i_collate);
	ctype = PQgetvalue(res, 0, i_ctype);
	/* daticulocale / daticurules are NULL for non-ICU DBs or older servers */
	if (!PQgetisnull(res, 0, i_daticulocale))
		iculocale = PQgetvalue(res, 0, i_daticulocale);
	else
		iculocale = NULL;
	if (!PQgetisnull(res, 0, i_daticurules))
		icurules = PQgetvalue(res, 0, i_daticurules);
	else
		icurules = NULL;
	frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
	minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
	dbdacl.acl = PQgetvalue(res, 0, i_datacl);
	dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
	datistemplate = PQgetvalue(res, 0, i_datistemplate);
	datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
	tablespace = PQgetvalue(res, 0, i_tablespace);

	qdatname = pg_strdup(fmtId(datname));

	/*
	 * Prepare the CREATE DATABASE command. We must specify OID (if we want
	 * to preserve that), as well as the encoding, locale, and tablespace
	 * since those can't be altered later. Other DB properties are left to
	 * the DATABASE PROPERTIES entry, so that they can be applied after
	 * reconnecting to the target DB.
	 */
	if (dopt->binary_upgrade)
	{
		appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0 OID = %u",
						  qdatname, dbCatId.oid);
	}
	else
	{
		appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
						  qdatname);
	}
	if (strlen(encoding) > 0)
	{
		appendPQExpBufferStr(creaQry, " ENCODING = ");
		appendStringLiteralAH(creaQry, encoding, fout);
	}

	/* Map the single-char provider code to its SQL keyword */
	appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
	if (datlocprovider[0] == 'c')
		appendPQExpBufferStr(creaQry, "libc");
	else if (datlocprovider[0] == 'i')
		appendPQExpBufferStr(creaQry, "icu");
	else
		pg_fatal("unrecognized locale provider: %s",
				 datlocprovider);

	/* When collate and ctype agree, emit the shorter LOCALE form */
	if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
	{
		appendPQExpBufferStr(creaQry, " LOCALE = ");
		appendStringLiteralAH(creaQry, collate, fout);
	}
	else
	{
		if (strlen(collate) > 0)
		{
			appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
			appendStringLiteralAH(creaQry, collate, fout);
		}
		if (strlen(ctype) > 0)
		{
			appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
			appendStringLiteralAH(creaQry, ctype, fout);
		}
	}
	if (iculocale)
	{
		appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
		appendStringLiteralAH(creaQry, iculocale, fout);
	}
	if (icurules)
	{
		appendPQExpBufferStr(creaQry, " ICU_RULES = ");
		appendStringLiteralAH(creaQry, icurules, fout);
	}

	/*
	 * For binary upgrade, carry over the collation version. For normal
	 * dump/restore, omit the version, so that it is computed upon restore.
	 */
	if (dopt->binary_upgrade)
	{
		if (!PQgetisnull(res, 0, i_datcollversion))
		{
			appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
			appendStringLiteralAH(creaQry,
								  PQgetvalue(res, 0, i_datcollversion),
								  fout);
		}
	}

	/*
	 * Note: looking at dopt->outputNoTablespaces here is completely the wrong
	 * thing; the decision whether to specify a tablespace should be left till
	 * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
	 * label the DATABASE entry with the tablespace and let the normal
	 * tablespace selection logic work ... but CREATE DATABASE doesn't pay
	 * attention to default_tablespace, so that won't work.
	 */
	if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
		!dopt->outputNoTablespaces)
		appendPQExpBuffer(creaQry, " TABLESPACE = %s",
						  fmtId(tablespace));
	appendPQExpBufferStr(creaQry, ";\n");

	appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
					  qdatname);

	dbDumpId = createDumpId();

	ArchiveEntry(fout,
				 dbCatId,		/* catalog ID */
				 dbDumpId,		/* dump ID */
				 ARCHIVE_OPTS(.tag = datname,
							  .owner = dba,
							  .description = "DATABASE",
							  .section = SECTION_PRE_DATA,
							  .createStmt = creaQry->data,
							  .dropStmt = delQry->data));

	/* Compute correct tag for archive entry */
	appendPQExpBuffer(labelq, "DATABASE %s", qdatname);

	/* Dump DB comment if any */
	{
		/*
		 * 8.2 and up keep comments on shared objects in a shared table, so we
		 * cannot use the dumpComment() code used for other database objects.
		 * Be careful that the ArchiveEntry parameters match that function.
		 */
		char	   *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));

		if (comment && *comment && !dopt->no_comments)
		{
			resetPQExpBuffer(dbQry);

			/*
			 * Generates warning when loaded into a differently-named
			 * database.
			 */
			appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
			appendStringLiteralAH(dbQry, comment, fout);
			appendPQExpBufferStr(dbQry, ";\n");

			/* NOTE(review): a call head (presumably ArchiveEntry(...)) is
			 * missing from this excerpt; only its ARCHIVE_OPTS argument
			 * survives below — confirm against the original file. */
						 ARCHIVE_OPTS(.tag = labelq->data,
									  .owner = dba,
									  .description = "COMMENT",
									  .section = SECTION_NONE,
									  .createStmt = dbQry->data,
									  .deps = &dbDumpId,
									  .nDeps = 1));
		}
	}

	/* Dump DB security label, if enabled */
	if (!dopt->no_security_labels)
	{
		PGresult   *shres;
		PQExpBuffer seclabelQry;

		seclabelQry = createPQExpBuffer();

		buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
		shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
		resetPQExpBuffer(seclabelQry);
		emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
		if (seclabelQry->len > 0)
			/* NOTE(review): call head (presumably ArchiveEntry(...)) missing
			 * from this excerpt — confirm against the original file. */
						 ARCHIVE_OPTS(.tag = labelq->data,
									  .owner = dba,
									  .description = "SECURITY LABEL",
									  .section = SECTION_NONE,
									  .createStmt = seclabelQry->data,
									  .deps = &dbDumpId,
									  .nDeps = 1));
		destroyPQExpBuffer(seclabelQry);
		PQclear(shres);
	}

	/*
	 * Dump ACL if any. Note that we do not support initial privileges
	 * (pg_init_privs) on databases.
	 */
	dbdacl.privtype = 0;
	dbdacl.initprivs = NULL;

	dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
			qdatname, NULL, NULL,
			dba, &dbdacl);

	/*
	 * Now construct a DATABASE PROPERTIES archive entry to restore any
	 * non-default database-level properties. (The reason this must be
	 * separate is that we cannot put any additional commands into the TOC
	 * entry that has CREATE DATABASE. pg_restore would execute such a group
	 * in an implicit transaction block, and the backend won't allow CREATE
	 * DATABASE in that context.)
	 */
	resetPQExpBuffer(creaQry);
	resetPQExpBuffer(delQry);

	if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
		appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
						  qdatname, datconnlimit);

	if (strcmp(datistemplate, "t") == 0)
	{
		appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
						  qdatname);

		/*
		 * The backend won't accept DROP DATABASE on a template database. We
		 * can deal with that by removing the template marking before the DROP
		 * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
		 * since no such command is currently supported, fake it with a direct
		 * UPDATE on pg_database.
		 */
		appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
							 "SET datistemplate = false WHERE datname = ");
		appendStringLiteralAH(delQry, datname, fout);
		appendPQExpBufferStr(delQry, ";\n");
	}

	/* Add database-specific SET options */
	dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);

	/*
	 * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
	 * entry, too, for lack of a better place.
	 */
	if (dopt->binary_upgrade)
	{
		appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
		appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
						  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
						  "WHERE datname = ",
						  frozenxid, minmxid);
		appendStringLiteralAH(creaQry, datname, fout);
		appendPQExpBufferStr(creaQry, ";\n");
	}

	if (creaQry->len > 0)
		/* NOTE(review): call head (presumably ArchiveEntry(...)) missing
		 * from this excerpt — confirm against the original file. */
					 ARCHIVE_OPTS(.tag = datname,
								  .owner = dba,
								  .description = "DATABASE PROPERTIES",
								  .section = SECTION_PRE_DATA,
								  .createStmt = creaQry->data,
								  .dropStmt = delQry->data,
								  .deps = &dbDumpId));

	/*
	 * pg_largeobject comes from the old system intact, so set its
	 * relfrozenxids, relminmxids and relfilenode.
	 */
	if (dopt->binary_upgrade)
	{
		PGresult   *lo_res;
		PQExpBuffer loFrozenQry = createPQExpBuffer();
		PQExpBuffer loOutQry = createPQExpBuffer();
		PQExpBuffer loHorizonQry = createPQExpBuffer();
		int			ii_relfrozenxid,
					ii_relfilenode,
					ii_oid,
					ii_relminmxid;

		/*
		 * pg_largeobject
		 */
		if (fout->remoteVersion >= 90300)
			appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
							  "FROM pg_catalog.pg_class\n"
							  "WHERE oid IN (%u, %u);\n",
							  LargeObjectRelationId, LargeObjectLOidPNIndexId);
		else
			appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
							  "FROM pg_catalog.pg_class\n"
							  "WHERE oid IN (%u, %u);\n",
							  LargeObjectRelationId, LargeObjectLOidPNIndexId);

		lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);

		ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
		ii_relminmxid = PQfnumber(lo_res, "relminmxid");
		ii_relfilenode = PQfnumber(lo_res, "relfilenode");
		ii_oid = PQfnumber(lo_res, "oid");

		appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
		appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
		for (int i = 0; i < PQntuples(lo_res); ++i)
		{
			Oid			oid;
			RelFileNumber relfilenumber;

			appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
							  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
							  "WHERE oid = %u;\n",
							  atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
							  atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
							  atooid(PQgetvalue(lo_res, i, ii_oid)));

			oid = atooid(PQgetvalue(lo_res, i, ii_oid));
			relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));

			/* Distinguish the heap from its index by comparing OIDs */
			if (oid == LargeObjectRelationId)
				appendPQExpBuffer(loOutQry,
								  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
								  relfilenumber);
			else if (oid == LargeObjectLOidPNIndexId)
				appendPQExpBuffer(loOutQry,
								  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
								  relfilenumber);
		}

		appendPQExpBufferStr(loOutQry,
							 "TRUNCATE pg_catalog.pg_largeobject;\n");
		appendPQExpBufferStr(loOutQry, loHorizonQry->data);

		/* NOTE(review): call head (presumably ArchiveEntry(...)) missing
		 * from this excerpt — confirm against the original file. */
					 ARCHIVE_OPTS(.tag = "pg_largeobject",
								  .description = "pg_largeobject",
								  .section = SECTION_PRE_DATA,
								  .createStmt = loOutQry->data));

		PQclear(lo_res);

		destroyPQExpBuffer(loFrozenQry);
		destroyPQExpBuffer(loHorizonQry);
		destroyPQExpBuffer(loOutQry);
	}

	PQclear(res);

	free(qdatname);
	destroyPQExpBuffer(dbQry);
	destroyPQExpBuffer(delQry);
	destroyPQExpBuffer(creaQry);
	destroyPQExpBuffer(labelq);
}
3382 
3383 /*
3384  * Collect any database-specific or role-and-database-specific SET options
3385  * for this database, and append them to outbuf.
3386  */
/*
 * Collects database-specific and role-in-database SET options from
 * pg_db_role_setting and appends the corresponding commands to outbuf
 * (see the block comment above).
 */
static void
/* NOTE(review): the signature head line is missing from this excerpt; the
 * line below is the continuation of the parameter list — confirm against the
 * original file. */
	const char *dbname, Oid dboid)
{
	PGconn	   *conn = GetConnection(AH);
	/* NOTE(review): a declaration line for a local PQExpBuffer "buf" is
	 * missing from this excerpt; "buf" is used below — confirm. */
	PGresult   *res;

	/* First collect database-specific options */
	printfPQExpBuffer(buf, "SELECT unnest(setconfig) FROM pg_db_role_setting "
					  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
					  dboid);

	res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);

	for (int i = 0; i < PQntuples(res); i++)
		/* NOTE(review): the per-row call head is missing from this excerpt;
		 * only its trailing argument lines survive — confirm. */
							   "DATABASE", dbname, NULL, NULL,
							   outbuf);

	PQclear(res);

	/* Now look for role-and-database-specific options */
	printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
					  "FROM pg_db_role_setting s, pg_roles r "
					  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
					  dboid);

	res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);

	for (int i = 0; i < PQntuples(res); i++)
		/* NOTE(review): per-row call head missing here as well — confirm. */
							   "ROLE", PQgetvalue(res, i, 0),
							   "DATABASE", dbname,
							   outbuf);

	PQclear(res);

	/* NOTE(review): a cleanup line (presumably freeing "buf") appears to be
	 * missing from this excerpt — confirm against the original file. */
}
3427 
3428 /*
3429  * dumpEncoding: put the correct encoding into the archive
3430  */
/*
 * Records the source database's client_encoding as a SET command in the
 * archive, so restores reproduce the same encoding.
 */
static void
/* NOTE(review): the function-name line is missing from this excerpt; the
 * preceding file comment names it dumpEncoding — confirm. */
{
	const char *encname = pg_encoding_to_char(AH->encoding);
	/* NOTE(review): a declaration line for a local PQExpBuffer "qry" is
	 * missing from this excerpt; "qry" is used below — confirm. */

	pg_log_info("saving encoding = %s", encname);

	appendPQExpBufferStr(qry, "SET client_encoding = ");
	appendStringLiteralAH(qry, encname, AH);
	appendPQExpBufferStr(qry, ";\n");

	/* NOTE(review): call head (presumably ArchiveEntry(...)) missing from
	 * this excerpt — confirm against the original file. */
				 ARCHIVE_OPTS(.tag = "ENCODING",
							  .description = "ENCODING",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	destroyPQExpBuffer(qry);
}
3451 
3452 
3453 /*
3454  * dumpStdStrings: put the correct escape string behavior into the archive
3455  */
/*
 * Records the standard_conforming_strings setting of the source server in
 * the archive, so restored literals are interpreted identically.
 */
static void
/* NOTE(review): the function-name line is missing from this excerpt; the
 * preceding file comment names it dumpStdStrings — confirm. */
{
	const char *stdstrings = AH->std_strings ? "on" : "off";
	/* NOTE(review): a declaration line for a local PQExpBuffer "qry" is
	 * missing from this excerpt; "qry" is used below — confirm. */

	pg_log_info("saving standard_conforming_strings = %s",
				stdstrings);

	appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
					  stdstrings);

	/* NOTE(review): call head (presumably ArchiveEntry(...)) missing from
	 * this excerpt — confirm against the original file. */
				 ARCHIVE_OPTS(.tag = "STDSTRINGS",
							  .description = "STDSTRINGS",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	destroyPQExpBuffer(qry);
}
3476 
3477 /*
3478  * dumpSearchPath: record the active search_path in the archive
3479  */
/*
 * Records the active search_path (via current_schemas) as a set_config()
 * call in the archive, and caches it in AH->searchpath for plain-text dumps.
 */
static void
/* NOTE(review): the function-name line is missing from this excerpt; the
 * preceding file comment names it dumpSearchPath — confirm. */
{
	/* NOTE(review): a declaration line for a local PQExpBuffer "qry" is
	 * missing from this excerpt; "qry" is used below — confirm. */
	PQExpBuffer path = createPQExpBuffer();
	PGresult   *res;
	char	  **schemanames = NULL;
	int			nschemanames = 0;
	int			i;

	/*
	 * We use the result of current_schemas(), not the search_path GUC,
	 * because that might contain wildcards such as "$user", which won't
	 * necessarily have the same value during restore. Also, this way avoids
	 * listing schemas that may appear in search_path but not actually exist,
	 * which seems like a prudent exclusion.
	 */
	/* NOTE(review): the query-execution call head that assigns "res" is
	 * missing from this excerpt; only the SQL literal survives — confirm. */
		"SELECT pg_catalog.current_schemas(false)");

	if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
		pg_fatal("could not parse result of current_schemas()");

	/*
	 * We use set_config(), not a simple "SET search_path" command, because
	 * the latter has less-clean behavior if the search path is empty. While
	 * that's likely to get fixed at some point, it seems like a good idea to
	 * be as backwards-compatible as possible in what we put into archives.
	 */
	for (i = 0; i < nschemanames; i++)
	{
		if (i > 0)
			appendPQExpBufferStr(path, ", ");
		appendPQExpBufferStr(path, fmtId(schemanames[i]));
	}

	appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
	appendStringLiteralAH(qry, path->data, AH);
	appendPQExpBufferStr(qry, ", false);\n");

	pg_log_info("saving search_path = %s", path->data);

	/* NOTE(review): call head (presumably ArchiveEntry(...)) missing from
	 * this excerpt — confirm against the original file. */
				 ARCHIVE_OPTS(.tag = "SEARCHPATH",
							  .description = "SEARCHPATH",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	/* Also save it in AH->searchpath, in case we're doing plain text dump */
	AH->searchpath = pg_strdup(qry->data);

	free(schemanames);
	PQclear(res);
	destroyPQExpBuffer(qry);
	destroyPQExpBuffer(path);
}
3536 
3537 
3538 /*
3539  * getLOs:
3540  * Collect schema-level data about large objects
3541  */
/*
 * Builds a DumpableObject for each large object (owner/ACL metadata from
 * pg_largeobject_metadata), plus one "BLOBS" placeholder object for the data
 * when any LOs exist.
 */
static void
/* NOTE(review): the function-name line is missing from this excerpt; the
 * preceding file comment names it getLOs — confirm against the original. */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer loQry = createPQExpBuffer();
	LoInfo	   *loinfo;
	DumpableObject *lodata;
	PGresult   *res;
	int			ntups;
	int			i;
	int			i_oid;
	int			i_lomowner;
	int			i_lomacl;
	int			i_acldefault;

	pg_log_info("reading large objects");

	/* Fetch LO OIDs, and owner/ACL data */
	appendPQExpBufferStr(loQry,
						 "SELECT oid, lomowner, lomacl, "
						 "acldefault('L', lomowner) AS acldefault "
						 "FROM pg_largeobject_metadata");

	res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);

	i_oid = PQfnumber(res, "oid");
	i_lomowner = PQfnumber(res, "lomowner");
	i_lomacl = PQfnumber(res, "lomacl");
	i_acldefault = PQfnumber(res, "acldefault");

	ntups = PQntuples(res);

	/*
	 * Each large object has its own "BLOB" archive entry.
	 */
	loinfo = (LoInfo *) pg_malloc(ntups * sizeof(LoInfo));

	for (i = 0; i < ntups; i++)
	{
		loinfo[i].dobj.objType = DO_LARGE_OBJECT;
		loinfo[i].dobj.catId.tableoid = LargeObjectRelationId;
		loinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&loinfo[i].dobj);

		/* The dumpable object's name is the LO's OID rendered as a string */
		loinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
		loinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
		loinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
		loinfo[i].dacl.privtype = 0;
		loinfo[i].dacl.initprivs = NULL;
		loinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));

		/* LOs have data */
		loinfo[i].dobj.components |= DUMP_COMPONENT_DATA;

		/* Mark whether LO has an ACL */
		if (!PQgetisnull(res, i, i_lomacl))
			loinfo[i].dobj.components |= DUMP_COMPONENT_ACL;

		/*
		 * In binary-upgrade mode for LOs, we do *not* dump out the LO data,
		 * as it will be copied by pg_upgrade, which simply copies the
		 * pg_largeobject table. We *do* however dump out anything but the
		 * data, as pg_upgrade copies just pg_largeobject, but not
		 * pg_largeobject_metadata, after the dump is restored.
		 */
		if (dopt->binary_upgrade)
			loinfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
	}

	/*
	 * If we have any large objects, a "BLOBS" archive entry is needed. This
	 * is just a placeholder for sorting; it carries no data now.
	 */
	if (ntups > 0)
	{
		lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
		lodata->objType = DO_LARGE_OBJECT_DATA;
		lodata->catId = nilCatalogId;
		AssignDumpId(lodata);
		lodata->name = pg_strdup("BLOBS");
		lodata->components |= DUMP_COMPONENT_DATA;
	}

	PQclear(res);
	destroyPQExpBuffer(loQry);
}
3628 
3629 /*
3630  * dumpLO
3631  *
3632  * dump the definition (metadata) of the given large object
3633  */
3634 static void
3635 dumpLO(Archive *fout, const LoInfo *loinfo)
3636 {
3637  PQExpBuffer cquery = createPQExpBuffer();
3638  PQExpBuffer dquery = createPQExpBuffer();
3639 
3640  appendPQExpBuffer(cquery,
3641  "SELECT pg_catalog.lo_create('%s');\n",
3642  loinfo->dobj.name);
3643 
3644  appendPQExpBuffer(dquery,
3645  "SELECT pg_catalog.lo_unlink('%s');\n",
3646  loinfo->dobj.name);
3647 
3648  if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3649  ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3650  ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3651  .owner = loinfo->rolname,
3652  .description = "BLOB",
3653  .section = SECTION_PRE_DATA,
3654  .createStmt = cquery->data,
3655  .dropStmt = dquery->data));
3656 
3657  /* Dump comment if any */
3658  if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3659  dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
3660  NULL, loinfo->rolname,
3661  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3662 
3663  /* Dump security label if any */
3664  if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3665  dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
3666  NULL, loinfo->rolname,
3667  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3668 
3669  /* Dump ACL if any */
3670  if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3671  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3672  loinfo->dobj.name, NULL,
3673  NULL, loinfo->rolname, &loinfo->dacl);
3674 
3675  destroyPQExpBuffer(cquery);
3676  destroyPQExpBuffer(dquery);
3677 }
3678 
3679 /*
3680  * dumpLOs:
3681  * dump the data contents of all large objects
3682  */
3683 static int
3684 dumpLOs(Archive *fout, const void *arg)
3685 {
3686  const char *loQry;
3687  const char *loFetchQry;
3688  PGconn *conn = GetConnection(fout);
3689  PGresult *res;
3690  char buf[LOBBUFSIZE];
3691  int ntups;
3692  int i;
3693  int cnt;
3694 
3695  pg_log_info("saving large objects");
3696 
3697  /*
3698  * Currently, we re-fetch all LO OIDs using a cursor. Consider scanning
3699  * the already-in-memory dumpable objects instead...
3700  */
3701  loQry =
3702  "DECLARE looid CURSOR FOR "
3703  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3704 
3705  ExecuteSqlStatement(fout, loQry);
3706 
3707  /* Command to fetch from cursor */
3708  loFetchQry = "FETCH 1000 IN looid";
3709 
3710  do
3711  {
3712  /* Do a fetch */
3713  res = ExecuteSqlQuery(fout, loFetchQry, PGRES_TUPLES_OK);
3714 
3715  /* Process the tuples, if any */
3716  ntups = PQntuples(res);
3717  for (i = 0; i < ntups; i++)
3718  {
3719  Oid loOid;
3720  int loFd;
3721 
3722  loOid = atooid(PQgetvalue(res, i, 0));
3723  /* Open the LO */
3724  loFd = lo_open(conn, loOid, INV_READ);
3725  if (loFd == -1)
3726  pg_fatal("could not open large object %u: %s",
3727  loOid, PQerrorMessage(conn));
3728 
3729  StartLO(fout, loOid);
3730 
3731  /* Now read it in chunks, sending data to archive */
3732  do
3733  {
3734  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3735  if (cnt < 0)
3736  pg_fatal("error reading large object %u: %s",
3737  loOid, PQerrorMessage(conn));
3738 
3739  WriteData(fout, buf, cnt);
3740  } while (cnt > 0);
3741 
3742  lo_close(conn, loFd);
3743 
3744  EndLO(fout, loOid);
3745  }
3746 
3747  PQclear(res);
3748  } while (ntups > 0);
3749 
3750  return 1;
3751 }
3752 
3753 /*
3754  * getPolicies
3755  * get information about all RLS policies on dumpable tables.
3756  */
/*
 * Reads RLS state and policies for all dumpable tables, creating a
 * PolicyInfo object per policy, plus a null-polname PolicyInfo per table
 * that merely has RLS enabled (see the block comment above).
 */
void
getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
{
	PQExpBuffer query;
	PQExpBuffer tbloids;
	PGresult   *res;
	PolicyInfo *polinfo;
	int			i_oid;
	int			i_tableoid;
	int			i_polrelid;
	int			i_polname;
	int			i_polcmd;
	int			i_polpermissive;
	int			i_polroles;
	int			i_polqual;
	int			i_polwithcheck;
	int			i,
				j,
				ntups;

	/* No policies before 9.5 */
	if (fout->remoteVersion < 90500)
		return;

	query = createPQExpBuffer();
	tbloids = createPQExpBuffer();

	/*
	 * Identify tables of interest, and check which ones have RLS enabled.
	 */
	appendPQExpBufferChar(tbloids, '{');
	for (i = 0; i < numTables; i++)
	{
		TableInfo  *tbinfo = &tblinfo[i];

		/* Ignore row security on tables not to be dumped */
		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
			continue;

		/* It can't have RLS or policies if it's not a table */
		if (tbinfo->relkind != RELKIND_RELATION &&
			tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
			continue;

		/* Add it to the list of table OIDs to be probed below */
		if (tbloids->len > 1)	/* do we have more than the '{'? */
			appendPQExpBufferChar(tbloids, ',');
		appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);

		/* Is RLS enabled? (That's separate from whether it has policies) */
		if (tbinfo->rowsec)
		{
			/* NOTE(review): one statement line is missing from this excerpt
			 * at this point — confirm against the original file. */

			/*
			 * We represent RLS being enabled on a table by creating a
			 * PolicyInfo object with null polname.
			 *
			 * Note: use tableoid 0 so that this object won't be mistaken for
			 * something that pg_depend entries apply to.
			 */
			polinfo = pg_malloc(sizeof(PolicyInfo));
			polinfo->dobj.objType = DO_POLICY;
			polinfo->dobj.catId.tableoid = 0;
			polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
			AssignDumpId(&polinfo->dobj);
			polinfo->dobj.namespace = tbinfo->dobj.namespace;
			polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
			polinfo->poltable = tbinfo;
			polinfo->polname = NULL;
			polinfo->polcmd = '\0';
			polinfo->polpermissive = 0;
			polinfo->polroles = NULL;
			polinfo->polqual = NULL;
			polinfo->polwithcheck = NULL;
		}
	}
	appendPQExpBufferChar(tbloids, '}');

	/*
	 * Now, read all RLS policies belonging to the tables of interest, and
	 * create PolicyInfo objects for them. (Note that we must filter the
	 * results server-side not locally, because we dare not apply pg_get_expr
	 * to tables we don't have lock on.)
	 */
	pg_log_info("reading row-level security policies");

	printfPQExpBuffer(query,
					  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
	if (fout->remoteVersion >= 100000)
		appendPQExpBufferStr(query, "pol.polpermissive, ");
	else
		appendPQExpBufferStr(query, "'t' as polpermissive, ");
	appendPQExpBuffer(query,
					  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
					  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
					  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
					  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
					  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
					  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
					  tbloids->data);

	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);
	if (ntups > 0)
	{
		i_oid = PQfnumber(res, "oid");
		i_tableoid = PQfnumber(res, "tableoid");
		i_polrelid = PQfnumber(res, "polrelid");
		i_polname = PQfnumber(res, "polname");
		i_polcmd = PQfnumber(res, "polcmd");
		i_polpermissive = PQfnumber(res, "polpermissive");
		i_polroles = PQfnumber(res, "polroles");
		i_polqual = PQfnumber(res, "polqual");
		i_polwithcheck = PQfnumber(res, "polwithcheck");

		polinfo = pg_malloc(ntups * sizeof(PolicyInfo));

		for (j = 0; j < ntups; j++)
		{
			Oid			polrelid = atooid(PQgetvalue(res, j, i_polrelid));
			TableInfo  *tbinfo = findTableByOid(polrelid);

			/* NOTE(review): one statement line is missing from this excerpt
			 * at this point — confirm against the original file. */

			polinfo[j].dobj.objType = DO_POLICY;
			polinfo[j].dobj.catId.tableoid =
				atooid(PQgetvalue(res, j, i_tableoid));
			polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
			AssignDumpId(&polinfo[j].dobj);
			polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
			polinfo[j].poltable = tbinfo;
			polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
			polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);

			polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
			polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';

			/* NULL polroles corresponds to polroles = '{0}' per the CASE
			 * expression in the query above */
			if (PQgetisnull(res, j, i_polroles))
				polinfo[j].polroles = NULL;
			else
				polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));

			if (PQgetisnull(res, j, i_polqual))
				polinfo[j].polqual = NULL;
			else
				polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));

			if (PQgetisnull(res, j, i_polwithcheck))
				polinfo[j].polwithcheck = NULL;
			else
				polinfo[j].polwithcheck
					= pg_strdup(PQgetvalue(res, j, i_polwithcheck));
		}
	}

	PQclear(res);

	destroyPQExpBuffer(query);
	destroyPQExpBuffer(tbloids);
}
3919 
3920 /*
3921  * dumpPolicy
3922  * dump the definition of the given policy
3923  */
3924 static void
3925 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
3926 {
3927  DumpOptions *dopt = fout->dopt;
3928  TableInfo *tbinfo = polinfo->poltable;
3929  PQExpBuffer query;
3930  PQExpBuffer delqry;
3931  PQExpBuffer polprefix;
3932  char *qtabname;
3933  const char *cmd;
3934  char *tag;
3935 
3936  /* Do nothing in data-only dump */
3937  if (dopt->dataOnly)
3938  return;
3939 
3940  /*
3941  * If polname is NULL, then this record is just indicating that ROW LEVEL
3942  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
3943  * ROW LEVEL SECURITY.
3944  */
3945  if (polinfo->polname == NULL)
3946  {
3947  query = createPQExpBuffer();
3948 
3949  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
3950  fmtQualifiedDumpable(tbinfo));
3951 
3952  /*
3953  * We must emit the ROW SECURITY object's dependency on its table
3954  * explicitly, because it will not match anything in pg_depend (unlike
3955  * the case for other PolicyInfo objects).
3956  */
3957  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3958  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3959  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
3960  .namespace = polinfo->dobj.namespace->dobj.name,
3961  .owner = tbinfo->rolname,
3962  .description = "ROW SECURITY",
3963  .section = SECTION_POST_DATA,
3964  .createStmt = query->data,
3965  .deps = &(tbinfo->dobj.dumpId),
3966  .nDeps = 1));
3967 
3968  destroyPQExpBuffer(query);
3969  return;
3970  }
3971 
3972  if (polinfo->polcmd == '*')
3973  cmd = "";
3974  else if (polinfo->polcmd == 'r')
3975  cmd = " FOR SELECT";
3976  else if (polinfo->polcmd == 'a')
3977  cmd = " FOR INSERT";
3978  else if (polinfo->polcmd == 'w')
3979  cmd = " FOR UPDATE";
3980  else if (polinfo->polcmd == 'd')
3981  cmd = " FOR DELETE";
3982  else
3983  pg_fatal("unexpected policy command type: %c",
3984  polinfo->polcmd);
3985 
3986  query = createPQExpBuffer();
3987  delqry = createPQExpBuffer();
3988  polprefix = createPQExpBuffer();
3989 
3990  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
3991 
3992  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
3993 
3994  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
3995  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
3996 
3997  if (polinfo->polroles != NULL)
3998  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
3999 
4000  if (polinfo->polqual != NULL)
4001  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
4002 
4003  if (polinfo->polwithcheck != NULL)
4004  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
4005 
4006  appendPQExpBufferStr(query, ";\n");
4007 
4008  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
4009  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
4010 
4011  appendPQExpBuffer(polprefix, "POLICY %s ON",
4012  fmtId(polinfo->polname));
4013 
4014  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
4015 
4016  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4017  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
4018  ARCHIVE_OPTS(.tag = tag,
4019  .namespace = polinfo->dobj.namespace->dobj.name,
4020  .owner = tbinfo->rolname,
4021  .description = "POLICY",
4022  .section = SECTION_POST_DATA,
4023  .createStmt = query->data,
4024  .dropStmt = delqry->data));
4025 
4026  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4027  dumpComment(fout, polprefix->data, qtabname,
4028  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
4029  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
4030 
4031  free(tag);
4032  destroyPQExpBuffer(query);
4033  destroyPQExpBuffer(delqry);
4034  destroyPQExpBuffer(polprefix);
4035  free(qtabname);
4036 }
4037 
4038 /*
4039  * getPublications
4040  * get information about publications
4041  */
4043 getPublications(Archive *fout, int *numPublications)
4044 {
4045  DumpOptions *dopt = fout->dopt;
4046  PQExpBuffer query;
4047  PGresult *res;
4048  PublicationInfo *pubinfo;
4049  int i_tableoid;
4050  int i_oid;
4051  int i_pubname;
4052  int i_pubowner;
4053  int i_puballtables;
4054  int i_pubinsert;
4055  int i_pubupdate;
4056  int i_pubdelete;
4057  int i_pubtruncate;
4058  int i_pubviaroot;
4059  int i,
4060  ntups;
4061 
4062  if (dopt->no_publications || fout->remoteVersion < 100000)
4063  {
4064  *numPublications = 0;
4065  return NULL;
4066  }
4067 
4068  query = createPQExpBuffer();
4069 
4070  resetPQExpBuffer(query);
4071 
4072  /* Get the publications. */
4073  if (fout->remoteVersion >= 130000)
4074  appendPQExpBufferStr(query,
4075  "SELECT p.tableoid, p.oid, p.pubname, "
4076  "p.pubowner, "
4077  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
4078  "FROM pg_publication p");
4079  else if (fout->remoteVersion >= 110000)
4080  appendPQExpBufferStr(query,
4081  "SELECT p.tableoid, p.oid, p.pubname, "
4082  "p.pubowner, "
4083  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
4084  "FROM pg_publication p");
4085  else
4086  appendPQExpBufferStr(query,
4087  "SELECT p.tableoid, p.oid, p.pubname, "
4088  "p.pubowner, "
4089  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
4090  "FROM pg_publication p");
4091 
4092  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4093 
4094  ntups = PQntuples(res);
4095 
4096  i_tableoid = PQfnumber(res, "tableoid");
4097  i_oid = PQfnumber(res, "oid");
4098  i_pubname = PQfnumber(res, "pubname");
4099  i_pubowner = PQfnumber(res, "pubowner");
4100  i_puballtables = PQfnumber(res, "puballtables");
4101  i_pubinsert = PQfnumber(res, "pubinsert");
4102  i_pubupdate = PQfnumber(res, "pubupdate");
4103  i_pubdelete = PQfnumber(res, "pubdelete");
4104  i_pubtruncate = PQfnumber(res, "pubtruncate");
4105  i_pubviaroot = PQfnumber(res, "pubviaroot");
4106 
4107  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4108 
4109  for (i = 0; i < ntups; i++)
4110  {
4111  pubinfo[i].dobj.objType = DO_PUBLICATION;
4112  pubinfo[i].dobj.catId.tableoid =
4113  atooid(PQgetvalue(res, i, i_tableoid));
4114  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4115  AssignDumpId(&pubinfo[i].dobj);
4116  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4117  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4118  pubinfo[i].puballtables =
4119  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4120  pubinfo[i].pubinsert =
4121  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4122  pubinfo[i].pubupdate =
4123  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4124  pubinfo[i].pubdelete =
4125  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4126  pubinfo[i].pubtruncate =
4127  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4128  pubinfo[i].pubviaroot =
4129  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4130 
4131  /* Decide whether we want to dump it */
4132  selectDumpableObject(&(pubinfo[i].dobj), fout);
4133  }
4134  PQclear(res);
4135 
4136  destroyPQExpBuffer(query);
4137 
4138  *numPublications = ntups;
4139  return pubinfo;
4140 }
4141 
4142 /*
4143  * dumpPublication
4144  * dump the definition of the given publication
4145  */
4146 static void
4148 {
4149  DumpOptions *dopt = fout->dopt;
4150  PQExpBuffer delq;
4151  PQExpBuffer query;
4152  char *qpubname;
4153  bool first = true;
4154 
4155  /* Do nothing in data-only dump */
4156  if (dopt->dataOnly)
4157  return;
4158 
4159  delq = createPQExpBuffer();
4160  query = createPQExpBuffer();
4161 
4162  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4163 
4164  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4165  qpubname);
4166 
4167  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4168  qpubname);
4169 
4170  if (pubinfo->puballtables)
4171  appendPQExpBufferStr(query, " FOR ALL TABLES");
4172 
4173  appendPQExpBufferStr(query, " WITH (publish = '");
4174  if (pubinfo->pubinsert)
4175  {
4176  appendPQExpBufferStr(query, "insert");
4177  first = false;
4178  }
4179 
4180  if (pubinfo->pubupdate)
4181  {
4182  if (!first)
4183  appendPQExpBufferStr(query, ", ");
4184 
4185  appendPQExpBufferStr(query, "update");
4186  first = false;
4187  }
4188 
4189  if (pubinfo->pubdelete)
4190  {
4191  if (!first)
4192  appendPQExpBufferStr(query, ", ");
4193 
4194  appendPQExpBufferStr(query, "delete");
4195  first = false;
4196  }
4197 
4198  if (pubinfo->pubtruncate)
4199  {
4200  if (!first)
4201  appendPQExpBufferStr(query, ", ");
4202 
4203  appendPQExpBufferStr(query, "truncate");
4204  first = false;
4205  }
4206 
4207  appendPQExpBufferChar(query, '\'');
4208 
4209  if (pubinfo->pubviaroot)
4210  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4211 
4212  appendPQExpBufferStr(query, ");\n");
4213 
4214  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4215  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4216  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4217  .owner = pubinfo->rolname,
4218  .description = "PUBLICATION",
4219  .section = SECTION_POST_DATA,
4220  .createStmt = query->data,
4221  .dropStmt = delq->data));
4222 
4223  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4224  dumpComment(fout, "PUBLICATION", qpubname,
4225  NULL, pubinfo->rolname,
4226  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4227 
4228  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4229  dumpSecLabel(fout, "PUBLICATION", qpubname,
4230  NULL, pubinfo->rolname,
4231  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4232 
4233  destroyPQExpBuffer(delq);
4234  destroyPQExpBuffer(query);
4235  free(qpubname);
4236 }
4237 
4238 /*
4239  * getPublicationNamespaces
4240  * get information about publication membership for dumpable schemas.
4241  */
4242 void
4244 {
4245  PQExpBuffer query;
4246  PGresult *res;
4247  PublicationSchemaInfo *pubsinfo;
4248  DumpOptions *dopt = fout->dopt;
4249  int i_tableoid;
4250  int i_oid;
4251  int i_pnpubid;
4252  int i_pnnspid;
4253  int i,
4254  j,
4255  ntups;
4256 
4257  if (dopt->no_publications || fout->remoteVersion < 150000)
4258  return;
4259 
4260  query = createPQExpBuffer();
4261 
4262  /* Collect all publication membership info. */
4263  appendPQExpBufferStr(query,
4264  "SELECT tableoid, oid, pnpubid, pnnspid "
4265  "FROM pg_catalog.pg_publication_namespace");
4266  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4267 
4268  ntups = PQntuples(res);
4269 
4270  i_tableoid = PQfnumber(res, "tableoid");
4271  i_oid = PQfnumber(res, "oid");
4272  i_pnpubid = PQfnumber(res, "pnpubid");
4273  i_pnnspid = PQfnumber(res, "pnnspid");
4274 
4275  /* this allocation may be more than we need */
4276  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4277  j = 0;
4278 
4279  for (i = 0; i < ntups; i++)
4280  {
4281  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4282  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4283  PublicationInfo *pubinfo;
4284  NamespaceInfo *nspinfo;
4285 
4286  /*
4287  * Ignore any entries for which we aren't interested in either the
4288  * publication or the rel.
4289  */
4290  pubinfo = findPublicationByOid(pnpubid);
4291  if (pubinfo == NULL)
4292  continue;
4293  nspinfo = findNamespaceByOid(pnnspid);
4294  if (nspinfo == NULL)
4295  continue;
4296 
4297  /*
4298  * We always dump publication namespaces unless the corresponding
4299  * namespace is excluded from the dump.
4300  */
4301  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4302  continue;
4303 
4304  /* OK, make a DumpableObject for this relationship */
4306  pubsinfo[j].dobj.catId.tableoid =
4307  atooid(PQgetvalue(res, i, i_tableoid));
4308  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4309  AssignDumpId(&pubsinfo[j].dobj);
4310  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4311  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4312  pubsinfo[j].publication = pubinfo;
4313  pubsinfo[j].pubschema = nspinfo;
4314 
4315  /* Decide whether we want to dump it */
4316  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4317 
4318  j++;
4319  }
4320 
4321  PQclear(res);
4322  destroyPQExpBuffer(query);
4323 }
4324 
4325 /*
4326  * getPublicationTables
4327  * get information about publication membership for dumpable tables.
4328  */
4329 void
4330 getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
4331 {
4332  PQExpBuffer query;
4333  PGresult *res;
4334  PublicationRelInfo *pubrinfo;
4335  DumpOptions *dopt = fout->dopt;
4336  int i_tableoid;
4337  int i_oid;
4338  int i_prpubid;
4339  int i_prrelid;
4340  int i_prrelqual;
4341  int i_prattrs;
4342  int i,
4343  j,
4344  ntups;
4345 
4346  if (dopt->no_publications || fout->remoteVersion < 100000)
4347  return;
4348 
4349  query = createPQExpBuffer();
4350 
4351  /* Collect all publication membership info. */
4352  if (fout->remoteVersion >= 150000)
4353  appendPQExpBufferStr(query,
4354  "SELECT tableoid, oid, prpubid, prrelid, "
4355  "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
4356  "(CASE\n"
4357  " WHEN pr.prattrs IS NOT NULL THEN\n"
4358  " (SELECT array_agg(attname)\n"
4359  " FROM\n"
4360  " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
4361  " pg_catalog.pg_attribute\n"
4362  " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
4363  " ELSE NULL END) prattrs "
4364  "FROM pg_catalog.pg_publication_rel pr");
4365  else
4366  appendPQExpBufferStr(query,
4367  "SELECT tableoid, oid, prpubid, prrelid, "
4368  "NULL AS prrelqual, NULL AS prattrs "
4369  "FROM pg_catalog.pg_publication_rel");
4370  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4371 
4372  ntups = PQntuples(res);
4373 
4374  i_tableoid = PQfnumber(res, "tableoid");
4375  i_oid = PQfnumber(res, "oid");
4376  i_prpubid = PQfnumber(res, "prpubid");
4377  i_prrelid = PQfnumber(res, "prrelid");
4378  i_prrelqual = PQfnumber(res, "prrelqual");
4379  i_prattrs = PQfnumber(res, "prattrs");
4380 
4381  /* this allocation may be more than we need */
4382  pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
4383  j = 0;
4384 
4385  for (i = 0; i < ntups; i++)
4386  {
4387  Oid prpubid = atooid(PQgetvalue(res, i, i_prpubid));
4388  Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid));
4389  PublicationInfo *pubinfo;
4390  TableInfo *tbinfo;
4391 
4392  /*
4393  * Ignore any entries for which we aren't interested in either the
4394  * publication or the rel.
4395  */
4396  pubinfo = findPublicationByOid(prpubid);
4397  if (pubinfo == NULL)
4398  continue;
4399  tbinfo = findTableByOid(prrelid);
4400  if (tbinfo == NULL)
4401  continue;
4402 
4403  /*
4404  * Ignore publication membership of tables whose definitions are not
4405  * to be dumped.
4406  */
4407  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4408  continue;
4409 
4410  /* OK, make a DumpableObject for this relationship */
4411  pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
4412  pubrinfo[j].dobj.catId.tableoid =
4413  atooid(PQgetvalue(res, i, i_tableoid));
4414  pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4415  AssignDumpId(&pubrinfo[j].dobj);
4416  pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4417  pubrinfo[j].dobj.name = tbinfo->dobj.name;
4418  pubrinfo[j].publication = pubinfo;
4419  pubrinfo[j].pubtable = tbinfo;
4420  if (PQgetisnull(res, i, i_prrelqual))
4421  pubrinfo[j].pubrelqual = NULL;
4422  else
4423  pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
4424 
4425  if (!PQgetisnull(res, i, i_prattrs))
4426  {
4427  char **attnames;
4428  int nattnames;
4429  PQExpBuffer attribs;
4430 
4431  if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
4432  &attnames, &nattnames))
4433  pg_fatal("could not parse %s array", "prattrs");
4434  attribs = createPQExpBuffer();
4435  for (int k = 0; k < nattnames; k++)
4436  {
4437  if (k > 0)
4438  appendPQExpBufferStr(attribs, ", ");
4439 
4440  appendPQExpBufferStr(attribs, fmtId(attnames[k]));
4441  }
4442  pubrinfo[j].pubrattrs = attribs->data;
4443  }
4444  else
4445  pubrinfo[j].pubrattrs = NULL;
4446 
4447  /* Decide whether we want to dump it */
4448  selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
4449 
4450  j++;
4451  }
4452 
4453  PQclear(res);
4454  destroyPQExpBuffer(query);
4455 }
4456 
4457 /*
4458  * dumpPublicationNamespace
4459  * dump the definition of the given publication schema mapping.
4460  */
4461 static void
4463 {
4464  DumpOptions *dopt = fout->dopt;
4465  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4466  PublicationInfo *pubinfo = pubsinfo->publication;
4467  PQExpBuffer query;
4468  char *tag;
4469 
4470  /* Do nothing in data-only dump */
4471  if (dopt->dataOnly)
4472  return;
4473 
4474  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4475 
4476  query = createPQExpBuffer();
4477 
4478  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4479  appendPQExpBuffer(query, "ADD TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4480 
4481  /*
4482  * There is no point in creating drop query as the drop is done by schema
4483  * drop.
4484  */
4485  if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4486  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4487  ARCHIVE_OPTS(.tag = tag,
4488  .namespace = schemainfo->dobj.name,
4489  .owner = pubinfo->rolname,
4490  .description = "PUBLICATION TABLES IN SCHEMA",
4491  .section = SECTION_POST_DATA,
4492  .createStmt = query->data));
4493 
4494  /* These objects can't currently have comments or seclabels */
4495 
4496  free(tag);
4497  destroyPQExpBuffer(query);
4498 }
4499 
4500 /*
4501  * dumpPublicationTable
4502  * dump the definition of the given publication table mapping
4503  */
4504 static void
4506 {
4507  DumpOptions *dopt = fout->dopt;
4508  PublicationInfo *pubinfo = pubrinfo->publication;
4509  TableInfo *tbinfo = pubrinfo->pubtable;
4510  PQExpBuffer query;
4511  char *tag;
4512 
4513  /* Do nothing in data-only dump */
4514  if (dopt->dataOnly)
4515  return;
4516 
4517  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4518 
4519  query = createPQExpBuffer();
4520 
4521  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4522  fmtId(pubinfo->dobj.name));
4523  appendPQExpBuffer(query, " %s",
4524  fmtQualifiedDumpable(tbinfo));
4525 
4526  if (pubrinfo->pubrattrs)
4527  appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
4528 
4529  if (pubrinfo->pubrelqual)
4530  {
4531  /*
4532  * It's necessary to add parentheses around the expression because
4533  * pg_get_expr won't supply the parentheses for things like WHERE
4534  * TRUE.
4535  */
4536  appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
4537  }
4538  appendPQExpBufferStr(query, ";\n");
4539 
4540  /*
4541  * There is no point in creating a drop query as the drop is done by table
4542  * drop. (If you think to change this, see also _printTocEntry().)
4543  * Although this object doesn't really have ownership as such, set the
4544  * owner field anyway to ensure that the command is run by the correct
4545  * role at restore time.
4546  */
4547  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4548  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4549  ARCHIVE_OPTS(.tag = tag,
4550  .namespace = tbinfo->dobj.namespace->dobj.name,
4551  .owner = pubinfo->rolname,
4552  .description = "PUBLICATION TABLE",
4553  .section = SECTION_POST_DATA,
4554  .createStmt = query->data));
4555 
4556  /* These objects can't currently have comments or seclabels */
4557 
4558  free(tag);
4559  destroyPQExpBuffer(query);
4560 }
4561 
4562 /*
4563  * Is the currently connected user a superuser?
4564  */
4565 static bool
4567 {
4568  ArchiveHandle *AH = (ArchiveHandle *) fout;
4569  const char *val;
4570 
4571  val = PQparameterStatus(AH->connection, "is_superuser");
4572 
4573  if (val && strcmp(val, "on") == 0)
4574  return true;
4575 
4576  return false;
4577 }
4578 
4579 /*
4580  * getSubscriptions
4581  * get information about subscriptions
4582  */
4583 void
4585 {
4586  DumpOptions *dopt = fout->dopt;
4587  PQExpBuffer query;
4588  PGresult *res;
4589  SubscriptionInfo *subinfo;
4590  int i_tableoid;
4591  int i_oid;
4592  int i_subname;
4593  int i_subowner;
4594  int i_substream;
4595  int i_subtwophasestate;
4596  int i_subdisableonerr;
4597  int i_suborigin;
4598  int i_subconninfo;
4599  int i_subslotname;
4600  int i_subsynccommit;
4601  int i_subpublications;
4602  int i_subbinary;
4603  int i_subpasswordrequired;
4604  int i,
4605  ntups;
4606 
4607  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4608  return;
4609 
4610  if (!is_superuser(fout))
4611  {
4612  int n;
4613 
4614  res = ExecuteSqlQuery(fout,
4615  "SELECT count(*) FROM pg_subscription "
4616  "WHERE subdbid = (SELECT oid FROM pg_database"
4617  " WHERE datname = current_database())",
4618  PGRES_TUPLES_OK);
4619  n = atoi(PQgetvalue(res, 0, 0));
4620  if (n > 0)
4621  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4622  PQclear(res);
4623  return;
4624  }
4625 
4626  query = createPQExpBuffer();
4627 
4628  /* Get the subscriptions in current database. */
4629  appendPQExpBufferStr(query,
4630  "SELECT s.tableoid, s.oid, s.subname,\n"
4631  " s.subowner,\n"
4632  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4633  " s.subpublications,\n");
4634 
4635  if (fout->remoteVersion >= 140000)
4636  appendPQExpBufferStr(query, " s.subbinary,\n");
4637  else
4638  appendPQExpBufferStr(query, " false AS subbinary,\n");
4639 
4640  if (fout->remoteVersion >= 140000)
4641  appendPQExpBufferStr(query, " s.substream,\n");
4642  else
4643  appendPQExpBufferStr(query, " 'f' AS substream,\n");
4644 
4645  if (fout->remoteVersion >= 150000)
4646  appendPQExpBufferStr(query,
4647  " s.subtwophasestate,\n"
4648  " s.subdisableonerr,\n");
4649  else
4650  appendPQExpBuffer(query,
4651  " '%c' AS subtwophasestate,\n"
4652  " false AS subdisableonerr,\n",
4654 
4655  if (fout->remoteVersion >= 160000)
4656  appendPQExpBufferStr(query,
4657  " s.suborigin,\n"
4658  " s.subpasswordrequired\n");
4659  else
4660  appendPQExpBuffer(query,
4661  " '%s' AS suborigin,\n"
4662  " 't' AS subpasswordrequired\n",
4664 
4665  appendPQExpBufferStr(query,
4666  "FROM pg_subscription s\n"
4667  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4668  " WHERE datname = current_database())");
4669 
4670  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4671 
4672  ntups = PQntuples(res);
4673 
4674  /*
4675  * Get subscription fields. We don't include subskiplsn in the dump as
4676  * after restoring the dump this value may no longer be relevant.
4677  */
4678  i_tableoid = PQfnumber(res, "tableoid");
4679  i_oid = PQfnumber(res, "oid");
4680  i_subname = PQfnumber(res, "subname");
4681  i_subowner = PQfnumber(res, "subowner");
4682  i_subconninfo = PQfnumber(res, "subconninfo");
4683  i_subslotname = PQfnumber(res, "subslotname");
4684  i_subsynccommit = PQfnumber(res, "subsynccommit");
4685  i_subpublications = PQfnumber(res, "subpublications");
4686  i_subbinary = PQfnumber(res, "subbinary");
4687  i_substream = PQfnumber(res, "substream");
4688  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4689  i_subdisableonerr = PQfnumber(res, "subdisableonerr");
4690  i_suborigin = PQfnumber(res, "suborigin");
4691  i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
4692 
4693  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4694 
4695  for (i = 0; i < ntups; i++)
4696  {
4697  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4698  subinfo[i].dobj.catId.tableoid =
4699  atooid(PQgetvalue(res, i, i_tableoid));
4700  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4701  AssignDumpId(&subinfo[i].dobj);
4702  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4703  subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
4704  subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
4705  if (PQgetisnull(res, i, i_subslotname))
4706  subinfo[i].subslotname = NULL;
4707  else
4708  subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
4709  subinfo[i].subsynccommit =
4710  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4711  subinfo[i].subpublications =
4712  pg_strdup(PQgetvalue(res, i, i_subpublications));
4713  subinfo[i].subbinary =
4714  pg_strdup(PQgetvalue(res, i, i_subbinary));
4715  subinfo[i].substream =
4716  pg_strdup(PQgetvalue(res, i, i_substream));
4717  subinfo[i].subtwophasestate =
4718  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4719  subinfo[i].subdisableonerr =
4720  pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
4721  subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
4722  subinfo[i].subpasswordrequired =
4723  pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
4724 
4725  /* Decide whether we want to dump it */
4726  selectDumpableObject(&(subinfo[i].dobj), fout);
4727  }
4728  PQclear(res);
4729 
4730  destroyPQExpBuffer(query);
4731 }
4732 
4733 /*
4734  * dumpSubscription
4735  * dump the definition of the given subscription
4736  */
4737 static void
4739 {
4740  DumpOptions *dopt = fout->dopt;
4741  PQExpBuffer delq;
4742  PQExpBuffer query;
4743  PQExpBuffer publications;
4744  char *qsubname;
4745  char **pubnames = NULL;
4746  int npubnames = 0;
4747  int i;
4748  char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
4749 
4750  /* Do nothing in data-only dump */
4751  if (dopt->dataOnly)
4752  return;
4753 
4754  delq = createPQExpBuffer();
4755  query = createPQExpBuffer();
4756 
4757  qsubname = pg_strdup(fmtId(subinfo->dobj.name));
4758 
4759  appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
4760  qsubname);
4761 
4762  appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
4763  qsubname);
4764  appendStringLiteralAH(query, subinfo->subconninfo, fout);
4765 
4766  /* Build list of quoted publications and append them to query. */
4767  if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
4768  pg_fatal("could not parse %s array", "subpublications");
4769 
4770  publications = createPQExpBuffer();
4771  for (i = 0; i < npubnames; i++)
4772  {
4773  if (i > 0)
4774  appendPQExpBufferStr(publications, ", ");
4775 
4776  appendPQExpBufferStr(publications, fmtId(pubnames[i]));
4777  }
4778 
4779  appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
4780  if (subinfo->subslotname)
4781  appendStringLiteralAH(query, subinfo->subslotname, fout);
4782  else
4783  appendPQExpBufferStr(query, "NONE");
4784 
4785  if (strcmp(subinfo->subbinary, "t") == 0)
4786  appendPQExpBufferStr(query, ", binary = true");
4787 
4788  if (strcmp(subinfo->substream, "t") == 0)
4789  appendPQExpBufferStr(query, ", streaming = on");
4790  else if (strcmp(subinfo->substream, "p") == 0)
4791  appendPQExpBufferStr(query, ", streaming = parallel");
4792 
4793  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
4794  appendPQExpBufferStr(query, ", two_phase = on");
4795 
4796  if (strcmp(subinfo->subdisableonerr, "t") == 0)
4797  appendPQExpBufferStr(query, ", disable_on_error = true");
4798 
4799  if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
4800  appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
4801 
4802  if (strcmp(subinfo->subsynccommit, "off") != 0)
4803  appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
4804 
4805  if (strcmp(subinfo->subpasswordrequired, "t") != 0)
4806  appendPQExpBuffer(query, ", password_required = false");
4807 
4808  appendPQExpBufferStr(query, ");\n");
4809 
4810  if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4811  ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
4812  ARCHIVE_OPTS(.tag = subinfo->dobj.name,
4813  .owner = subinfo->rolname,
4814  .description = "SUBSCRIPTION",
4815  .section = SECTION_POST_DATA,
4816  .createStmt = query->data,
4817  .dropStmt = delq->data));
4818 
4819  if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4820  dumpComment(fout, "SUBSCRIPTION", qsubname,
4821  NULL, subinfo->rolname,
4822  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4823 
4824  if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4825  dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
4826  NULL, subinfo->rolname,
4827  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4828 
4829  destroyPQExpBuffer(publications);
4830  free(pubnames);
4831 
4832  destroyPQExpBuffer(delq);
4833  destroyPQExpBuffer(query);
4834  free(qsubname);
4835 }
4836 
4837 /*
4838  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4839  * the object needs.
4840  */
4841 static void
4843  PQExpBuffer create,
4844  const DumpableObject *dobj,
4845  const char *catalog,
4846  const char *keyword,
4847  const char *objname)
4848 {
4849  if (dobj->depends_on_ext)
4850  {
4851  char *nm;
4852  PGresult *res;
4853  PQExpBuffer query;
4854  int ntups;
4855  int i_extname;
4856  int i;
4857 
4858  /* dodge fmtId() non-reentrancy */
4859  nm = pg_strdup(objname);
4860 
4861  query = createPQExpBuffer();
4862  appendPQExpBuffer(query,
4863  "SELECT e.extname "
4864  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
4865  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
4866  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
4867  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
4868  catalog,
4869  dobj->catId.oid);
4870  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4871  ntups = PQntuples(res);
4872  i_extname = PQfnumber(res, "extname");
4873  for (i = 0; i < ntups; i++)
4874  {
4875  appendPQExpBuffer(create, "\nALTER %s %s DEPENDS ON EXTENSION %s;",
4876  keyword, nm,
4877  fmtId(PQgetvalue(res, i, i_extname)));
4878  }
4879 
4880  PQclear(res);
4881  destroyPQExpBuffer(query);
4882  pg_free(nm);
4883  }
4884 }
4885 
4886 static Oid
4888 {
4889  /*
4890  * If the old version didn't assign an array type, but the new version
4891  * does, we must select an unused type OID to assign. This currently only
4892  * happens for domains, when upgrading pre-v11 to v11 and up.
4893  *
4894  * Note: local state here is kind of ugly, but we must have some, since we
4895  * mustn't choose the same unused OID more than once.
4896  */
4897  static Oid next_possible_free_oid = FirstNormalObjectId;
4898  PGresult *res;
4899  bool is_dup;
4900 
4901  do
4902  {
4903  ++next_possible_free_oid;
4904  printfPQExpBuffer(upgrade_query,
4905  "SELECT EXISTS(SELECT 1 "
4906  "FROM pg_catalog.pg_type "
4907  "WHERE oid = '%u'::pg_catalog.oid);",
4908  next_possible_free_oid);
4909  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4910  is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
4911  PQclear(res);
4912  } while (is_dup);
4913 
4914  return next_possible_free_oid;
4915 }
4916 
4917 static void
4919  PQExpBuffer upgrade_buffer,
4920  Oid pg_type_oid,
4921  bool force_array_type,
4922  bool include_multirange_type)
4923 {
4924  PQExpBuffer upgrade_query = createPQExpBuffer();
4925  PGresult *res;
4926  Oid pg_type_array_oid;
4927  Oid pg_type_multirange_oid;
4928  Oid pg_type_multirange_array_oid;
4929 
4930  appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
4931  appendPQExpBuffer(upgrade_buffer,
4932  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4933  pg_type_oid);
4934 
4935  appendPQExpBuffer(upgrade_query,
4936  "SELECT typarray "
4937  "FROM pg_catalog.pg_type "
4938  "WHERE oid = '%u'::pg_catalog.oid;",
4939  pg_type_oid);
4940 
4941  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4942 
4943  pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4944 
4945  PQclear(res);
4946 
4947  if (!OidIsValid(pg_type_array_oid) && force_array_type)
4948  pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4949 
4950  if (OidIsValid(pg_type_array_oid))
4951  {
4952  appendPQExpBufferStr(upgrade_buffer,
4953  "\n-- For binary upgrade, must preserve pg_type array oid\n");
4954  appendPQExpBuffer(upgrade_buffer,
4955  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4956  pg_type_array_oid);
4957  }
4958 
4959  /*
4960  * Pre-set the multirange type oid and its own array type oid.
4961  */
4962  if (include_multirange_type)
4963  {
4964  if (fout->remoteVersion >= 140000)
4965  {
4966  printfPQExpBuffer(upgrade_query,
4967  "SELECT t.oid, t.typarray "
4968  "FROM pg_catalog.pg_type t "
4969  "JOIN pg_catalog.pg_range r "
4970  "ON t.oid = r.rngmultitypid "
4971  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
4972  pg_type_oid);
4973 
4974  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4975 
4976  pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
4977  pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")))