PostgreSQL Source Code  git master
pg_dump.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "common/relpath.h"
59 #include "dumputils.h"
60 #include "fe_utils/option_utils.h"
61 #include "fe_utils/string_utils.h"
62 #include "getopt_long.h"
63 #include "libpq/libpq-fs.h"
64 #include "parallel.h"
65 #include "pg_backup_db.h"
66 #include "pg_backup_utils.h"
67 #include "pg_dump.h"
68 #include "storage/block.h"
69 
/*
 * Entry in the sorted role-name table (rolenames[]), used to map object
 * owner OIDs to role names (collected by collectRoleNames, looked up via
 * getRoleName).
 */
70 typedef struct
71 {
72  Oid roleoid; /* role's OID */
73  const char *rolename; /* role's name */
74 } RoleNameItem;
75 
/*
 * Entry in the sorted comments table (comments[]), collected up front by
 * collectComments and searched by findComments. Sort keys are the
 * (classoid, objoid, objsubid) triple identifying the commented object.
 */
76 typedef struct
77 {
78  const char *descr; /* comment for an object */
79  Oid classoid; /* object class (catalog OID) */
80  Oid objoid; /* object OID */
81  int objsubid; /* subobject (table column #) */
82 } CommentItem;
83 
/*
 * Entry in the sorted security-label table (seclabels[]), collected up
 * front by collectSecLabels and searched by findSecLabels. The labeled
 * object is identified by (classoid, objoid, objsubid).
 */
84 typedef struct
85 {
86  const char *provider; /* label provider of this security label */
87  const char *label; /* security label for an object */
88  Oid classoid; /* object class (catalog OID) */
89  Oid objoid; /* object OID */
90  int objsubid; /* subobject (table column #) */
91 } SecLabelItem;
92 
93 typedef enum OidOptions
94 {
97  zeroAsNone = 4
99 
100 /* global decls */
101 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
102 
103 static Oid g_last_builtin_oid; /* value of the last builtin oid */
104 
105 /* The specified names/patterns should match at least one entity */
106 static int strict_names = 0; /* set to 1 by --strict-names */
109 
110 /*
111  * Object inclusion/exclusion lists
112  *
113  * The string lists record the patterns given by command-line switches,
114  * which we then convert to lists of OIDs of matching objects.
115  */
117 static SimpleOidList schema_include_oids = {NULL, NULL};
119 static SimpleOidList schema_exclude_oids = {NULL, NULL};
120 
123 static SimpleOidList table_include_oids = {NULL, NULL};
126 static SimpleOidList table_exclude_oids = {NULL, NULL};
129 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
130 
133 
135 static SimpleOidList extension_include_oids = {NULL, NULL};
136 
/* CatalogId with all-zero fields, used as a "no catalog object" marker */
137 static const CatalogId nilCatalogId = {0, 0};
138 
139 /* override for standard extra_float_digits setting */
/* true when --extra-float-digits was given on the command line */
140 static bool have_extra_float_digits = false;
142 
143 /* sorted table of role names */
144 static RoleNameItem *rolenames = NULL;
145 static int nrolenames = 0;
146 
147 /* sorted table of comments */
148 static CommentItem *comments = NULL;
149 static int ncomments = 0;
150 
151 /* sorted table of security labels */
152 static SecLabelItem *seclabels = NULL;
153 static int nseclabels = 0;
154 
155 /*
156  * The default number of rows per INSERT when
157  * --inserts is specified without --rows-per-insert
158  * (i.e., one row per INSERT statement).
159  */
160 #define DUMP_DEFAULT_ROWS_PER_INSERT 1
160 
161 /*
162  * Macro for producing quoted, schema-qualified name of a dumpable object.
163  * Expands to a fmtQualifiedId() call on the object's namespace name and
164  * its own name; the object must therefore have a non-NULL namespace link.
165  */
166 #define fmtQualifiedDumpable(obj) \
167  fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
168  (obj)->dobj.name)
167 
168 static void help(const char *progname);
169 static void setup_connection(Archive *AH,
170  const char *dumpencoding, const char *dumpsnapshot,
171  char *use_role);
173 static void expand_schema_name_patterns(Archive *fout,
174  SimpleStringList *patterns,
175  SimpleOidList *oids,
176  bool strict_names);
177 static void expand_extension_name_patterns(Archive *fout,
178  SimpleStringList *patterns,
179  SimpleOidList *oids,
180  bool strict_names);
182  SimpleStringList *patterns,
183  SimpleOidList *oids);
184 static void expand_table_name_patterns(Archive *fout,
185  SimpleStringList *patterns,
186  SimpleOidList *oids,
187  bool strict_names,
188  bool with_child_tables);
189 static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
190  const char *pattern);
191 
192 static NamespaceInfo *findNamespace(Oid nsoid);
193 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
194 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
195 static const char *getRoleName(const char *roleoid_str);
196 static void collectRoleNames(Archive *fout);
197 static void getAdditionalACLs(Archive *fout);
198 static void dumpCommentExtended(Archive *fout, const char *type,
199  const char *name, const char *namespace,
200  const char *owner, CatalogId catalogId,
201  int subid, DumpId dumpId,
202  const char *initdb_comment);
203 static inline void dumpComment(Archive *fout, const char *type,
204  const char *name, const char *namespace,
205  const char *owner, CatalogId catalogId,
206  int subid, DumpId dumpId);
207 static int findComments(Oid classoid, Oid objoid, CommentItem **items);
208 static void collectComments(Archive *fout);
209 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
210  const char *namespace, const char *owner,
211  CatalogId catalogId, int subid, DumpId dumpId);
212 static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
213 static void collectSecLabels(Archive *fout);
214 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
215 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
216 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
217 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
218 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
219 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
220 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
221 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
222 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
223 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
224 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
225  PGresult *res);
226 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
227 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
228 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
229 static void dumpCast(Archive *fout, const CastInfo *cast);
230 static void dumpTransform(Archive *fout, const TransformInfo *transform);
231 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
232 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
233 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
234 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
235 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
236 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
237 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
238 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
239 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
240 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
241 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
242 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
243 static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
244 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
245 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
246 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
247 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
248 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
249 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
250 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
251 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
252 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
253 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
254 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
255 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
256 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
257 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
258 static void dumpUserMappings(Archive *fout,
259  const char *servername, const char *namespace,
260  const char *owner, CatalogId catalogId, DumpId dumpId);
261 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
262 
263 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
264  const char *type, const char *name, const char *subname,
265  const char *nspname, const char *owner,
266  const DumpableAcl *dacl);
267 
268 static void getDependencies(Archive *fout);
269 static void BuildArchiveDependencies(Archive *fout);
270 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
271  DumpId **dependencies, int *nDeps, int *allocDeps);
272 
274 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
275  DumpableObject *boundaryObjs);
276 
277 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
278 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
279 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
280 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
281 static void buildMatViewRefreshDependencies(Archive *fout);
282 static void getTableDataFKConstraints(void);
283 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
284  bool is_agg);
285 static char *format_function_signature(Archive *fout,
286  const FuncInfo *finfo, bool honor_quotes);
287 static char *convertRegProcReference(const char *proc);
288 static char *getFormattedOperatorName(const char *oproid);
289 static char *convertTSFunction(Archive *fout, Oid funcOid);
290 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
291 static void getLOs(Archive *fout);
292 static void dumpLO(Archive *fout, const LoInfo *binfo);
293 static int dumpLOs(Archive *fout, const void *arg);
294 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
295 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
296 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
297 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
298 static void dumpDatabase(Archive *fout);
299 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
300  const char *dbname, Oid dboid);
301 static void dumpEncoding(Archive *AH);
302 static void dumpStdStrings(Archive *AH);
303 static void dumpSearchPath(Archive *AH);
305  PQExpBuffer upgrade_buffer,
306  Oid pg_type_oid,
307  bool force_array_type,
308  bool include_multirange_type);
310  PQExpBuffer upgrade_buffer,
311  const TableInfo *tbinfo);
312 static void binary_upgrade_set_pg_class_oids(Archive *fout,
313  PQExpBuffer upgrade_buffer,
314  Oid pg_class_oid, bool is_index);
315 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
316  const DumpableObject *dobj,
317  const char *objtype,
318  const char *objname,
319  const char *objnamespace);
320 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
321 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
322 static bool nonemptyReloptions(const char *reloptions);
323 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
324  const char *prefix, Archive *fout);
325 static char *get_synchronized_snapshot(Archive *fout);
326 static void setupDumpWorker(Archive *AH);
327 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
328 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
329 
330 
331 int
332 main(int argc, char **argv)
333 {
334  int c;
335  const char *filename = NULL;
336  const char *format = "p";
337  TableInfo *tblinfo;
338  int numTables;
339  DumpableObject **dobjs;
340  int numObjs;
341  DumpableObject *boundaryObjs;
342  int i;
343  int optindex;
344  RestoreOptions *ropt;
345  Archive *fout; /* the script file */
346  bool g_verbose = false;
347  const char *dumpencoding = NULL;
348  const char *dumpsnapshot = NULL;
349  char *use_role = NULL;
350  int numWorkers = 1;
351  int plainText = 0;
352  ArchiveFormat archiveFormat = archUnknown;
353  ArchiveMode archiveMode;
354  pg_compress_specification compression_spec = {0};
355  char *compression_detail = NULL;
356  char *compression_algorithm_str = "none";
357  char *error_detail = NULL;
358  bool user_compression_defined = false;
359 
360  static DumpOptions dopt;
361 
362  static struct option long_options[] = {
363  {"data-only", no_argument, NULL, 'a'},
364  {"blobs", no_argument, NULL, 'b'},
365  {"large-objects", no_argument, NULL, 'b'},
366  {"no-blobs", no_argument, NULL, 'B'},
367  {"no-large-objects", no_argument, NULL, 'B'},
368  {"clean", no_argument, NULL, 'c'},
369  {"create", no_argument, NULL, 'C'},
370  {"dbname", required_argument, NULL, 'd'},
371  {"extension", required_argument, NULL, 'e'},
372  {"file", required_argument, NULL, 'f'},
373  {"format", required_argument, NULL, 'F'},
374  {"host", required_argument, NULL, 'h'},
375  {"jobs", 1, NULL, 'j'},
376  {"no-reconnect", no_argument, NULL, 'R'},
377  {"no-owner", no_argument, NULL, 'O'},
378  {"port", required_argument, NULL, 'p'},
379  {"schema", required_argument, NULL, 'n'},
380  {"exclude-schema", required_argument, NULL, 'N'},
381  {"schema-only", no_argument, NULL, 's'},
382  {"superuser", required_argument, NULL, 'S'},
383  {"table", required_argument, NULL, 't'},
384  {"exclude-table", required_argument, NULL, 'T'},
385  {"no-password", no_argument, NULL, 'w'},
386  {"password", no_argument, NULL, 'W'},
387  {"username", required_argument, NULL, 'U'},
388  {"verbose", no_argument, NULL, 'v'},
389  {"no-privileges", no_argument, NULL, 'x'},
390  {"no-acl", no_argument, NULL, 'x'},
391  {"compress", required_argument, NULL, 'Z'},
392  {"encoding", required_argument, NULL, 'E'},
393  {"help", no_argument, NULL, '?'},
394  {"version", no_argument, NULL, 'V'},
395 
396  /*
397  * the following options don't have an equivalent short option letter
398  */
399  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
400  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
401  {"column-inserts", no_argument, &dopt.column_inserts, 1},
402  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
403  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
404  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
405  {"exclude-table-data", required_argument, NULL, 4},
406  {"extra-float-digits", required_argument, NULL, 8},
407  {"if-exists", no_argument, &dopt.if_exists, 1},
408  {"inserts", no_argument, NULL, 9},
409  {"lock-wait-timeout", required_argument, NULL, 2},
410  {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
411  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
412  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
413  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
414  {"role", required_argument, NULL, 3},
415  {"section", required_argument, NULL, 5},
416  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
417  {"snapshot", required_argument, NULL, 6},
418  {"strict-names", no_argument, &strict_names, 1},
419  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
420  {"no-comments", no_argument, &dopt.no_comments, 1},
421  {"no-publications", no_argument, &dopt.no_publications, 1},
422  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
423  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
424  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
425  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
426  {"no-sync", no_argument, NULL, 7},
427  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
428  {"rows-per-insert", required_argument, NULL, 10},
429  {"include-foreign-data", required_argument, NULL, 11},
430  {"table-and-children", required_argument, NULL, 12},
431  {"exclude-table-and-children", required_argument, NULL, 13},
432  {"exclude-table-data-and-children", required_argument, NULL, 14},
433 
434  {NULL, 0, NULL, 0}
435  };
436 
437  pg_logging_init(argv[0]);
439  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
440 
441  /*
442  * Initialize what we need for parallel execution, especially for thread
443  * support on Windows.
444  */
446 
447  progname = get_progname(argv[0]);
448 
449  if (argc > 1)
450  {
451  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
452  {
453  help(progname);
454  exit_nicely(0);
455  }
456  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
457  {
458  puts("pg_dump (PostgreSQL) " PG_VERSION);
459  exit_nicely(0);
460  }
461  }
462 
463  InitDumpOptions(&dopt);
464 
465  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
466  long_options, &optindex)) != -1)
467  {
468  switch (c)
469  {
470  case 'a': /* Dump data only */
471  dopt.dataOnly = true;
472  break;
473 
474  case 'b': /* Dump LOs */
475  dopt.outputLOs = true;
476  break;
477 
478  case 'B': /* Don't dump LOs */
479  dopt.dontOutputLOs = true;
480  break;
481 
482  case 'c': /* clean (i.e., drop) schema prior to create */
483  dopt.outputClean = 1;
484  break;
485 
486  case 'C': /* Create DB */
487  dopt.outputCreateDB = 1;
488  break;
489 
490  case 'd': /* database name */
491  dopt.cparams.dbname = pg_strdup(optarg);
492  break;
493 
494  case 'e': /* include extension(s) */
496  dopt.include_everything = false;
497  break;
498 
499  case 'E': /* Dump encoding */
500  dumpencoding = pg_strdup(optarg);
501  break;
502 
503  case 'f':
505  break;
506 
507  case 'F':
509  break;
510 
511  case 'h': /* server host */
512  dopt.cparams.pghost = pg_strdup(optarg);
513  break;
514 
515  case 'j': /* number of dump jobs */
516  if (!option_parse_int(optarg, "-j/--jobs", 1,
517  PG_MAX_JOBS,
518  &numWorkers))
519  exit_nicely(1);
520  break;
521 
522  case 'n': /* include schema(s) */
524  dopt.include_everything = false;
525  break;
526 
527  case 'N': /* exclude schema(s) */
529  break;
530 
531  case 'O': /* Don't reconnect to match owner */
532  dopt.outputNoOwner = 1;
533  break;
534 
535  case 'p': /* server port */
536  dopt.cparams.pgport = pg_strdup(optarg);
537  break;
538 
539  case 'R':
540  /* no-op, still accepted for backwards compatibility */
541  break;
542 
543  case 's': /* dump schema only */
544  dopt.schemaOnly = true;
545  break;
546 
547  case 'S': /* Username for superuser in plain text output */
549  break;
550 
551  case 't': /* include table(s) */
553  dopt.include_everything = false;
554  break;
555 
556  case 'T': /* exclude table(s) */
558  break;
559 
560  case 'U':
562  break;
563 
564  case 'v': /* verbose */
565  g_verbose = true;
567  break;
568 
569  case 'w':
571  break;
572 
573  case 'W':
575  break;
576 
577  case 'x': /* skip ACL dump */
578  dopt.aclsSkip = true;
579  break;
580 
581  case 'Z': /* Compression */
582  parse_compress_options(optarg, &compression_algorithm_str,
583  &compression_detail);
584  user_compression_defined = true;
585  break;
586 
587  case 0:
588  /* This covers the long options. */
589  break;
590 
591  case 2: /* lock-wait-timeout */
593  break;
594 
595  case 3: /* SET ROLE */
596  use_role = pg_strdup(optarg);
597  break;
598 
599  case 4: /* exclude table(s) data */
601  break;
602 
603  case 5: /* section */
605  break;
606 
607  case 6: /* snapshot */
608  dumpsnapshot = pg_strdup(optarg);
609  break;
610 
611  case 7: /* no-sync */
612  dosync = false;
613  break;
614 
615  case 8:
617  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
619  exit_nicely(1);
620  break;
621 
622  case 9: /* inserts */
623 
624  /*
625  * dump_inserts also stores --rows-per-insert, careful not to
626  * overwrite that.
627  */
628  if (dopt.dump_inserts == 0)
630  break;
631 
632  case 10: /* rows per insert */
633  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
634  &dopt.dump_inserts))
635  exit_nicely(1);
636  break;
637 
638  case 11: /* include foreign data */
640  optarg);
641  break;
642 
643  case 12: /* include table(s) and their children */
645  optarg);
646  dopt.include_everything = false;
647  break;
648 
649  case 13: /* exclude table(s) and their children */
651  optarg);
652  break;
653 
654  case 14: /* exclude data of table(s) and children */
656  optarg);
657  break;
658 
659  default:
660  /* getopt_long already emitted a complaint */
661  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
662  exit_nicely(1);
663  }
664  }
665 
666  /*
667  * Non-option argument specifies database name as long as it wasn't
668  * already specified with -d / --dbname
669  */
670  if (optind < argc && dopt.cparams.dbname == NULL)
671  dopt.cparams.dbname = argv[optind++];
672 
673  /* Complain if any arguments remain */
674  if (optind < argc)
675  {
676  pg_log_error("too many command-line arguments (first is \"%s\")",
677  argv[optind]);
678  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
679  exit_nicely(1);
680  }
681 
682  /* --column-inserts implies --inserts */
683  if (dopt.column_inserts && dopt.dump_inserts == 0)
685 
686  /*
687  * Binary upgrade mode implies dumping sequence data even in schema-only
688  * mode. This is not exposed as a separate option, but kept separate
689  * internally for clarity.
690  */
691  if (dopt.binary_upgrade)
692  dopt.sequence_data = 1;
693 
694  if (dopt.dataOnly && dopt.schemaOnly)
695  pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
696 
698  pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
699 
700  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
701  pg_fatal("option --include-foreign-data is not supported with parallel backup");
702 
703  if (dopt.dataOnly && dopt.outputClean)
704  pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
705 
706  if (dopt.if_exists && !dopt.outputClean)
707  pg_fatal("option --if-exists requires option -c/--clean");
708 
709  /*
710  * --inserts are already implied above if --column-inserts or
711  * --rows-per-insert were specified.
712  */
713  if (dopt.do_nothing && dopt.dump_inserts == 0)
714  pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
715 
716  /* Identify archive format to emit */
717  archiveFormat = parseArchiveFormat(format, &archiveMode);
718 
719  /* archiveFormat specific setup */
720  if (archiveFormat == archNull)
721  plainText = 1;
722 
723  /*
724  * Compression options
725  */
726  if (!parse_compress_algorithm(compression_algorithm_str,
728  pg_fatal("unrecognized compression algorithm: \"%s\"",
729  compression_algorithm_str);
730 
732  &compression_spec);
733  error_detail = validate_compress_specification(&compression_spec);
734  if (error_detail != NULL)
735  pg_fatal("invalid compression specification: %s",
736  error_detail);
737 
738  switch (compression_algorithm)
739  {
740  case PG_COMPRESSION_NONE:
741  /* fallthrough */
742  case PG_COMPRESSION_GZIP:
743  /* fallthrough */
744  case PG_COMPRESSION_LZ4:
745  break;
746  case PG_COMPRESSION_ZSTD:
747  pg_fatal("compression with %s is not yet supported", "ZSTD");
748  break;
749  }
750 
751  /*
752  * Custom and directory formats are compressed by default with gzip when
753  * available, not the others.
754  */
755  if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
756  !user_compression_defined)
757  {
758 #ifdef HAVE_LIBZ
760  &compression_spec);
761 #else
762  /* Nothing to do in the default case */
763 #endif
764  }
765 
766  /*
767  * If emitting an archive format, we always want to emit a DATABASE item,
768  * in case --create is specified at pg_restore time.
769  */
770  if (!plainText)
771  dopt.outputCreateDB = 1;
772 
773  /* Parallel backup only in the directory archive format so far */
774  if (archiveFormat != archDirectory && numWorkers > 1)
775  pg_fatal("parallel backup only supported by the directory format");
776 
777  /* Open the output file */
778  fout = CreateArchive(filename, archiveFormat, compression_spec,
779  dosync, archiveMode, setupDumpWorker);
780 
781  /* Make dump options accessible right away */
782  SetArchiveOptions(fout, &dopt, NULL);
783 
784  /* Register the cleanup hook */
785  on_exit_close_archive(fout);
786 
787  /* Let the archiver know how noisy to be */
788  fout->verbose = g_verbose;
789 
790 
791  /*
792  * We allow the server to be back to 9.2, and up to any minor release of
793  * our own major version. (See also version check in pg_dumpall.c.)
794  */
795  fout->minRemoteVersion = 90200;
796  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
797 
798  fout->numWorkers = numWorkers;
799 
800  /*
801  * Open the database using the Archiver, so it knows about it. Errors mean
802  * death.
803  */
804  ConnectDatabase(fout, &dopt.cparams, false);
805  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
806 
807  /*
808  * On hot standbys, never try to dump unlogged table data, since it will
809  * just throw an error.
810  */
811  if (fout->isStandby)
812  dopt.no_unlogged_table_data = true;
813 
814  /*
815  * Find the last built-in OID, if needed (prior to 8.1)
816  *
817  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
818  */
820 
821  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
822 
823  /* Expand schema selection patterns into OID lists */
824  if (schema_include_patterns.head != NULL)
825  {
828  strict_names);
829  if (schema_include_oids.head == NULL)
830  pg_fatal("no matching schemas were found");
831  }
834  false);
835  /* non-matching exclusion patterns aren't an error */
836 
837  /* Expand table selection patterns into OID lists */
840  strict_names, false);
843  strict_names, true);
844  if ((table_include_patterns.head != NULL ||
846  table_include_oids.head == NULL)
847  pg_fatal("no matching tables were found");
848 
851  false, false);
854  false, true);
855 
858  false, false);
861  false, true);
862 
865 
866  /* non-matching exclusion patterns aren't an error */
867 
868  /* Expand extension selection patterns into OID lists */
869  if (extension_include_patterns.head != NULL)
870  {
873  strict_names);
874  if (extension_include_oids.head == NULL)
875  pg_fatal("no matching extensions were found");
876  }
877 
878  /*
879  * Dumping LOs is the default for dumps where an inclusion switch is not
880  * used (an "include everything" dump). -B can be used to exclude LOs
881  * from those dumps. -b can be used to include LOs even when an
882  * inclusion switch is used.
883  *
884  * -s means "schema only" and LOs are data, not schema, so we never
885  * include LOs when -s is used.
886  */
887  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputLOs)
888  dopt.outputLOs = true;
889 
890  /*
891  * Collect role names so we can map object owner OIDs to names.
892  */
893  collectRoleNames(fout);
894 
895  /*
896  * Now scan the database and create DumpableObject structs for all the
897  * objects we intend to dump.
898  */
899  tblinfo = getSchemaData(fout, &numTables);
900 
901  if (!dopt.schemaOnly)
902  {
903  getTableData(&dopt, tblinfo, numTables, 0);
905  if (dopt.dataOnly)
907  }
908 
909  if (dopt.schemaOnly && dopt.sequence_data)
910  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
911 
912  /*
913  * In binary-upgrade mode, we do not have to worry about the actual LO
914  * data or the associated metadata that resides in the pg_largeobject and
915  * pg_largeobject_metadata tables, respectively.
916  *
917  * However, we do need to collect LO information as there may be
918  * comments or other information on LOs that we do need to dump out.
919  */
920  if (dopt.outputLOs || dopt.binary_upgrade)
921  getLOs(fout);
922 
923  /*
924  * Collect dependency data to assist in ordering the objects.
925  */
926  getDependencies(fout);
927 
928  /*
929  * Collect ACLs, comments, and security labels, if wanted.
930  */
931  if (!dopt.aclsSkip)
932  getAdditionalACLs(fout);
933  if (!dopt.no_comments)
934  collectComments(fout);
935  if (!dopt.no_security_labels)
936  collectSecLabels(fout);
937 
938  /* Lastly, create dummy objects to represent the section boundaries */
939  boundaryObjs = createBoundaryObjects();
940 
941  /* Get pointers to all the known DumpableObjects */
942  getDumpableObjects(&dobjs, &numObjs);
943 
944  /*
945  * Add dummy dependencies to enforce the dump section ordering.
946  */
947  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
948 
949  /*
950  * Sort the objects into a safe dump order (no forward references).
951  *
952  * We rely on dependency information to help us determine a safe order, so
953  * the initial sort is mostly for cosmetic purposes: we sort by name to
954  * ensure that logically identical schemas will dump identically.
955  */
956  sortDumpableObjectsByTypeName(dobjs, numObjs);
957 
958  sortDumpableObjects(dobjs, numObjs,
959  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
960 
961  /*
962  * Create archive TOC entries for all the objects to be dumped, in a safe
963  * order.
964  */
965 
966  /*
967  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
968  */
969  dumpEncoding(fout);
970  dumpStdStrings(fout);
971  dumpSearchPath(fout);
972 
973  /* The database items are always next, unless we don't want them at all */
974  if (dopt.outputCreateDB)
975  dumpDatabase(fout);
976 
977  /* Now the rearrangeable objects. */
978  for (i = 0; i < numObjs; i++)
979  dumpDumpableObject(fout, dobjs[i]);
980 
981  /*
982  * Set up options info to ensure we dump what we want.
983  */
984  ropt = NewRestoreOptions();
985  ropt->filename = filename;
986 
987  /* if you change this list, see dumpOptionsFromRestoreOptions */
988  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
989  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
990  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
991  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
993  ropt->dropSchema = dopt.outputClean;
994  ropt->dataOnly = dopt.dataOnly;
995  ropt->schemaOnly = dopt.schemaOnly;
996  ropt->if_exists = dopt.if_exists;
997  ropt->column_inserts = dopt.column_inserts;
998  ropt->dumpSections = dopt.dumpSections;
999  ropt->aclsSkip = dopt.aclsSkip;
1000  ropt->superuser = dopt.outputSuperuser;
1001  ropt->createDB = dopt.outputCreateDB;
1002  ropt->noOwner = dopt.outputNoOwner;
1003  ropt->noTableAm = dopt.outputNoTableAm;
1004  ropt->noTablespace = dopt.outputNoTablespaces;
1005  ropt->disable_triggers = dopt.disable_triggers;
1006  ropt->use_setsessauth = dopt.use_setsessauth;
1008  ropt->dump_inserts = dopt.dump_inserts;
1009  ropt->no_comments = dopt.no_comments;
1010  ropt->no_publications = dopt.no_publications;
1012  ropt->no_subscriptions = dopt.no_subscriptions;
1013  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
1016  ropt->sequence_data = dopt.sequence_data;
1017  ropt->binary_upgrade = dopt.binary_upgrade;
1018 
1019  ropt->compression_spec = compression_spec;
1020 
1021  ropt->suppressDumpWarnings = true; /* We've already shown them */
1022 
1023  SetArchiveOptions(fout, &dopt, ropt);
1024 
1025  /* Mark which entries should be output */
1027 
1028  /*
1029  * The archive's TOC entries are now marked as to which ones will actually
1030  * be output, so we can set up their dependency lists properly. This isn't
1031  * necessary for plain-text output, though.
1032  */
1033  if (!plainText)
1035 
1036  /*
1037  * And finally we can do the actual output.
1038  *
1039  * Note: for non-plain-text output formats, the output file is written
1040  * inside CloseArchive(). This is, um, bizarre; but not worth changing
1041  * right now.
1042  */
1043  if (plainText)
1044  RestoreArchive(fout);
1045 
1046  CloseArchive(fout);
1047 
1048  exit_nicely(0);
1049 }
1050 
1051 
/*
 * help
 *		Print the pg_dump usage message on stdout.
 *
 * Output is grouped into general options, content-control options, and
 * connection options; every string passes through _() for translation.
 * The exact text and ordering are user-visible behavior, so keep them
 * in sync with the documented option set.
 */
static void
help(const char *progname)
{
	printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]... [DBNAME]\n"), progname);

	printf(_("\nGeneral options:\n"));
	printf(_(" -f, --file=FILENAME output file or directory name\n"));
	printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
	" plain text (default))\n"));
	printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
	printf(_(" -v, --verbose verbose mode\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -Z, --compress=METHOD[:LEVEL]\n"
	" compress as specified\n"));
	printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
	printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
	printf(_(" -?, --help show this help, then exit\n"));

	printf(_("\nOptions controlling the output content:\n"));
	printf(_(" -a, --data-only dump only the data, not the schema\n"));
	printf(_(" -b, --large-objects include large objects in dump\n"
	" --blobs (same as --large-objects, deprecated)\n"));
	printf(_(" -B, --no-large-objects exclude large objects in dump\n"
	" --no-blobs (same as --no-large-objects, deprecated)\n"));
	printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
	printf(_(" -C, --create include commands to create database in dump\n"));
	printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
	printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
	printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
	printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
	printf(_(" -O, --no-owner skip restoration of object ownership in\n"
	" plain-text format\n"));
	printf(_(" -s, --schema-only dump only the schema, no data\n"));
	printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
	printf(_(" -t, --table=PATTERN dump only the specified table(s)\n"));
	printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
	printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
	printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
	printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
	printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
	printf(_(" --disable-triggers disable triggers during data-only restore\n"));
	printf(_(" --enable-row-security enable row security (dump only content user has\n"
	" access to)\n"));
	printf(_(" --exclude-table-and-children=PATTERN\n"
	" do NOT dump the specified table(s),\n"
	" including child and partition tables\n"));
	printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
	printf(_(" --exclude-table-data-and-children=PATTERN\n"
	" do NOT dump data for the specified table(s),\n"
	" including child and partition tables\n"));
	printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
	printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
	printf(_(" --include-foreign-data=PATTERN\n"
	" include data of foreign tables on foreign\n"
	" servers matching PATTERN\n"));
	printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
	printf(_(" --load-via-partition-root load partitions via the root table\n"));
	printf(_(" --no-comments do not dump comments\n"));
	printf(_(" --no-publications do not dump publications\n"));
	printf(_(" --no-security-labels do not dump security label assignments\n"));
	printf(_(" --no-subscriptions do not dump subscriptions\n"));
	printf(_(" --no-table-access-method do not dump table access methods\n"));
	printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
	printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
	printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
	printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
	printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
	printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
	printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
	printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
	printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
	printf(_(" --strict-names require table and/or schema include patterns to\n"
	" match at least one entity each\n"));
	printf(_(" --table-and-children=PATTERN dump only the specified table(s),\n"
	" including child and partition tables\n"));
	printf(_(" --use-set-session-authorization\n"
	" use SET SESSION AUTHORIZATION commands instead of\n"
	" ALTER OWNER commands to set ownership\n"));

	printf(_("\nConnection options:\n"));
	printf(_(" -d, --dbname=DBNAME database to dump\n"));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
	printf(_(" -p, --port=PORT database server port number\n"));
	printf(_(" -U, --username=NAME connect as specified database user\n"));
	printf(_(" -w, --no-password never prompt for password\n"));
	printf(_(" -W, --password force password prompt (should happen automatically)\n"));
	printf(_(" --role=ROLENAME do SET ROLE before dump\n"));

	printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
	"variable value is used.\n\n"));
	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
1147 
1148 static void
1149 setup_connection(Archive *AH, const char *dumpencoding,
1150  const char *dumpsnapshot, char *use_role)
1151 {
1152  DumpOptions *dopt = AH->dopt;
1153  PGconn *conn = GetConnection(AH);
1154  const char *std_strings;
1155 
1157 
1158  /*
1159  * Set the client encoding if requested.
1160  */
1161  if (dumpencoding)
1162  {
1163  if (PQsetClientEncoding(conn, dumpencoding) < 0)
1164  pg_fatal("invalid client encoding \"%s\" specified",
1165  dumpencoding);
1166  }
1167 
1168  /*
1169  * Get the active encoding and the standard_conforming_strings setting, so
1170  * we know how to escape strings.
1171  */
1173 
1174  std_strings = PQparameterStatus(conn, "standard_conforming_strings");
1175  AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
1176 
1177  /*
1178  * Set the role if requested. In a parallel dump worker, we'll be passed
1179  * use_role == NULL, but AH->use_role is already set (if user specified it
1180  * originally) and we should use that.
1181  */
1182  if (!use_role && AH->use_role)
1183  use_role = AH->use_role;
1184 
1185  /* Set the role if requested */
1186  if (use_role)
1187  {
1188  PQExpBuffer query = createPQExpBuffer();
1189 
1190  appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
1191  ExecuteSqlStatement(AH, query->data);
1192  destroyPQExpBuffer(query);
1193 
1194  /* save it for possible later use by parallel workers */
1195  if (!AH->use_role)
1196  AH->use_role = pg_strdup(use_role);
1197  }
1198 
1199  /* Set the datestyle to ISO to ensure the dump's portability */
1200  ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");
1201 
1202  /* Likewise, avoid using sql_standard intervalstyle */
1203  ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");
1204 
1205  /*
1206  * Use an explicitly specified extra_float_digits if it has been provided.
1207  * Otherwise, set extra_float_digits so that we can dump float data
1208  * exactly (given correctly implemented float I/O code, anyway).
1209  */
1211  {
1213 
1214  appendPQExpBuffer(q, "SET extra_float_digits TO %d",
1216  ExecuteSqlStatement(AH, q->data);
1217  destroyPQExpBuffer(q);
1218  }
1219  else
1220  ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
1221 
1222  /*
1223  * Disable synchronized scanning, to prevent unpredictable changes in row
1224  * ordering across a dump and reload.
1225  */
1226  ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");
1227 
1228  /*
1229  * Disable timeouts if supported.
1230  */
1231  ExecuteSqlStatement(AH, "SET statement_timeout = 0");
1232  if (AH->remoteVersion >= 90300)
1233  ExecuteSqlStatement(AH, "SET lock_timeout = 0");
1234  if (AH->remoteVersion >= 90600)
1235  ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
1236 
1237  /*
1238  * Quote all identifiers, if requested.
1239  */
1241  ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
1242 
1243  /*
1244  * Adjust row-security mode, if supported.
1245  */
1246  if (AH->remoteVersion >= 90500)
1247  {
1248  if (dopt->enable_row_security)
1249  ExecuteSqlStatement(AH, "SET row_security = on");
1250  else
1251  ExecuteSqlStatement(AH, "SET row_security = off");
1252  }
1253 
1254  /*
1255  * Initialize prepared-query state to "nothing prepared". We do this here
1256  * so that a parallel dump worker will have its own state.
1257  */
1258  AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));
1259 
1260  /*
1261  * Start transaction-snapshot mode transaction to dump consistent data.
1262  */
1263  ExecuteSqlStatement(AH, "BEGIN");
1264 
1265  /*
1266  * To support the combination of serializable_deferrable with the jobs
1267  * option we use REPEATABLE READ for the worker connections that are
1268  * passed a snapshot. As long as the snapshot is acquired in a
1269  * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
1270  * REPEATABLE READ transaction provides the appropriate integrity
1271  * guarantees. This is a kluge, but safe for back-patching.
1272  */
1273  if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
1275  "SET TRANSACTION ISOLATION LEVEL "
1276  "SERIALIZABLE, READ ONLY, DEFERRABLE");
1277  else
1279  "SET TRANSACTION ISOLATION LEVEL "
1280  "REPEATABLE READ, READ ONLY");
1281 
1282  /*
1283  * If user specified a snapshot to use, select that. In a parallel dump
1284  * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
1285  * is already set (if the server can handle it) and we should use that.
1286  */
1287  if (dumpsnapshot)
1288  AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
1289 
1290  if (AH->sync_snapshot_id)
1291  {
1292  PQExpBuffer query = createPQExpBuffer();
1293 
1294  appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
1296  ExecuteSqlStatement(AH, query->data);
1297  destroyPQExpBuffer(query);
1298  }
1299  else if (AH->numWorkers > 1)
1300  {
1301  if (AH->isStandby && AH->remoteVersion < 100000)
1302  pg_fatal("parallel dumps from standby servers are not supported by this server version");
1304  }
1305 }
1306 
1307 /* Set up connection for a parallel worker process */
1308 static void
1310 {
1311  /*
1312  * We want to re-select all the same values the leader connection is
1313  * using. We'll have inherited directly-usable values in
1314  * AH->sync_snapshot_id and AH->use_role, but we need to translate the
1315  * inherited encoding value back to a string to pass to setup_connection.
1316  */
1317  setup_connection(AH,
1319  NULL,
1320  NULL);
1321 }
1322 
1323 static char *
1325 {
1326  char *query = "SELECT pg_catalog.pg_export_snapshot()";
1327  char *result;
1328  PGresult *res;
1329 
1330  res = ExecuteSqlQueryForSingleRow(fout, query);
1331  result = pg_strdup(PQgetvalue(res, 0, 0));
1332  PQclear(res);
1333 
1334  return result;
1335 }
1336 
1337 static ArchiveFormat
1339 {
1340  ArchiveFormat archiveFormat;
1341 
1342  *mode = archModeWrite;
1343 
1344  if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
1345  {
1346  /* This is used by pg_dumpall, and is not documented */
1347  archiveFormat = archNull;
1348  *mode = archModeAppend;
1349  }
1350  else if (pg_strcasecmp(format, "c") == 0)
1351  archiveFormat = archCustom;
1352  else if (pg_strcasecmp(format, "custom") == 0)
1353  archiveFormat = archCustom;
1354  else if (pg_strcasecmp(format, "d") == 0)
1355  archiveFormat = archDirectory;
1356  else if (pg_strcasecmp(format, "directory") == 0)
1357  archiveFormat = archDirectory;
1358  else if (pg_strcasecmp(format, "p") == 0)
1359  archiveFormat = archNull;
1360  else if (pg_strcasecmp(format, "plain") == 0)
1361  archiveFormat = archNull;
1362  else if (pg_strcasecmp(format, "t") == 0)
1363  archiveFormat = archTar;
1364  else if (pg_strcasecmp(format, "tar") == 0)
1365  archiveFormat = archTar;
1366  else
1367  pg_fatal("invalid output format \"%s\" specified", format);
1368  return archiveFormat;
1369 }
1370 
1371 /*
1372  * Find the OIDs of all schemas matching the given list of patterns,
1373  * and append them to the given OID list.
1374  */
1375 static void
1377  SimpleStringList *patterns,
1378  SimpleOidList *oids,
1379  bool strict_names)
1380 {
1381  PQExpBuffer query;
1382  PGresult *res;
1383  SimpleStringListCell *cell;
1384  int i;
1385 
1386  if (patterns->head == NULL)
1387  return; /* nothing to do */
1388 
1389  query = createPQExpBuffer();
1390 
1391  /*
1392  * The loop below runs multiple SELECTs might sometimes result in
1393  * duplicate entries in the OID list, but we don't care.
1394  */
1395 
1396  for (cell = patterns->head; cell; cell = cell->next)
1397  {
1398  PQExpBufferData dbbuf;
1399  int dotcnt;
1400 
1401  appendPQExpBufferStr(query,
1402  "SELECT oid FROM pg_catalog.pg_namespace n\n");
1403  initPQExpBuffer(&dbbuf);
1404  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1405  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
1406  &dotcnt);
1407  if (dotcnt > 1)
1408  pg_fatal("improper qualified name (too many dotted names): %s",
1409  cell->val);
1410  else if (dotcnt == 1)
1411  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1412  termPQExpBuffer(&dbbuf);
1413 
1414  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1415  if (strict_names && PQntuples(res) == 0)
1416  pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1417 
1418  for (i = 0; i < PQntuples(res); i++)
1419  {
1421  }
1422 
1423  PQclear(res);
1424  resetPQExpBuffer(query);
1425  }
1426 
1427  destroyPQExpBuffer(query);
1428 }
1429 
1430 /*
1431  * Find the OIDs of all extensions matching the given list of patterns,
1432  * and append them to the given OID list.
1433  */
1434 static void
1436  SimpleStringList *patterns,
1437  SimpleOidList *oids,
1438  bool strict_names)
1439 {
1440  PQExpBuffer query;
1441  PGresult *res;
1442  SimpleStringListCell *cell;
1443  int i;
1444 
1445  if (patterns->head == NULL)
1446  return; /* nothing to do */
1447 
1448  query = createPQExpBuffer();
1449 
1450  /*
1451  * The loop below runs multiple SELECTs might sometimes result in
1452  * duplicate entries in the OID list, but we don't care.
1453  */
1454  for (cell = patterns->head; cell; cell = cell->next)
1455  {
1456  int dotcnt;
1457 
1458  appendPQExpBufferStr(query,
1459  "SELECT oid FROM pg_catalog.pg_extension e\n");
1460  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1461  false, NULL, "e.extname", NULL, NULL, NULL,
1462  &dotcnt);
1463  if (dotcnt > 0)
1464  pg_fatal("improper qualified name (too many dotted names): %s",
1465  cell->val);
1466 
1467  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1468  if (strict_names && PQntuples(res) == 0)
1469  pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1470 
1471  for (i = 0; i < PQntuples(res); i++)
1472  {
1474  }
1475 
1476  PQclear(res);
1477  resetPQExpBuffer(query);
1478  }
1479 
1480  destroyPQExpBuffer(query);
1481 }
1482 
1483 /*
1484  * Find the OIDs of all foreign servers matching the given list of patterns,
1485  * and append them to the given OID list.
1486  */
1487 static void
1489  SimpleStringList *patterns,
1490  SimpleOidList *oids)
1491 {
1492  PQExpBuffer query;
1493  PGresult *res;
1494  SimpleStringListCell *cell;
1495  int i;
1496 
1497  if (patterns->head == NULL)
1498  return; /* nothing to do */
1499 
1500  query = createPQExpBuffer();
1501 
1502  /*
1503  * The loop below runs multiple SELECTs might sometimes result in
1504  * duplicate entries in the OID list, but we don't care.
1505  */
1506 
1507  for (cell = patterns->head; cell; cell = cell->next)
1508  {
1509  int dotcnt;
1510 
1511  appendPQExpBufferStr(query,
1512  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1513  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1514  false, NULL, "s.srvname", NULL, NULL, NULL,
1515  &dotcnt);
1516  if (dotcnt > 0)
1517  pg_fatal("improper qualified name (too many dotted names): %s",
1518  cell->val);
1519 
1520  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1521  if (PQntuples(res) == 0)
1522  pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1523 
1524  for (i = 0; i < PQntuples(res); i++)
1526 
1527  PQclear(res);
1528  resetPQExpBuffer(query);
1529  }
1530 
1531  destroyPQExpBuffer(query);
1532 }
1533 
1534 /*
1535  * Find the OIDs of all tables matching the given list of patterns,
1536  * and append them to the given OID list. See also expand_dbname_patterns()
1537  * in pg_dumpall.c
1538  */
1539 static void
1541  SimpleStringList *patterns, SimpleOidList *oids,
1542  bool strict_names, bool with_child_tables)
1543 {
1544  PQExpBuffer query;
1545  PGresult *res;
1546  SimpleStringListCell *cell;
1547  int i;
1548 
1549  if (patterns->head == NULL)
1550  return; /* nothing to do */
1551 
1552  query = createPQExpBuffer();
1553 
1554  /*
1555  * this might sometimes result in duplicate entries in the OID list, but
1556  * we don't care.
1557  */
1558 
1559  for (cell = patterns->head; cell; cell = cell->next)
1560  {
1561  PQExpBufferData dbbuf;
1562  int dotcnt;
1563 
1564  /*
1565  * Query must remain ABSOLUTELY devoid of unqualified names. This
1566  * would be unnecessary given a pg_table_is_visible() variant taking a
1567  * search_path argument.
1568  *
1569  * For with_child_tables, we start with the basic query's results and
1570  * recursively search the inheritance tree to add child tables.
1571  */
1572  if (with_child_tables)
1573  {
1574  appendPQExpBuffer(query, "WITH RECURSIVE partition_tree (relid) AS (\n");
1575  }
1576 
1577  appendPQExpBuffer(query,
1578  "SELECT c.oid"
1579  "\nFROM pg_catalog.pg_class c"
1580  "\n LEFT JOIN pg_catalog.pg_namespace n"
1581  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1582  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1583  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1584  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1585  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1586  RELKIND_PARTITIONED_TABLE);
1587  initPQExpBuffer(&dbbuf);
1588  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1589  false, "n.nspname", "c.relname", NULL,
1590  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
1591  &dotcnt);
1592  if (dotcnt > 2)
1593  pg_fatal("improper relation name (too many dotted names): %s",
1594  cell->val);
1595  else if (dotcnt == 2)
1596  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1597  termPQExpBuffer(&dbbuf);
1598 
1599  if (with_child_tables)
1600  {
1601  appendPQExpBuffer(query, "UNION"
1602  "\nSELECT i.inhrelid"
1603  "\nFROM partition_tree p"
1604  "\n JOIN pg_catalog.pg_inherits i"
1605  "\n ON p.relid OPERATOR(pg_catalog.=) i.inhparent"
1606  "\n)"
1607  "\nSELECT relid FROM partition_tree");
1608  }
1609 
1610  ExecuteSqlStatement(fout, "RESET search_path");
1611  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1614  if (strict_names && PQntuples(res) == 0)
1615  pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);
1616 
1617  for (i = 0; i < PQntuples(res); i++)
1618  {
1620  }
1621 
1622  PQclear(res);
1623  resetPQExpBuffer(query);
1624  }
1625 
1626  destroyPQExpBuffer(query);
1627 }
1628 
1629 /*
1630  * Verifies that the connected database name matches the given database name,
1631  * and if not, dies with an error about the given pattern.
1632  *
1633  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1634  */
1635 static void
1636 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1637 {
1638  const char *db;
1639 
1640  db = PQdb(conn);
1641  if (db == NULL)
1642  pg_fatal("You are currently not connected to a database.");
1643 
1644  if (strcmp(db, dbname) != 0)
1645  pg_fatal("cross-database references are not implemented: %s",
1646  pattern);
1647 }
1648 
1649 /*
1650  * checkExtensionMembership
1651  * Determine whether object is an extension member, and if so,
1652  * record an appropriate dependency and set the object's dump flag.
1653  *
1654  * It's important to call this for each object that could be an extension
1655  * member. Generally, we integrate this with determining the object's
1656  * to-be-dumped-ness, since extension membership overrides other rules for that.
1657  *
1658  * Returns true if object is an extension member, else false.
1659  */
1660 static bool
1662 {
1663  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1664 
1665  if (ext == NULL)
1666  return false;
1667 
1668  dobj->ext_member = true;
1669 
1670  /* Record dependency so that getDependencies needn't deal with that */
1671  addObjectDependency(dobj, ext->dobj.dumpId);
1672 
1673  /*
1674  * In 9.6 and above, mark the member object to have any non-initial ACL,
1675  * policies, and security labels dumped.
1676  *
1677  * Note that any initial ACLs (see pg_init_privs) will be removed when we
1678  * extract the information about the object. We don't provide support for
1679  * initial policies and security labels and it seems unlikely for those to
1680  * ever exist, but we may have to revisit this later.
1681  *
1682  * Prior to 9.6, we do not include any extension member components.
1683  *
1684  * In binary upgrades, we still dump all components of the members
1685  * individually, since the idea is to exactly reproduce the database
1686  * contents rather than replace the extension contents with something
1687  * different.
1688  */
1689  if (fout->dopt->binary_upgrade)
1690  dobj->dump = ext->dobj.dump;
1691  else
1692  {
1693  if (fout->remoteVersion < 90600)
1694  dobj->dump = DUMP_COMPONENT_NONE;
1695  else
1696  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
1699  }
1700 
1701  return true;
1702 }
1703 
1704 /*
1705  * selectDumpableNamespace: policy-setting subroutine
1706  * Mark a namespace as to be dumped or not
1707  */
1708 static void
1710 {
1711  /*
1712  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1713  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1714  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1715  */
1716  nsinfo->create = true;
1717 
1718  /*
1719  * If specific tables are being dumped, do not dump any complete
1720  * namespaces. If specific namespaces are being dumped, dump just those
1721  * namespaces. Otherwise, dump all non-system namespaces.
1722  */
1723  if (table_include_oids.head != NULL)
1724  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1725  else if (schema_include_oids.head != NULL)
1726  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1728  nsinfo->dobj.catId.oid) ?
1730  else if (fout->remoteVersion >= 90600 &&
1731  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1732  {
1733  /*
1734  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1735  * they are interesting (and not the original ACLs which were set at
1736  * initdb time, see pg_init_privs).
1737  */
1738  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1739  }
1740  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1741  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1742  {
1743  /* Other system schemas don't get dumped */
1744  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1745  }
1746  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1747  {
1748  /*
1749  * The public schema is a strange beast that sits in a sort of
1750  * no-mans-land between being a system object and a user object.
1751  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1752  * a comment and an indication of ownership. If the owner is the
1753  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1754  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1755  */
1756  nsinfo->create = false;
1757  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1758  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1759  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1761 
1762  /*
1763  * Also, make like it has a comment even if it doesn't; this is so
1764  * that we'll emit a command to drop the comment, if appropriate.
1765  * (Without this, we'd not call dumpCommentExtended for it.)
1766  */
1768  }
1769  else
1770  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1771 
1772  /*
1773  * In any case, a namespace can be excluded by an exclusion switch
1774  */
1775  if (nsinfo->dobj.dump_contains &&
1777  nsinfo->dobj.catId.oid))
1778  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1779 
1780  /*
1781  * If the schema belongs to an extension, allow extension membership to
1782  * override the dump decision for the schema itself. However, this does
1783  * not change dump_contains, so this won't change what we do with objects
1784  * within the schema. (If they belong to the extension, they'll get
1785  * suppressed by it, otherwise not.)
1786  */
1787  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1788 }
1789 
1790 /*
1791  * selectDumpableTable: policy-setting subroutine
1792  * Mark a table as to be dumped or not
1793  */
1794 static void
1796 {
1797  if (checkExtensionMembership(&tbinfo->dobj, fout))
1798  return; /* extension membership overrides all else */
1799 
1800  /*
1801  * If specific tables are being dumped, dump just those tables; else, dump
1802  * according to the parent namespace's dump flag.
1803  */
1804  if (table_include_oids.head != NULL)
1806  tbinfo->dobj.catId.oid) ?
1808  else
1809  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1810 
1811  /*
1812  * In any case, a table can be excluded by an exclusion switch
1813  */
1814  if (tbinfo->dobj.dump &&
1816  tbinfo->dobj.catId.oid))
1817  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1818 }
1819 
1820 /*
1821  * selectDumpableType: policy-setting subroutine
1822  * Mark a type as to be dumped or not
1823  *
1824  * If it's a table's rowtype or an autogenerated array type, we also apply a
1825  * special type code to facilitate sorting into the desired order. (We don't
1826  * want to consider those to be ordinary types because that would bring tables
1827  * up into the datatype part of the dump order.) We still set the object's
1828  * dump flag; that's not going to cause the dummy type to be dumped, but we
1829  * need it so that casts involving such types will be dumped correctly -- see
1830  * dumpCast. This means the flag should be set the same as for the underlying
1831  * object (the table or base type).
1832  */
1833 static void
1835 {
1836  /* skip complex types, except for standalone composite types */
1837  if (OidIsValid(tyinfo->typrelid) &&
1838  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1839  {
1840  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1841 
1842  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1843  if (tytable != NULL)
1844  tyinfo->dobj.dump = tytable->dobj.dump;
1845  else
1846  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1847  return;
1848  }
1849 
1850  /* skip auto-generated array types */
1851  if (tyinfo->isArray || tyinfo->isMultirange)
1852  {
1853  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1854 
1855  /*
1856  * Fall through to set the dump flag; we assume that the subsequent
1857  * rules will do the same thing as they would for the array's base
1858  * type. (We cannot reliably look up the base type here, since
1859  * getTypes may not have processed it yet.)
1860  */
1861  }
1862 
1863  if (checkExtensionMembership(&tyinfo->dobj, fout))
1864  return; /* extension membership overrides all else */
1865 
1866  /* Dump based on if the contents of the namespace are being dumped */
1867  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1868 }
1869 
1870 /*
1871  * selectDumpableDefaultACL: policy-setting subroutine
1872  * Mark a default ACL as to be dumped or not
1873  *
1874  * For per-schema default ACLs, dump if the schema is to be dumped.
1875  * Otherwise dump if we are dumping "everything". Note that dataOnly
1876  * and aclsSkip are checked separately.
1877  */
1878 static void
1880 {
1881  /* Default ACLs can't be extension members */
1882 
1883  if (dinfo->dobj.namespace)
1884  /* default ACLs are considered part of the namespace */
1885  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1886  else
1887  dinfo->dobj.dump = dopt->include_everything ?
1889 }
1890 
1891 /*
1892  * selectDumpableCast: policy-setting subroutine
1893  * Mark a cast as to be dumped or not
1894  *
1895  * Casts do not belong to any particular namespace (since they haven't got
1896  * names), nor do they have identifiable owners. To distinguish user-defined
1897  * casts from built-in ones, we must resort to checking whether the cast's
1898  * OID is in the range reserved for initdb.
1899  */
1900 static void
1902 {
1903  if (checkExtensionMembership(&cast->dobj, fout))
1904  return; /* extension membership overrides all else */
1905 
1906  /*
1907  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1908  * support ACLs currently.
1909  */
1910  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1911  cast->dobj.dump = DUMP_COMPONENT_NONE;
1912  else
1913  cast->dobj.dump = fout->dopt->include_everything ?
1915 }
1916 
1917 /*
1918  * selectDumpableProcLang: policy-setting subroutine
1919  * Mark a procedural language as to be dumped or not
1920  *
1921  * Procedural languages do not belong to any particular namespace. To
1922  * identify built-in languages, we must resort to checking whether the
1923  * language's OID is in the range reserved for initdb.
1924  */
1925 static void
1927 {
1928  if (checkExtensionMembership(&plang->dobj, fout))
1929  return; /* extension membership overrides all else */
1930 
1931  /*
1932  * Only include procedural languages when we are dumping everything.
1933  *
1934  * For from-initdb procedural languages, only include ACLs, as we do for
1935  * the pg_catalog namespace. We need this because procedural languages do
1936  * not live in any namespace.
1937  */
1938  if (!fout->dopt->include_everything)
1939  plang->dobj.dump = DUMP_COMPONENT_NONE;
1940  else
1941  {
1942  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1943  plang->dobj.dump = fout->remoteVersion < 90600 ?
1945  else
1946  plang->dobj.dump = DUMP_COMPONENT_ALL;
1947  }
1948 }
1949 
1950 /*
1951  * selectDumpableAccessMethod: policy-setting subroutine
1952  * Mark an access method as to be dumped or not
1953  *
1954  * Access methods do not belong to any particular namespace. To identify
1955  * built-in access methods, we must resort to checking whether the
1956  * method's OID is in the range reserved for initdb.
1957  */
1958 static void
1960 {
1961  if (checkExtensionMembership(&method->dobj, fout))
1962  return; /* extension membership overrides all else */
1963 
1964  /*
1965  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1966  * they do not support ACLs currently.
1967  */
1968  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1969  method->dobj.dump = DUMP_COMPONENT_NONE;
1970  else
1971  method->dobj.dump = fout->dopt->include_everything ?
1973 }
1974 
1975 /*
1976  * selectDumpableExtension: policy-setting subroutine
1977  * Mark an extension as to be dumped or not
1978  *
1979  * Built-in extensions should be skipped except for checking ACLs, since we
1980  * assume those will already be installed in the target database. We identify
1981  * such extensions by their having OIDs in the range reserved for initdb.
1982  * We dump all user-added extensions by default. No extensions are dumped
1983  * if include_everything is false (i.e., a --schema or --table switch was
1984  * given), except if --extension specifies a list of extensions to dump.
1985  */
1986 static void
1988 {
1989  /*
1990  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
1991  * change permissions on their member objects, if they wish to, and have
1992  * those changes preserved.
1993  */
1994  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1995  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
1996  else
1997  {
1998  /* check if there is a list of extensions to dump */
1999  if (extension_include_oids.head != NULL)
2000  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2002  extinfo->dobj.catId.oid) ?
2004  else
2005  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2006  dopt->include_everything ?
2008  }
2009 }
2010 
2011 /*
2012  * selectDumpablePublicationObject: policy-setting subroutine
2013  * Mark a publication object as to be dumped or not
2014  *
2015  * A publication can have schemas and tables which have schemas, but those are
2016  * ignored in decision making, because publications are only dumped when we are
2017  * dumping everything.
2018  */
2019 static void
2021 {
2022  if (checkExtensionMembership(dobj, fout))
2023  return; /* extension membership overrides all else */
2024 
2025  dobj->dump = fout->dopt->include_everything ?
2027 }
2028 
2029 /*
2030  * selectDumpableObject: policy-setting subroutine
2031  * Mark a generic dumpable object as to be dumped or not
2032  *
2033  * Use this only for object types without a special-case routine above.
2034  */
2035 static void
2037 {
2038  if (checkExtensionMembership(dobj, fout))
2039  return; /* extension membership overrides all else */
2040 
2041  /*
2042  * Default policy is to dump if parent namespace is dumpable, or for
2043  * non-namespace-associated items, dump if we're dumping "everything".
2044  */
2045  if (dobj->namespace)
2046  dobj->dump = dobj->namespace->dobj.dump_contains;
2047  else
2048  dobj->dump = fout->dopt->include_everything ?
2050 }
2051 
2052 /*
2053  * Dump a table's contents for loading using the COPY command
2054  * - this routine is called by the Archiver when it wants the table
2055  * to be dumped.
2056  */
2057 static int
2058 dumpTableData_copy(Archive *fout, const void *dcontext)
2059 {
2060  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2061  TableInfo *tbinfo = tdinfo->tdtable;
2062  const char *classname = tbinfo->dobj.name;
2064 
2065  /*
2066  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2067  * which uses it already.
2068  */
2069  PQExpBuffer clistBuf = createPQExpBuffer();
2070  PGconn *conn = GetConnection(fout);
2071  PGresult *res;
2072  int ret;
2073  char *copybuf;
2074  const char *column_list;
2075 
2076  pg_log_info("dumping contents of table \"%s.%s\"",
2077  tbinfo->dobj.namespace->dobj.name, classname);
2078 
2079  /*
2080  * Specify the column list explicitly so that we have no possibility of
2081  * retrieving data in the wrong column order. (The default column
2082  * ordering of COPY will not be what we want in certain corner cases
2083  * involving ADD COLUMN and inheritance.)
2084  */
2085  column_list = fmtCopyColumnList(tbinfo, clistBuf);
2086 
2087  /*
2088  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2089  * a filter condition was specified. For other cases a simple COPY
2090  * suffices.
2091  */
2092  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2093  {
2094  appendPQExpBufferStr(q, "COPY (SELECT ");
2095  /* klugery to get rid of parens in column list */
2096  if (strlen(column_list) > 2)
2097  {
2098  appendPQExpBufferStr(q, column_list + 1);
2099  q->data[q->len - 1] = ' ';
2100  }
2101  else
2102  appendPQExpBufferStr(q, "* ");
2103 
2104  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2105  fmtQualifiedDumpable(tbinfo),
2106  tdinfo->filtercond ? tdinfo->filtercond : "");
2107  }
2108  else
2109  {
2110  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2111  fmtQualifiedDumpable(tbinfo),
2112  column_list);
2113  }
2114  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2115  PQclear(res);
2116  destroyPQExpBuffer(clistBuf);
2117 
2118  for (;;)
2119  {
2120  ret = PQgetCopyData(conn, &copybuf, 0);
2121 
2122  if (ret < 0)
2123  break; /* done or error */
2124 
2125  if (copybuf)
2126  {
2127  WriteData(fout, copybuf, ret);
2128  PQfreemem(copybuf);
2129  }
2130 
2131  /* ----------
2132  * THROTTLE:
2133  *
2134  * There was considerable discussion in late July, 2000 regarding
2135  * slowing down pg_dump when backing up large tables. Users with both
2136  * slow & fast (multi-processor) machines experienced performance
2137  * degradation when doing a backup.
2138  *
2139  * Initial attempts based on sleeping for a number of ms for each ms
2140  * of work were deemed too complex, then a simple 'sleep in each loop'
2141  * implementation was suggested. The latter failed because the loop
2142  * was too tight. Finally, the following was implemented:
2143  *
2144  * If throttle is non-zero, then
2145  * See how long since the last sleep.
2146  * Work out how long to sleep (based on ratio).
2147  * If sleep is more than 100ms, then
2148  * sleep
2149  * reset timer
2150  * EndIf
2151  * EndIf
2152  *
2153  * where the throttle value was the number of ms to sleep per ms of
2154  * work. The calculation was done in each loop.
2155  *
2156  * Most of the hard work is done in the backend, and this solution
2157  * still did not work particularly well: on slow machines, the ratio
2158  * was 50:1, and on medium paced machines, 1:1, and on fast
2159  * multi-processor machines, it had little or no effect, for reasons
2160  * that were unclear.
2161  *
2162  * Further discussion ensued, and the proposal was dropped.
2163  *
2164  * For those people who want this feature, it can be implemented using
2165  * gettimeofday in each loop, calculating the time since last sleep,
2166  * multiplying that by the sleep ratio, then if the result is more
2167  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2168  * function to sleep for a subsecond period ie.
2169  *
2170  * select(0, NULL, NULL, NULL, &tvi);
2171  *
2172  * This will return after the interval specified in the structure tvi.
2173  * Finally, call gettimeofday again to save the 'last sleep time'.
2174  * ----------
2175  */
2176  }
2177  archprintf(fout, "\\.\n\n\n");
2178 
2179  if (ret == -2)
2180  {
2181  /* copy data transfer failed */
2182  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2183  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2184  pg_log_error_detail("Command was: %s", q->data);
2185  exit_nicely(1);
2186  }
2187 
2188  /* Check command status and return to normal libpq state */
2189  res = PQgetResult(conn);
2191  {
2192  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2193  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2194  pg_log_error_detail("Command was: %s", q->data);
2195  exit_nicely(1);
2196  }
2197  PQclear(res);
2198 
2199  /* Do this to ensure we've pumped libpq back to idle state */
2200  if (PQgetResult(conn) != NULL)
2201  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2202  classname);
2203 
2204  destroyPQExpBuffer(q);
2205  return 1;
2206 }
2207 
2208 /*
2209  * Dump table data using INSERT commands.
2210  *
2211  * Caution: when we restore from an archive file direct to database, the
2212  * INSERT commands emitted by this function have to be parsed by
2213  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2214  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2215  */
2216 static int
2217 dumpTableData_insert(Archive *fout, const void *dcontext)
2218 {
2219  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2220  TableInfo *tbinfo = tdinfo->tdtable;
2221  DumpOptions *dopt = fout->dopt;
2223  PQExpBuffer insertStmt = NULL;
2224  char *attgenerated;
2225  PGresult *res;
2226  int nfields,
2227  i;
2228  int rows_per_statement = dopt->dump_inserts;
2229  int rows_this_statement = 0;
2230 
2231  /*
2232  * If we're going to emit INSERTs with column names, the most efficient
2233  * way to deal with generated columns is to exclude them entirely. For
2234  * INSERTs without column names, we have to emit DEFAULT rather than the
2235  * actual column value --- but we can save a few cycles by fetching nulls
2236  * rather than the uninteresting-to-us value.
2237  */
2238  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2239  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2240  nfields = 0;
2241  for (i = 0; i < tbinfo->numatts; i++)
2242  {
2243  if (tbinfo->attisdropped[i])
2244  continue;
2245  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2246  continue;
2247  if (nfields > 0)
2248  appendPQExpBufferStr(q, ", ");
2249  if (tbinfo->attgenerated[i])
2250  appendPQExpBufferStr(q, "NULL");
2251  else
2252  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2253  attgenerated[nfields] = tbinfo->attgenerated[i];
2254  nfields++;
2255  }
2256  /* Servers before 9.4 will complain about zero-column SELECT */
2257  if (nfields == 0)
2258  appendPQExpBufferStr(q, "NULL");
2259  appendPQExpBuffer(q, " FROM ONLY %s",
2260  fmtQualifiedDumpable(tbinfo));
2261  if (tdinfo->filtercond)
2262  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2263 
2264  ExecuteSqlStatement(fout, q->data);
2265 
2266  while (1)
2267  {
2268  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2269  PGRES_TUPLES_OK);
2270 
2271  /* cross-check field count, allowing for dummy NULL if any */
2272  if (nfields != PQnfields(res) &&
2273  !(nfields == 0 && PQnfields(res) == 1))
2274  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2275  tbinfo->dobj.name);
2276 
2277  /*
2278  * First time through, we build as much of the INSERT statement as
2279  * possible in "insertStmt", which we can then just print for each
2280  * statement. If the table happens to have zero dumpable columns then
2281  * this will be a complete statement, otherwise it will end in
2282  * "VALUES" and be ready to have the row's column values printed.
2283  */
2284  if (insertStmt == NULL)
2285  {
2286  TableInfo *targettab;
2287 
2288  insertStmt = createPQExpBuffer();
2289 
2290  /*
2291  * When load-via-partition-root is set or forced, get the root
2292  * table name for the partition table, so that we can reload data
2293  * through the root table.
2294  */
2295  if (tbinfo->ispartition &&
2296  (dopt->load_via_partition_root ||
2297  forcePartitionRootLoad(tbinfo)))
2298  targettab = getRootTableInfo(tbinfo);
2299  else
2300  targettab = tbinfo;
2301 
2302  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2303  fmtQualifiedDumpable(targettab));
2304 
2305  /* corner case for zero-column table */
2306  if (nfields == 0)
2307  {
2308  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2309  }
2310  else
2311  {
2312  /* append the list of column names if required */
2313  if (dopt->column_inserts)
2314  {
2315  appendPQExpBufferChar(insertStmt, '(');
2316  for (int field = 0; field < nfields; field++)
2317  {
2318  if (field > 0)
2319  appendPQExpBufferStr(insertStmt, ", ");
2320  appendPQExpBufferStr(insertStmt,
2321  fmtId(PQfname(res, field)));
2322  }
2323  appendPQExpBufferStr(insertStmt, ") ");
2324  }
2325 
2326  if (tbinfo->needs_override)
2327  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2328 
2329  appendPQExpBufferStr(insertStmt, "VALUES");
2330  }
2331  }
2332 
2333  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2334  {
2335  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2336  if (rows_this_statement == 0)
2337  archputs(insertStmt->data, fout);
2338 
2339  /*
2340  * If it is zero-column table then we've already written the
2341  * complete statement, which will mean we've disobeyed
2342  * --rows-per-insert when it's set greater than 1. We do support
2343  * a way to make this multi-row with: SELECT UNION ALL SELECT
2344  * UNION ALL ... but that's non-standard so we should avoid it
2345  * given that using INSERTs is mostly only ever needed for
2346  * cross-database exports.
2347  */
2348  if (nfields == 0)
2349  continue;
2350 
2351  /* Emit a row heading */
2352  if (rows_per_statement == 1)
2353  archputs(" (", fout);
2354  else if (rows_this_statement > 0)
2355  archputs(",\n\t(", fout);
2356  else
2357  archputs("\n\t(", fout);
2358 
2359  for (int field = 0; field < nfields; field++)
2360  {
2361  if (field > 0)
2362  archputs(", ", fout);
2363  if (attgenerated[field])
2364  {
2365  archputs("DEFAULT", fout);
2366  continue;
2367  }
2368  if (PQgetisnull(res, tuple, field))
2369  {
2370  archputs("NULL", fout);
2371  continue;
2372  }
2373 
2374  /* XXX This code is partially duplicated in ruleutils.c */
2375  switch (PQftype(res, field))
2376  {
2377  case INT2OID:
2378  case INT4OID:
2379  case INT8OID:
2380  case OIDOID:
2381  case FLOAT4OID:
2382  case FLOAT8OID:
2383  case NUMERICOID:
2384  {
2385  /*
2386  * These types are printed without quotes unless
2387  * they contain values that aren't accepted by the
2388  * scanner unquoted (e.g., 'NaN'). Note that
2389  * strtod() and friends might accept NaN, so we
2390  * can't use that to test.
2391  *
2392  * In reality we only need to defend against
2393  * infinity and NaN, so we need not get too crazy
2394  * about pattern matching here.
2395  */
2396  const char *s = PQgetvalue(res, tuple, field);
2397 
2398  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2399  archputs(s, fout);
2400  else
2401  archprintf(fout, "'%s'", s);
2402  }
2403  break;
2404 
2405  case BITOID:
2406  case VARBITOID:
2407  archprintf(fout, "B'%s'",
2408  PQgetvalue(res, tuple, field));
2409  break;
2410 
2411  case BOOLOID:
2412  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2413  archputs("true", fout);
2414  else
2415  archputs("false", fout);
2416  break;
2417 
2418  default:
2419  /* All other types are printed as string literals. */
2420  resetPQExpBuffer(q);
2422  PQgetvalue(res, tuple, field),
2423  fout);
2424  archputs(q->data, fout);
2425  break;
2426  }
2427  }
2428 
2429  /* Terminate the row ... */
2430  archputs(")", fout);
2431 
2432  /* ... and the statement, if the target no. of rows is reached */
2433  if (++rows_this_statement >= rows_per_statement)
2434  {
2435  if (dopt->do_nothing)
2436  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2437  else
2438  archputs(";\n", fout);
2439  /* Reset the row counter */
2440  rows_this_statement = 0;
2441  }
2442  }
2443 
2444  if (PQntuples(res) <= 0)
2445  {
2446  PQclear(res);
2447  break;
2448  }
2449  PQclear(res);
2450  }
2451 
2452  /* Terminate any statements that didn't make the row count. */
2453  if (rows_this_statement > 0)
2454  {
2455  if (dopt->do_nothing)
2456  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2457  else
2458  archputs(";\n", fout);
2459  }
2460 
2461  archputs("\n\n", fout);
2462 
2463  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2464 
2465  destroyPQExpBuffer(q);
2466  if (insertStmt != NULL)
2467  destroyPQExpBuffer(insertStmt);
2468  free(attgenerated);
2469 
2470  return 1;
2471 }
2472 
2473 /*
2474  * getRootTableInfo:
2475  * get the root TableInfo for the given partition table.
2476  */
2477 static TableInfo *
2479 {
2480  TableInfo *parentTbinfo;
2481 
2482  Assert(tbinfo->ispartition);
2483  Assert(tbinfo->numParents == 1);
2484 
2485  parentTbinfo = tbinfo->parents[0];
2486  while (parentTbinfo->ispartition)
2487  {
2488  Assert(parentTbinfo->numParents == 1);
2489  parentTbinfo = parentTbinfo->parents[0];
2490  }
2491 
2492  return parentTbinfo;
2493 }
2494 
2495 /*
2496  * forcePartitionRootLoad
2497  * Check if we must force load_via_partition_root for this partition.
2498  *
2499  * This is required if any level of ancestral partitioned table has an
2500  * unsafe partitioning scheme.
2501  */
2502 static bool
2504 {
2505  TableInfo *parentTbinfo;
2506 
2507  Assert(tbinfo->ispartition);
2508  Assert(tbinfo->numParents == 1);
2509 
2510  parentTbinfo = tbinfo->parents[0];
2511  if (parentTbinfo->unsafe_partitions)
2512  return true;
2513  while (parentTbinfo->ispartition)
2514  {
2515  Assert(parentTbinfo->numParents == 1);
2516  parentTbinfo = parentTbinfo->parents[0];
2517  if (parentTbinfo->unsafe_partitions)
2518  return true;
2519  }
2520 
2521  return false;
2522 }
2523 
2524 /*
2525  * dumpTableData -
2526  * dump the contents of a single table
2527  *
2528  * Actually, this just makes an ArchiveEntry for the table contents.
2529  */
2530 static void
2531 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2532 {
2533  DumpOptions *dopt = fout->dopt;
2534  TableInfo *tbinfo = tdinfo->tdtable;
2535  PQExpBuffer copyBuf = createPQExpBuffer();
2536  PQExpBuffer clistBuf = createPQExpBuffer();
2537  DataDumperPtr dumpFn;
2538  char *tdDefn = NULL;
2539  char *copyStmt;
2540  const char *copyFrom;
2541 
2542  /* We had better have loaded per-column details about this table */
2543  Assert(tbinfo->interesting);
2544 
2545  /*
2546  * When load-via-partition-root is set or forced, get the root table name
2547  * for the partition table, so that we can reload data through the root
2548  * table. Then construct a comment to be inserted into the TOC entry's
2549  * defn field, so that such cases can be identified reliably.
2550  */
2551  if (tbinfo->ispartition &&
2552  (dopt->load_via_partition_root ||
2553  forcePartitionRootLoad(tbinfo)))
2554  {
2555  TableInfo *parentTbinfo;
2556 
2557  parentTbinfo = getRootTableInfo(tbinfo);
2558  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2559  printfPQExpBuffer(copyBuf, "-- load via partition root %s",
2560  copyFrom);
2561  tdDefn = pg_strdup(copyBuf->data);
2562  }
2563  else
2564  copyFrom = fmtQualifiedDumpable(tbinfo);
2565 
2566  if (dopt->dump_inserts == 0)
2567  {
2568  /* Dump/restore using COPY */
2569  dumpFn = dumpTableData_copy;
2570  /* must use 2 steps here 'cause fmtId is nonreentrant */
2571  printfPQExpBuffer(copyBuf, "COPY %s ",
2572  copyFrom);
2573  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2574  fmtCopyColumnList(tbinfo, clistBuf));
2575  copyStmt = copyBuf->data;
2576  }
2577  else
2578  {
2579  /* Restore using INSERT */
2580  dumpFn = dumpTableData_insert;
2581  copyStmt = NULL;
2582  }
2583 
2584  /*
2585  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2586  * dependency on its table as "special" and pass it to ArchiveEntry now.
2587  * See comments for BuildArchiveDependencies.
2588  */
2589  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2590  {
2591  TocEntry *te;
2592 
2593  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2594  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2595  .namespace = tbinfo->dobj.namespace->dobj.name,
2596  .owner = tbinfo->rolname,
2597  .description = "TABLE DATA",
2598  .section = SECTION_DATA,
2599  .createStmt = tdDefn,
2600  .copyStmt = copyStmt,
2601  .deps = &(tbinfo->dobj.dumpId),
2602  .nDeps = 1,
2603  .dumpFn = dumpFn,
2604  .dumpArg = tdinfo));
2605 
2606  /*
2607  * Set the TocEntry's dataLength in case we are doing a parallel dump
2608  * and want to order dump jobs by table size. We choose to measure
2609  * dataLength in table pages (including TOAST pages) during dump, so
2610  * no scaling is needed.
2611  *
2612  * However, relpages is declared as "integer" in pg_class, and hence
2613  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2614  * Cast so that we get the right interpretation of table sizes
2615  * exceeding INT_MAX pages.
2616  */
2617  te->dataLength = (BlockNumber) tbinfo->relpages;
2618  te->dataLength += (BlockNumber) tbinfo->toastpages;
2619 
2620  /*
2621  * If pgoff_t is only 32 bits wide, the above refinement is useless,
2622  * and instead we'd better worry about integer overflow. Clamp to
2623  * INT_MAX if the correct result exceeds that.
2624  */
2625  if (sizeof(te->dataLength) == 4 &&
2626  (tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2627  te->dataLength < 0))
2628  te->dataLength = INT_MAX;
2629  }
2630 
2631  destroyPQExpBuffer(copyBuf);
2632  destroyPQExpBuffer(clistBuf);
2633 }
2634 
2635 /*
2636  * refreshMatViewData -
2637  * load or refresh the contents of a single materialized view
2638  *
2639  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2640  * statement.
2641  */
2642 static void
2644 {
2645  TableInfo *tbinfo = tdinfo->tdtable;
2646  PQExpBuffer q;
2647 
2648  /* If the materialized view is not flagged as populated, skip this. */
2649  if (!tbinfo->relispopulated)
2650  return;
2651 
2652  q = createPQExpBuffer();
2653 
2654  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2655  fmtQualifiedDumpable(tbinfo));
2656 
2657  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2658  ArchiveEntry(fout,
2659  tdinfo->dobj.catId, /* catalog ID */
2660  tdinfo->dobj.dumpId, /* dump ID */
2661  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2662  .namespace = tbinfo->dobj.namespace->dobj.name,
2663  .owner = tbinfo->rolname,
2664  .description = "MATERIALIZED VIEW DATA",
2665  .section = SECTION_POST_DATA,
2666  .createStmt = q->data,
2667  .deps = tdinfo->dobj.dependencies,
2668  .nDeps = tdinfo->dobj.nDeps));
2669 
2670  destroyPQExpBuffer(q);
2671 }
2672 
2673 /*
2674  * getTableData -
2675  * set up dumpable objects representing the contents of tables
2676  */
2677 static void
2678 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2679 {
2680  int i;
2681 
2682  for (i = 0; i < numTables; i++)
2683  {
2684  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2685  (!relkind || tblinfo[i].relkind == relkind))
2686  makeTableDataInfo(dopt, &(tblinfo[i]));
2687  }
2688 }
2689 
2690 /*
2691  * Make a dumpable object for the data of this specific table
2692  *
2693  * Note: we make a TableDataInfo if and only if we are going to dump the
2694  * table data; the "dump" field in such objects isn't very interesting.
2695  */
2696 static void
2698 {
2699  TableDataInfo *tdinfo;
2700 
2701  /*
2702  * Nothing to do if we already decided to dump the table. This will
2703  * happen for "config" tables.
2704  */
2705  if (tbinfo->dataObj != NULL)
2706  return;
2707 
2708  /* Skip VIEWs (no data to dump) */
2709  if (tbinfo->relkind == RELKIND_VIEW)
2710  return;
2711  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2712  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2715  tbinfo->foreign_server)))
2716  return;
2717  /* Skip partitioned tables (data in partitions) */
2718  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2719  return;
2720 
2721  /* Don't dump data in unlogged tables, if so requested */
2722  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2723  dopt->no_unlogged_table_data)
2724  return;
2725 
2726  /* Check that the data is not explicitly excluded */
2728  tbinfo->dobj.catId.oid))
2729  return;
2730 
2731  /* OK, let's dump it */
2732  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2733 
2734  if (tbinfo->relkind == RELKIND_MATVIEW)
2735  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2736  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2737  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2738  else
2739  tdinfo->dobj.objType = DO_TABLE_DATA;
2740 
2741  /*
2742  * Note: use tableoid 0 so that this object won't be mistaken for
2743  * something that pg_depend entries apply to.
2744  */
2745  tdinfo->dobj.catId.tableoid = 0;
2746  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2747  AssignDumpId(&tdinfo->dobj);
2748  tdinfo->dobj.name = tbinfo->dobj.name;
2749  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2750  tdinfo->tdtable = tbinfo;
2751  tdinfo->filtercond = NULL; /* might get set later */
2752  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2753 
2754  /* A TableDataInfo contains data, of course */
2755  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2756 
2757  tbinfo->dataObj = tdinfo;
2758 
2759  /* Make sure that we'll collect per-column info for this table. */
2760  tbinfo->interesting = true;
2761 }
2762 
2763 /*
2764  * The refresh for a materialized view must be dependent on the refresh for
2765  * any materialized view that this one is dependent on.
2766  *
2767  * This must be called after all the objects are created, but before they are
2768  * sorted.
2769  */
2770 static void
2772 {
2773  PQExpBuffer query;
2774  PGresult *res;
2775  int ntups,
2776  i;
2777  int i_classid,
2778  i_objid,
2779  i_refobjid;
2780 
2781  /* No Mat Views before 9.3. */
2782  if (fout->remoteVersion < 90300)
2783  return;
2784 
2785  query = createPQExpBuffer();
2786 
2787  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2788  "( "
2789  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2790  "FROM pg_depend d1 "
2791  "JOIN pg_class c1 ON c1.oid = d1.objid "
2792  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2793  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2794  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2795  "AND d2.objid = r1.oid "
2796  "AND d2.refobjid <> d1.objid "
2797  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2798  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2799  CppAsString2(RELKIND_VIEW) ") "
2800  "WHERE d1.classid = 'pg_class'::regclass "
2801  "UNION "
2802  "SELECT w.objid, d3.refobjid, c3.relkind "
2803  "FROM w "
2804  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2805  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2806  "AND d3.objid = r3.oid "
2807  "AND d3.refobjid <> w.refobjid "
2808  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2809  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2810  CppAsString2(RELKIND_VIEW) ") "
2811  ") "
2812  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2813  "FROM w "
2814  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2815 
2816  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2817 
2818  ntups = PQntuples(res);
2819 
2820  i_classid = PQfnumber(res, "classid");
2821  i_objid = PQfnumber(res, "objid");
2822  i_refobjid = PQfnumber(res, "refobjid");
2823 
2824  for (i = 0; i < ntups; i++)
2825  {
2826  CatalogId objId;
2827  CatalogId refobjId;
2828  DumpableObject *dobj;
2829  DumpableObject *refdobj;
2830  TableInfo *tbinfo;
2831  TableInfo *reftbinfo;
2832 
2833  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2834  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2835  refobjId.tableoid = objId.tableoid;
2836  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2837 
2838  dobj = findObjectByCatalogId(objId);
2839  if (dobj == NULL)
2840  continue;
2841 
2842  Assert(dobj->objType == DO_TABLE);
2843  tbinfo = (TableInfo *) dobj;
2844  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2845  dobj = (DumpableObject *) tbinfo->dataObj;
2846  if (dobj == NULL)
2847  continue;
2848  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2849 
2850  refdobj = findObjectByCatalogId(refobjId);
2851  if (refdobj == NULL)
2852  continue;
2853 
2854  Assert(refdobj->objType == DO_TABLE);
2855  reftbinfo = (TableInfo *) refdobj;
2856  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2857  refdobj = (DumpableObject *) reftbinfo->dataObj;
2858  if (refdobj == NULL)
2859  continue;
2860  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2861 
2862  addObjectDependency(dobj, refdobj->dumpId);
2863 
2864  if (!reftbinfo->relispopulated)
2865  tbinfo->relispopulated = false;
2866  }
2867 
2868  PQclear(res);
2869 
2870  destroyPQExpBuffer(query);
2871 }
2872 
2873 /*
2874  * getTableDataFKConstraints -
2875  * add dump-order dependencies reflecting foreign key constraints
2876  *
2877  * This code is executed only in a data-only dump --- in schema+data dumps
2878  * we handle foreign key issues by not creating the FK constraints until
2879  * after the data is loaded. In a data-only dump, however, we want to
2880  * order the table data objects in such a way that a table's referenced
2881  * tables are restored first. (In the presence of circular references or
2882  * self-references this may be impossible; we'll detect and complain about
2883  * that during the dependency sorting step.)
2884  */
2885 static void
2887 {
2888  DumpableObject **dobjs;
2889  int numObjs;
2890  int i;
2891 
2892  /* Search through all the dumpable objects for FK constraints */
2893  getDumpableObjects(&dobjs, &numObjs);
2894  for (i = 0; i < numObjs; i++)
2895  {
2896  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2897  {
2898  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2899  TableInfo *ftable;
2900 
2901  /* Not interesting unless both tables are to be dumped */
2902  if (cinfo->contable == NULL ||
2903  cinfo->contable->dataObj == NULL)
2904  continue;
2905  ftable = findTableByOid(cinfo->confrelid);
2906  if (ftable == NULL ||
2907  ftable->dataObj == NULL)
2908  continue;
2909 
2910  /*
2911  * Okay, make referencing table's TABLE_DATA object depend on the
2912  * referenced table's TABLE_DATA object.
2913  */
2915  ftable->dataObj->dobj.dumpId);
2916  }
2917  }
2918  free(dobjs);
2919 }
2920 
2921 
2922 /*
2923  * dumpDatabase:
2924  * dump the database definition
2925  */
2926 static void
2928 {
2929  DumpOptions *dopt = fout->dopt;
2930  PQExpBuffer dbQry = createPQExpBuffer();
2931  PQExpBuffer delQry = createPQExpBuffer();
2932  PQExpBuffer creaQry = createPQExpBuffer();
2933  PQExpBuffer labelq = createPQExpBuffer();
2934  PGconn *conn = GetConnection(fout);
2935  PGresult *res;
2936  int i_tableoid,
2937  i_oid,
2938  i_datname,
2939  i_datdba,
2940  i_encoding,
2941  i_datlocprovider,
2942  i_collate,
2943  i_ctype,
2944  i_daticulocale,
2945  i_daticurules,
2946  i_frozenxid,
2947  i_minmxid,
2948  i_datacl,
2949  i_acldefault,
2950  i_datistemplate,
2951  i_datconnlimit,
2952  i_datcollversion,
2953  i_tablespace;
2954  CatalogId dbCatId;
2955  DumpId dbDumpId;
2956  DumpableAcl dbdacl;
2957  const char *datname,
2958  *dba,
2959  *encoding,
2960  *datlocprovider,
2961  *collate,
2962  *ctype,
2963  *iculocale,
2964  *icurules,
2965  *datistemplate,
2966  *datconnlimit,
2967  *tablespace;
2968  uint32 frozenxid,
2969  minmxid;
2970  char *qdatname;
2971 
2972  pg_log_info("saving database definition");
2973 
2974  /*
2975  * Fetch the database-level properties for this database.
2976  */
2977  appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
2978  "datdba, "
2979  "pg_encoding_to_char(encoding) AS encoding, "
2980  "datcollate, datctype, datfrozenxid, "
2981  "datacl, acldefault('d', datdba) AS acldefault, "
2982  "datistemplate, datconnlimit, ");
2983  if (fout->remoteVersion >= 90300)
2984  appendPQExpBufferStr(dbQry, "datminmxid, ");
2985  else
2986  appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
2987  if (fout->remoteVersion >= 150000)
2988  appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale, datcollversion, ");
2989  else
2990  appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS daticulocale, NULL AS datcollversion, ");
2991  if (fout->remoteVersion >= 160000)
2992  appendPQExpBufferStr(dbQry, "daticurules, ");
2993  else
2994  appendPQExpBufferStr(dbQry, "NULL AS daticurules, ");
2995  appendPQExpBufferStr(dbQry,
2996  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2997  "shobj_description(oid, 'pg_database') AS description "
2998  "FROM pg_database "
2999  "WHERE datname = current_database()");
3000 
3001  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
3002 
3003  i_tableoid = PQfnumber(res, "tableoid");
3004  i_oid = PQfnumber(res, "oid");
3005  i_datname = PQfnumber(res, "datname");
3006  i_datdba = PQfnumber(res, "datdba");
3007  i_encoding = PQfnumber(res, "encoding");
3008  i_datlocprovider = PQfnumber(res, "datlocprovider");
3009  i_collate = PQfnumber(res, "datcollate");
3010  i_ctype = PQfnumber(res, "datctype");
3011  i_daticulocale = PQfnumber(res, "daticulocale");
3012  i_daticurules = PQfnumber(res, "daticurules");
3013  i_frozenxid = PQfnumber(res, "datfrozenxid");
3014  i_minmxid = PQfnumber(res, "datminmxid");
3015  i_datacl = PQfnumber(res, "datacl");
3016  i_acldefault = PQfnumber(res, "acldefault");
3017  i_datistemplate = PQfnumber(res, "datistemplate");
3018  i_datconnlimit = PQfnumber(res, "datconnlimit");
3019  i_datcollversion = PQfnumber(res, "datcollversion");
3020  i_tablespace = PQfnumber(res, "tablespace");
3021 
3022  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
3023  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
3024  datname = PQgetvalue(res, 0, i_datname);
3025  dba = getRoleName(PQgetvalue(res, 0, i_datdba));
3026  encoding = PQgetvalue(res, 0, i_encoding);
3027  datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
3028  collate = PQgetvalue(res, 0, i_collate);
3029  ctype = PQgetvalue(res, 0, i_ctype);
3030  if (!PQgetisnull(res, 0, i_daticulocale))
3031  iculocale = PQgetvalue(res, 0, i_daticulocale);
3032  else
3033  iculocale = NULL;
3034  if (!PQgetisnull(res, 0, i_daticurules))
3035  icurules = PQgetvalue(res, 0, i_daticurules);
3036  else
3037  icurules = NULL;
3038  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
3039  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
3040  dbdacl.acl = PQgetvalue(res, 0, i_datacl);
3041  dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
3042  datistemplate = PQgetvalue(res, 0, i_datistemplate);
3043  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
3044  tablespace = PQgetvalue(res, 0, i_tablespace);
3045 
3046  qdatname = pg_strdup(fmtId(datname));
3047 
3048  /*
3049  * Prepare the CREATE DATABASE command. We must specify OID (if we want
3050  * to preserve that), as well as the encoding, locale, and tablespace
3051  * since those can't be altered later. Other DB properties are left to
3052  * the DATABASE PROPERTIES entry, so that they can be applied after
3053  * reconnecting to the target DB.
3054  */
3055  if (dopt->binary_upgrade)
3056  {
3057  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0 OID = %u",
3058  qdatname, dbCatId.oid);
3059  }
3060  else
3061  {
3062  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
3063  qdatname);
3064  }
3065  if (strlen(encoding) > 0)
3066  {
3067  appendPQExpBufferStr(creaQry, " ENCODING = ");
3068  appendStringLiteralAH(creaQry, encoding, fout);
3069  }
3070 
3071  appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
3072  if (datlocprovider[0] == 'c')
3073  appendPQExpBufferStr(creaQry, "libc");
3074  else if (datlocprovider[0] == 'i')
3075  appendPQExpBufferStr(creaQry, "icu");
3076  else
3077  pg_fatal("unrecognized locale provider: %s",
3078  datlocprovider);
3079 
3080  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
3081  {
3082  appendPQExpBufferStr(creaQry, " LOCALE = ");
3083  appendStringLiteralAH(creaQry, collate, fout);
3084  }
3085  else
3086  {
3087  if (strlen(collate) > 0)
3088  {
3089  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
3090  appendStringLiteralAH(creaQry, collate, fout);
3091  }
3092  if (strlen(ctype) > 0)
3093  {
3094  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
3095  appendStringLiteralAH(creaQry, ctype, fout);
3096  }
3097  }
3098  if (iculocale)
3099  {
3100  appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
3101  appendStringLiteralAH(creaQry, iculocale, fout);
3102  }
3103  if (icurules)
3104  {
3105  appendPQExpBufferStr(creaQry, " ICU_RULES = ");
3106  appendStringLiteralAH(creaQry, icurules, fout);
3107  }
3108 
3109  /*
3110  * For binary upgrade, carry over the collation version. For normal
3111  * dump/restore, omit the version, so that it is computed upon restore.
3112  */
3113  if (dopt->binary_upgrade)
3114  {
3115  if (!PQgetisnull(res, 0, i_datcollversion))
3116  {
3117  appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
3118  appendStringLiteralAH(creaQry,
3119  PQgetvalue(res, 0, i_datcollversion),
3120  fout);
3121  }
3122  }
3123 
3124  /*
3125  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
3126  * thing; the decision whether to specify a tablespace should be left till
3127  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
3128  * label the DATABASE entry with the tablespace and let the normal
3129  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
3130  * attention to default_tablespace, so that won't work.
3131  */
3132  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
3133  !dopt->outputNoTablespaces)
3134  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
3135  fmtId(tablespace));
3136  appendPQExpBufferStr(creaQry, ";\n");
3137 
3138  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3139  qdatname);
3140 
3141  dbDumpId = createDumpId();
3142 
3143  ArchiveEntry(fout,
3144  dbCatId, /* catalog ID */
3145  dbDumpId, /* dump ID */
3146  ARCHIVE_OPTS(.tag = datname,
3147  .owner = dba,
3148  .description = "DATABASE",
3149  .section = SECTION_PRE_DATA,
3150  .createStmt = creaQry->data,
3151  .dropStmt = delQry->data));
3152 
3153  /* Compute correct tag for archive entry */
3154  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3155 
3156  /* Dump DB comment if any */
3157  {
3158  /*
3159  * 8.2 and up keep comments on shared objects in a shared table, so we
3160  * cannot use the dumpComment() code used for other database objects.
3161  * Be careful that the ArchiveEntry parameters match that function.
3162  */
3163  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3164 
3165  if (comment && *comment && !dopt->no_comments)
3166  {
3167  resetPQExpBuffer(dbQry);
3168 
3169  /*
3170  * Generates warning when loaded into a differently-named
3171  * database.
3172  */
3173  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3174  appendStringLiteralAH(dbQry, comment, fout);
3175  appendPQExpBufferStr(dbQry, ";\n");
3176 
3178  ARCHIVE_OPTS(.tag = labelq->data,
3179  .owner = dba,
3180  .description = "COMMENT",
3181  .section = SECTION_NONE,
3182  .createStmt = dbQry->data,
3183  .deps = &dbDumpId,
3184  .nDeps = 1));
3185  }
3186  }
3187 
3188  /* Dump DB security label, if enabled */
3189  if (!dopt->no_security_labels)
3190  {
3191  PGresult *shres;
3192  PQExpBuffer seclabelQry;
3193 
3194  seclabelQry = createPQExpBuffer();
3195 
3196  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3197  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3198  resetPQExpBuffer(seclabelQry);
3199  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3200  if (seclabelQry->len > 0)
3202  ARCHIVE_OPTS(.tag = labelq->data,
3203  .owner = dba,
3204  .description = "SECURITY LABEL",
3205  .section = SECTION_NONE,
3206  .createStmt = seclabelQry->data,
3207  .deps = &dbDumpId,
3208  .nDeps = 1));
3209  destroyPQExpBuffer(seclabelQry);
3210  PQclear(shres);
3211  }
3212 
3213  /*
3214  * Dump ACL if any. Note that we do not support initial privileges
3215  * (pg_init_privs) on databases.
3216  */
3217  dbdacl.privtype = 0;
3218  dbdacl.initprivs = NULL;
3219 
3220  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3221  qdatname, NULL, NULL,
3222  dba, &dbdacl);
3223 
3224  /*
3225  * Now construct a DATABASE PROPERTIES archive entry to restore any
3226  * non-default database-level properties. (The reason this must be
3227  * separate is that we cannot put any additional commands into the TOC
3228  * entry that has CREATE DATABASE. pg_restore would execute such a group
3229  * in an implicit transaction block, and the backend won't allow CREATE
3230  * DATABASE in that context.)
3231  */
3232  resetPQExpBuffer(creaQry);
3233  resetPQExpBuffer(delQry);
3234 
3235  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3236  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3237  qdatname, datconnlimit);
3238 
3239  if (strcmp(datistemplate, "t") == 0)
3240  {
3241  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3242  qdatname);
3243 
3244  /*
3245  * The backend won't accept DROP DATABASE on a template database. We
3246  * can deal with that by removing the template marking before the DROP
3247  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3248  * since no such command is currently supported, fake it with a direct
3249  * UPDATE on pg_database.
3250  */
3251  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3252  "SET datistemplate = false WHERE datname = ");
3253  appendStringLiteralAH(delQry, datname, fout);
3254  appendPQExpBufferStr(delQry, ";\n");
3255  }
3256 
3257  /* Add database-specific SET options */
3258  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3259 
3260  /*
3261  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3262  * entry, too, for lack of a better place.
3263  */
3264  if (dopt->binary_upgrade)
3265  {
3266  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3267  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3268  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3269  "WHERE datname = ",
3270  frozenxid, minmxid);
3271  appendStringLiteralAH(creaQry, datname, fout);
3272  appendPQExpBufferStr(creaQry, ";\n");
3273  }
3274 
3275  if (creaQry->len > 0)
3277  ARCHIVE_OPTS(.tag = datname,
3278  .owner = dba,
3279  .description = "DATABASE PROPERTIES",
3280  .section = SECTION_PRE_DATA,
3281  .createStmt = creaQry->data,
3282  .dropStmt = delQry->data,
3283  .deps = &dbDumpId));
3284 
3285  /*
3286  * pg_largeobject comes from the old system intact, so set its
3287  * relfrozenxids, relminmxids and relfilenode.
3288  */
3289  if (dopt->binary_upgrade)
3290  {
3291  PGresult *lo_res;
3292  PQExpBuffer loFrozenQry = createPQExpBuffer();
3293  PQExpBuffer loOutQry = createPQExpBuffer();
3294  PQExpBuffer loHorizonQry = createPQExpBuffer();
3295  int ii_relfrozenxid,
3296  ii_relfilenode,
3297  ii_oid,
3298  ii_relminmxid;
3299 
3300  /*
3301  * pg_largeobject
3302  */
3303  if (fout->remoteVersion >= 90300)
3304  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
3305  "FROM pg_catalog.pg_class\n"
3306  "WHERE oid IN (%u, %u);\n",
3307  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3308  else
3309  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
3310  "FROM pg_catalog.pg_class\n"
3311  "WHERE oid IN (%u, %u);\n",
3312  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3313 
3314  lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);
3315 
3316  ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3317  ii_relminmxid = PQfnumber(lo_res, "relminmxid");
3318  ii_relfilenode = PQfnumber(lo_res, "relfilenode");
3319  ii_oid = PQfnumber(lo_res, "oid");
3320 
3321  appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3322  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
3323  for (int i = 0; i < PQntuples(lo_res); ++i)
3324  {
3325  Oid oid;
3326  RelFileNumber relfilenumber;
3327 
3328  appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
3329  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3330  "WHERE oid = %u;\n",
3331  atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
3332  atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
3333  atooid(PQgetvalue(lo_res, i, ii_oid)));
3334 
3335  oid = atooid(PQgetvalue(lo_res, i, ii_oid));
3336  relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));
3337 
3338  if (oid == LargeObjectRelationId)
3339  appendPQExpBuffer(loOutQry,
3340  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
3341  relfilenumber);
3342  else if (oid == LargeObjectLOidPNIndexId)
3343  appendPQExpBuffer(loOutQry,
3344  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
3345  relfilenumber);
3346  }
3347 
3348  appendPQExpBufferStr(loOutQry,
3349  "TRUNCATE pg_catalog.pg_largeobject;\n");
3350  appendPQExpBufferStr(loOutQry, loHorizonQry->data);
3351 
3353  ARCHIVE_OPTS(.tag = "pg_largeobject",
3354  .description = "pg_largeobject",
3355  .section = SECTION_PRE_DATA,
3356  .createStmt = loOutQry->data));
3357 
3358  PQclear(lo_res);
3359 
3360  destroyPQExpBuffer(loFrozenQry);
3361  destroyPQExpBuffer(loHorizonQry);
3362  destroyPQExpBuffer(loOutQry);
3363  }
3364 
3365  PQclear(res);
3366 
3367  free(qdatname);
3368  destroyPQExpBuffer(dbQry);
3369  destroyPQExpBuffer(delQry);
3370  destroyPQExpBuffer(creaQry);
3371  destroyPQExpBuffer(labelq);
3372 }
3373 
3374 /*
3375  * Collect any database-specific or role-and-database-specific SET options
3376  * for this database, and append them to outbuf.
3377  */
3378 static void
3380  const char *dbname, Oid dboid)
3381 {
3382  PGconn *conn = GetConnection(AH);
3384  PGresult *res;
3385 
3386  /* First collect database-specific options */
3387  printfPQExpBuffer(buf, "SELECT unnest(setconfig)");
3388  if (AH->remoteVersion >= 160000)
3389  appendPQExpBufferStr(buf, ", unnest(setuser)");
3390  appendPQExpBuffer(buf, " FROM pg_db_role_setting "
3391  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3392  dboid);
3393 
3394  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3395 
3396  for (int i = 0; i < PQntuples(res); i++)
3397  {
3398  char *userset = NULL;
3399 
3400  if (AH->remoteVersion >= 160000)
3401  userset = PQgetvalue(res, i, 1);
3402  makeAlterConfigCommand(conn, PQgetvalue(res, i, 0), userset,
3403  "DATABASE", dbname, NULL, NULL,
3404  outbuf);
3405  }
3406 
3407  PQclear(res);
3408 
3409  /* Now look for role-and-database-specific options */
3410  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig)");
3411  if (AH->remoteVersion >= 160000)
3412  appendPQExpBufferStr(buf, ", unnest(setuser)");
3413  appendPQExpBuffer(buf, " FROM pg_db_role_setting s, pg_roles r "
3414  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3415  dboid);
3416 
3417  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3418 
3419  for (int i = 0; i < PQntuples(res); i++)
3420  {
3421  char *userset = NULL;
3422 
3423  if (AH->remoteVersion >= 160000)
3424  userset = PQgetvalue(res, i, 2);
3425  makeAlterConfigCommand(conn, PQgetvalue(res, i, 1), userset,
3426  "ROLE", PQgetvalue(res, i, 0),
3427  "DATABASE", dbname,
3428  outbuf);
3429  }
3430 
3431  PQclear(res);
3432 
3434 }
3435 
3436 /*
3437  * dumpEncoding: put the correct encoding into the archive
3438  */
3439 static void
3441 {
3442  const char *encname = pg_encoding_to_char(AH->encoding);
3444 
3445  pg_log_info("saving encoding = %s", encname);
3446 
3447  appendPQExpBufferStr(qry, "SET client_encoding = ");
3448  appendStringLiteralAH(qry, encname, AH);
3449  appendPQExpBufferStr(qry, ";\n");
3450 
3452  ARCHIVE_OPTS(.tag = "ENCODING",
3453  .description = "ENCODING",
3454  .section = SECTION_PRE_DATA,
3455  .createStmt = qry->data));
3456 
3457  destroyPQExpBuffer(qry);
3458 }
3459 
3460 
3461 /*
3462  * dumpStdStrings: put the correct escape string behavior into the archive
3463  */
3464 static void
3466 {
3467  const char *stdstrings = AH->std_strings ? "on" : "off";
3469 
3470  pg_log_info("saving standard_conforming_strings = %s",
3471  stdstrings);
3472 
3473  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3474  stdstrings);
3475 
3477  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3478  .description = "STDSTRINGS",
3479  .section = SECTION_PRE_DATA,
3480  .createStmt = qry->data));
3481 
3482  destroyPQExpBuffer(qry);
3483 }
3484 
3485 /*
3486  * dumpSearchPath: record the active search_path in the archive
3487  */
3488 static void
3490 {
3492  PQExpBuffer path = createPQExpBuffer();
3493  PGresult *res;
3494  char **schemanames = NULL;
3495  int nschemanames = 0;
3496  int i;
3497 
3498  /*
3499  * We use the result of current_schemas(), not the search_path GUC,
3500  * because that might contain wildcards such as "$user", which won't
3501  * necessarily have the same value during restore. Also, this way avoids
3502  * listing schemas that may appear in search_path but not actually exist,
3503  * which seems like a prudent exclusion.
3504  */
3506  "SELECT pg_catalog.current_schemas(false)");
3507 
3508  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3509  pg_fatal("could not parse result of current_schemas()");
3510 
3511  /*
3512  * We use set_config(), not a simple "SET search_path" command, because
3513  * the latter has less-clean behavior if the search path is empty. While
3514  * that's likely to get fixed at some point, it seems like a good idea to
3515  * be as backwards-compatible as possible in what we put into archives.
3516  */
3517  for (i = 0; i < nschemanames; i++)
3518  {
3519  if (i > 0)
3520  appendPQExpBufferStr(path, ", ");
3521  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3522  }
3523 
3524  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3525  appendStringLiteralAH(qry, path->data, AH);
3526  appendPQExpBufferStr(qry, ", false);\n");
3527 
3528  pg_log_info("saving search_path = %s", path->data);
3529 
3531  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3532  .description = "SEARCHPATH",
3533  .section = SECTION_PRE_DATA,
3534  .createStmt = qry->data));
3535 
3536  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3537  AH->searchpath = pg_strdup(qry->data);
3538 
3539  free(schemanames);
3540  PQclear(res);
3541  destroyPQExpBuffer(qry);
3542  destroyPQExpBuffer(path);
3543 }
3544 
3545 
3546 /*
3547  * getLOs:
3548  * Collect schema-level data about large objects
3549  */
3550 static void
3552 {
3553  DumpOptions *dopt = fout->dopt;
3554  PQExpBuffer loQry = createPQExpBuffer();
3555  LoInfo *loinfo;
3556  DumpableObject *lodata;
3557  PGresult *res;
3558  int ntups;
3559  int i;
3560  int i_oid;
3561  int i_lomowner;
3562  int i_lomacl;
3563  int i_acldefault;
3564 
3565  pg_log_info("reading large objects");
3566 
3567  /* Fetch LO OIDs, and owner/ACL data */
3568  appendPQExpBufferStr(loQry,
3569  "SELECT oid, lomowner, lomacl, "
3570  "acldefault('L', lomowner) AS acldefault "
3571  "FROM pg_largeobject_metadata");
3572 
3573  res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);
3574 
3575  i_oid = PQfnumber(res, "oid");
3576  i_lomowner = PQfnumber(res, "lomowner");
3577  i_lomacl = PQfnumber(res, "lomacl");
3578  i_acldefault = PQfnumber(res, "acldefault");
3579 
3580  ntups = PQntuples(res);
3581 
3582  /*
3583  * Each large object has its own "BLOB" archive entry.
3584  */
3585  loinfo = (LoInfo *) pg_malloc(ntups * sizeof(LoInfo));
3586 
3587  for (i = 0; i < ntups; i++)
3588  {
3589  loinfo[i].dobj.objType = DO_LARGE_OBJECT;
3590  loinfo[i].dobj.catId.tableoid = LargeObjectRelationId;
3591  loinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3592  AssignDumpId(&loinfo[i].dobj);
3593 
3594  loinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
3595  loinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
3596  loinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3597  loinfo[i].dacl.privtype = 0;
3598  loinfo[i].dacl.initprivs = NULL;
3599  loinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));
3600 
3601  /* LOs have data */
3602  loinfo[i].dobj.components |= DUMP_COMPONENT_DATA;
3603 
3604  /* Mark whether LO has an ACL */
3605  if (!PQgetisnull(res, i, i_lomacl))
3606  loinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
3607 
3608  /*
3609  * In binary-upgrade mode for LOs, we do *not* dump out the LO
3610  * data, as it will be copied by pg_upgrade, which simply copies the
3611  * pg_largeobject table. We *do* however dump out anything but the
3612  * data, as pg_upgrade copies just pg_largeobject, but not
3613  * pg_largeobject_metadata, after the dump is restored.
3614  */
3615  if (dopt->binary_upgrade)
3616  loinfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
3617  }
3618 
3619  /*
3620  * If we have any large objects, a "BLOBS" archive entry is needed. This
3621  * is just a placeholder for sorting; it carries no data now.
3622  */
3623  if (ntups > 0)
3624  {
3625  lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3626  lodata->objType = DO_LARGE_OBJECT_DATA;
3627  lodata->catId = nilCatalogId;
3628  AssignDumpId(lodata);
3629  lodata->name = pg_strdup("BLOBS");
3630  lodata->components |= DUMP_COMPONENT_DATA;
3631  }
3632 
3633  PQclear(res);
3634  destroyPQExpBuffer(loQry);
3635 }
3636 
3637 /*
3638  * dumpLO
3639  *
3640  * dump the definition (metadata) of the given large object
3641  */
3642 static void
3643 dumpLO(Archive *fout, const LoInfo *loinfo)
3644 {
3645  PQExpBuffer cquery = createPQExpBuffer();
3646  PQExpBuffer dquery = createPQExpBuffer();
3647 
3648  appendPQExpBuffer(cquery,
3649  "SELECT pg_catalog.lo_create('%s');\n",
3650  loinfo->dobj.name);
3651 
3652  appendPQExpBuffer(dquery,
3653  "SELECT pg_catalog.lo_unlink('%s');\n",
3654  loinfo->dobj.name);
3655 
3656  if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3657  ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3658  ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3659  .owner = loinfo->rolname,
3660  .description = "BLOB",
3661  .section = SECTION_PRE_DATA,
3662  .createStmt = cquery->data,
3663  .dropStmt = dquery->data));
3664 
3665  /* Dump comment if any */
3666  if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3667  dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
3668  NULL, loinfo->rolname,
3669  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3670 
3671  /* Dump security label if any */
3672  if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3673  dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
3674  NULL, loinfo->rolname,
3675  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3676 
3677  /* Dump ACL if any */
3678  if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3679  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3680  loinfo->dobj.name, NULL,
3681  NULL, loinfo->rolname, &loinfo->dacl);
3682 
3683  destroyPQExpBuffer(cquery);
3684  destroyPQExpBuffer(dquery);
3685 }
3686 
3687 /*
3688  * dumpLOs:
3689  * dump the data contents of all large objects
3690  */
3691 static int
3692 dumpLOs(Archive *fout, const void *arg)
3693 {
3694  const char *loQry;
3695  const char *loFetchQry;
3696  PGconn *conn = GetConnection(fout);
3697  PGresult *res;
3698  char buf[LOBBUFSIZE];
3699  int ntups;
3700  int i;
3701  int cnt;
3702 
3703  pg_log_info("saving large objects");
3704 
3705  /*
3706  * Currently, we re-fetch all LO OIDs using a cursor. Consider scanning
3707  * the already-in-memory dumpable objects instead...
3708  */
3709  loQry =
3710  "DECLARE looid CURSOR FOR "
3711  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3712 
3713  ExecuteSqlStatement(fout, loQry);
3714 
3715  /* Command to fetch from cursor */
3716  loFetchQry = "FETCH 1000 IN looid";
3717 
3718  do
3719  {
3720  /* Do a fetch */
3721  res = ExecuteSqlQuery(fout, loFetchQry, PGRES_TUPLES_OK);
3722 
3723  /* Process the tuples, if any */
3724  ntups = PQntuples(res);
3725  for (i = 0; i < ntups; i++)
3726  {
3727  Oid loOid;
3728  int loFd;
3729 
3730  loOid = atooid(PQgetvalue(res, i, 0));
3731  /* Open the LO */
3732  loFd = lo_open(conn, loOid, INV_READ);
3733  if (loFd == -1)
3734  pg_fatal("could not open large object %u: %s",
3735  loOid, PQerrorMessage(conn));
3736 
3737  StartLO(fout, loOid);
3738 
3739  /* Now read it in chunks, sending data to archive */
3740  do
3741  {
3742  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3743  if (cnt < 0)
3744  pg_fatal("error reading large object %u: %s",
3745  loOid, PQerrorMessage(conn));
3746 
3747  WriteData(fout, buf, cnt);
3748  } while (cnt > 0);
3749 
3750  lo_close(conn, loFd);
3751 
3752  EndLO(fout, loOid);
3753  }
3754 
3755  PQclear(res);
3756  } while (ntups > 0);
3757 
3758  return 1;
3759 }
3760 
3761 /*
3762  * getPolicies
3763  * get information about all RLS policies on dumpable tables.
3764  */
3765 void
3766 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3767 {
3768  PQExpBuffer query;
3769  PQExpBuffer tbloids;
3770  PGresult *res;
3771  PolicyInfo *polinfo;
3772  int i_oid;
3773  int i_tableoid;
3774  int i_polrelid;
3775  int i_polname;
3776  int i_polcmd;
3777  int i_polpermissive;
3778  int i_polroles;
3779  int i_polqual;
3780  int i_polwithcheck;
3781  int i,
3782  j,
3783  ntups;
3784 
3785  /* No policies before 9.5 */
3786  if (fout->remoteVersion < 90500)
3787  return;
3788 
3789  query = createPQExpBuffer();
3790  tbloids = createPQExpBuffer();
3791 
3792  /*
3793  * Identify tables of interest, and check which ones have RLS enabled.
3794  */
3795  appendPQExpBufferChar(tbloids, '{');
3796  for (i = 0; i < numTables; i++)
3797  {
3798  TableInfo *tbinfo = &tblinfo[i];
3799 
3800  /* Ignore row security on tables not to be dumped */
3801  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3802  continue;
3803 
3804  /* It can't have RLS or policies if it's not a table */
3805  if (tbinfo->relkind != RELKIND_RELATION &&
3806  tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
3807  continue;
3808 
3809  /* Add it to the list of table OIDs to be probed below */
3810  if (tbloids->len > 1) /* do we have more than the '{'? */
3811  appendPQExpBufferChar(tbloids, ',');
3812  appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
3813 
3814  /* Is RLS enabled? (That's separate from whether it has policies) */
3815  if (tbinfo->rowsec)
3816  {
3818 
3819  /*
3820  * We represent RLS being enabled on a table by creating a
3821  * PolicyInfo object with null polname.
3822  *
3823  * Note: use tableoid 0 so that this object won't be mistaken for
3824  * something that pg_depend entries apply to.
3825  */
3826  polinfo = pg_malloc(sizeof(PolicyInfo));
3827  polinfo->dobj.objType = DO_POLICY;
3828  polinfo->dobj.catId.tableoid = 0;
3829  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3830  AssignDumpId(&polinfo->dobj);
3831  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3832  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3833  polinfo->poltable = tbinfo;
3834  polinfo->polname = NULL;
3835  polinfo->polcmd = '\0';
3836  polinfo->polpermissive = 0;
3837  polinfo->polroles = NULL;
3838  polinfo->polqual = NULL;
3839  polinfo->polwithcheck = NULL;
3840  }
3841  }
3842  appendPQExpBufferChar(tbloids, '}');
3843 
3844  /*
3845  * Now, read all RLS policies belonging to the tables of interest, and
3846  * create PolicyInfo objects for them. (Note that we must filter the
3847  * results server-side not locally, because we dare not apply pg_get_expr
3848  * to tables we don't have lock on.)
3849  */
3850  pg_log_info("reading row-level security policies");
3851 
3852  printfPQExpBuffer(query,
3853  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3854  if (fout->remoteVersion >= 100000)
3855  appendPQExpBufferStr(query, "pol.polpermissive, ");
3856  else
3857  appendPQExpBufferStr(query, "'t' as polpermissive, ");
3858  appendPQExpBuffer(query,
3859  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3860  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3861  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3862  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3863  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
3864  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
3865  tbloids->data);
3866 
3867  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3868 
3869  ntups = PQntuples(res);
3870  if (ntups > 0)
3871  {
3872  i_oid = PQfnumber(res, "oid");
3873  i_tableoid = PQfnumber(res, "tableoid");
3874  i_polrelid = PQfnumber(res, "polrelid");
3875  i_polname = PQfnumber(res, "polname");
3876  i_polcmd = PQfnumber(res, "polcmd");
3877  i_polpermissive = PQfnumber(res, "polpermissive");
3878  i_polroles = PQfnumber(res, "polroles");
3879  i_polqual = PQfnumber(res, "polqual");
3880  i_polwithcheck = PQfnumber(res, "polwithcheck");
3881 
3882  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
3883 
3884  for (j = 0; j < ntups; j++)
3885  {
3886  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
3887  TableInfo *tbinfo = findTableByOid(polrelid);
3888 
3890 
3891  polinfo[j].dobj.objType = DO_POLICY;
3892  polinfo[j].dobj.catId.tableoid =
3893  atooid(PQgetvalue(res, j, i_tableoid));
3894  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
3895  AssignDumpId(&polinfo[j].dobj);
3896  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
3897  polinfo[j].poltable = tbinfo;
3898  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
3899  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
3900 
3901  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
3902  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
3903 
3904  if (PQgetisnull(res, j, i_polroles))
3905  polinfo[j].polroles = NULL;
3906  else
3907  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
3908 
3909  if (PQgetisnull(res, j, i_polqual))
3910  polinfo[j].polqual = NULL;
3911  else
3912  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
3913 
3914  if (PQgetisnull(res, j, i_polwithcheck))
3915  polinfo[j].polwithcheck = NULL;
3916  else
3917  polinfo[j].polwithcheck
3918  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
3919  }
3920  }
3921 
3922  PQclear(res);
3923 
3924  destroyPQExpBuffer(query);
3925  destroyPQExpBuffer(tbloids);
3926 }
3927 
3928 /*
3929  * dumpPolicy
3930  * dump the definition of the given policy
3931  */
static void
dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
{
	DumpOptions *dopt = fout->dopt;
	TableInfo  *tbinfo = polinfo->poltable;
	PQExpBuffer query;				/* CREATE POLICY / ALTER TABLE statement */
	PQExpBuffer delqry;				/* matching DROP POLICY statement */
	PQExpBuffer polprefix;			/* "POLICY name ON" prefix for comments */
	char	   *qtabname;			/* quoted (unqualified) table name */
	const char *cmd;				/* " FOR ..." clause chosen from polcmd */
	char	   *tag;

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	/*
	 * If polname is NULL, then this record is just indicating that ROW LEVEL
	 * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
	 * ROW LEVEL SECURITY.
	 */
	if (polinfo->polname == NULL)
	{
		query = createPQExpBuffer();

		appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
						  fmtQualifiedDumpable(tbinfo));

		/*
		 * We must emit the ROW SECURITY object's dependency on its table
		 * explicitly, because it will not match anything in pg_depend (unlike
		 * the case for other PolicyInfo objects).
		 */
		if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
			ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
						 ARCHIVE_OPTS(.tag = polinfo->dobj.name,
									  .namespace = polinfo->dobj.namespace->dobj.name,
									  .owner = tbinfo->rolname,
									  .description = "ROW SECURITY",
									  .section = SECTION_POST_DATA,
									  .createStmt = query->data,
									  .deps = &(tbinfo->dobj.dumpId),
									  .nDeps = 1));

		destroyPQExpBuffer(query);
		return;
	}

	/* Map pg_policy.polcmd codes onto the CREATE POLICY "FOR" clause */
	if (polinfo->polcmd == '*')
		cmd = "";					/* applies to ALL commands: no clause */
	else if (polinfo->polcmd == 'r')
		cmd = " FOR SELECT";
	else if (polinfo->polcmd == 'a')
		cmd = " FOR INSERT";
	else if (polinfo->polcmd == 'w')
		cmd = " FOR UPDATE";
	else if (polinfo->polcmd == 'd')
		cmd = " FOR DELETE";
	else
		pg_fatal("unexpected policy command type: %c",
				 polinfo->polcmd);

	query = createPQExpBuffer();
	delqry = createPQExpBuffer();
	polprefix = createPQExpBuffer();

	/* Copy fmtId()'s static-buffer result before any further fmtId calls */
	qtabname = pg_strdup(fmtId(tbinfo->dobj.name));

	/*
	 * Note: fmtId() and fmtQualifiedDumpable() reuse a static buffer, so
	 * each result is consumed by its own appendPQExpBuffer call below.
	 */
	appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));

	appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
					  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);

	/* NULL polroles means the policy applies to PUBLIC (no TO clause) */
	if (polinfo->polroles != NULL)
		appendPQExpBuffer(query, " TO %s", polinfo->polroles);

	if (polinfo->polqual != NULL)
		appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);

	if (polinfo->polwithcheck != NULL)
		appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);

	appendPQExpBufferStr(query, ";\n");

	appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
	appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));

	appendPQExpBuffer(polprefix, "POLICY %s ON",
					  fmtId(polinfo->polname));

	tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);

	if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = tag,
								  .namespace = polinfo->dobj.namespace->dobj.name,
								  .owner = tbinfo->rolname,
								  .description = "POLICY",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data,
								  .dropStmt = delqry->data));

	if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
		dumpComment(fout, polprefix->data, qtabname,
					tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
					polinfo->dobj.catId, 0, polinfo->dobj.dumpId);

	free(tag);
	destroyPQExpBuffer(query);
	destroyPQExpBuffer(delqry);
	destroyPQExpBuffer(polprefix);
	free(qtabname);
}
4045 
4046 /*
4047  * getPublications
4048  * get information about publications
4049  */
4051 getPublications(Archive *fout, int *numPublications)
4052 {
4053  DumpOptions *dopt = fout->dopt;
4054  PQExpBuffer query;
4055  PGresult *res;
4056  PublicationInfo *pubinfo;
4057  int i_tableoid;
4058  int i_oid;
4059  int i_pubname;
4060  int i_pubowner;
4061  int i_puballtables;
4062  int i_pubinsert;
4063  int i_pubupdate;
4064  int i_pubdelete;
4065  int i_pubtruncate;
4066  int i_pubviaroot;
4067  int i,
4068  ntups;
4069 
4070  if (dopt->no_publications || fout->remoteVersion < 100000)
4071  {
4072  *numPublications = 0;
4073  return NULL;
4074  }
4075 
4076  query = createPQExpBuffer();
4077 
4078  resetPQExpBuffer(query);
4079 
4080  /* Get the publications. */
4081  if (fout->remoteVersion >= 130000)
4082  appendPQExpBufferStr(query,
4083  "SELECT p.tableoid, p.oid, p.pubname, "
4084  "p.pubowner, "
4085  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
4086  "FROM pg_publication p");
4087  else if (fout->remoteVersion >= 110000)
4088  appendPQExpBufferStr(query,
4089  "SELECT p.tableoid, p.oid, p.pubname, "
4090  "p.pubowner, "
4091  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
4092  "FROM pg_publication p");
4093  else
4094  appendPQExpBufferStr(query,
4095  "SELECT p.tableoid, p.oid, p.pubname, "
4096  "p.pubowner, "
4097  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
4098  "FROM pg_publication p");
4099 
4100  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4101 
4102  ntups = PQntuples(res);
4103 
4104  i_tableoid = PQfnumber(res, "tableoid");
4105  i_oid = PQfnumber(res, "oid");
4106  i_pubname = PQfnumber(res, "pubname");
4107  i_pubowner = PQfnumber(res, "pubowner");
4108  i_puballtables = PQfnumber(res, "puballtables");
4109  i_pubinsert = PQfnumber(res, "pubinsert");
4110  i_pubupdate = PQfnumber(res, "pubupdate");
4111  i_pubdelete = PQfnumber(res, "pubdelete");
4112  i_pubtruncate = PQfnumber(res, "pubtruncate");
4113  i_pubviaroot = PQfnumber(res, "pubviaroot");
4114 
4115  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4116 
4117  for (i = 0; i < ntups; i++)
4118  {
4119  pubinfo[i].dobj.objType = DO_PUBLICATION;
4120  pubinfo[i].dobj.catId.tableoid =
4121  atooid(PQgetvalue(res, i, i_tableoid));
4122  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4123  AssignDumpId(&pubinfo[i].dobj);
4124  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4125  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4126  pubinfo[i].puballtables =
4127  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4128  pubinfo[i].pubinsert =
4129  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4130  pubinfo[i].pubupdate =
4131  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4132  pubinfo[i].pubdelete =
4133  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4134  pubinfo[i].pubtruncate =
4135  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4136  pubinfo[i].pubviaroot =
4137  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4138 
4139  /* Decide whether we want to dump it */
4140  selectDumpableObject(&(pubinfo[i].dobj), fout);
4141  }
4142  PQclear(res);
4143 
4144  destroyPQExpBuffer(query);
4145 
4146  *numPublications = ntups;
4147  return pubinfo;
4148 }
4149 
4150 /*
4151  * dumpPublication
4152  * dump the definition of the given publication
4153  */
4154 static void
4156 {
4157  DumpOptions *dopt = fout->dopt;
4158  PQExpBuffer delq;
4159  PQExpBuffer query;
4160  char *qpubname;
4161  bool first = true;
4162 
4163  /* Do nothing in data-only dump */
4164  if (dopt->dataOnly)
4165  return;
4166 
4167  delq = createPQExpBuffer();
4168  query = createPQExpBuffer();
4169 
4170  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4171 
4172  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4173  qpubname);
4174 
4175  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4176  qpubname);
4177 
4178  if (pubinfo->puballtables)
4179  appendPQExpBufferStr(query, " FOR ALL TABLES");
4180 
4181  appendPQExpBufferStr(query, " WITH (publish = '");
4182  if (pubinfo->pubinsert)
4183  {
4184  appendPQExpBufferStr(query, "insert");
4185  first = false;
4186  }
4187 
4188  if (pubinfo->pubupdate)
4189  {
4190  if (!first)
4191  appendPQExpBufferStr(query, ", ");
4192 
4193  appendPQExpBufferStr(query, "update");
4194  first = false;
4195  }
4196 
4197  if (pubinfo->pubdelete)
4198  {
4199  if (!first)
4200  appendPQExpBufferStr(query, ", ");
4201 
4202  appendPQExpBufferStr(query, "delete");
4203  first = false;
4204  }
4205 
4206  if (pubinfo->pubtruncate)
4207  {
4208  if (!first)
4209  appendPQExpBufferStr(query, ", ");
4210 
4211  appendPQExpBufferStr(query, "truncate");
4212  first = false;
4213  }
4214 
4215  appendPQExpBufferChar(query, '\'');
4216 
4217  if (pubinfo->pubviaroot)
4218  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4219 
4220  appendPQExpBufferStr(query, ");\n");
4221 
4222  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4223  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4224  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4225  .owner = pubinfo->rolname,
4226  .description = "PUBLICATION",
4227  .section = SECTION_POST_DATA,
4228  .createStmt = query->data,
4229  .dropStmt = delq->data));
4230 
4231  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4232  dumpComment(fout, "PUBLICATION", qpubname,
4233  NULL, pubinfo->rolname,
4234  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4235 
4236  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4237  dumpSecLabel(fout, "PUBLICATION", qpubname,
4238  NULL, pubinfo->rolname,
4239  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4240 
4241  destroyPQExpBuffer(delq);
4242  destroyPQExpBuffer(query);
4243  free(qpubname);
4244 }
4245 
4246 /*
4247  * getPublicationNamespaces
4248  * get information about publication membership for dumpable schemas.
4249  */
4250 void
4252 {
4253  PQExpBuffer query;
4254  PGresult *res;
4255  PublicationSchemaInfo *pubsinfo;
4256  DumpOptions *dopt = fout->dopt;
4257  int i_tableoid;
4258  int i_oid;
4259  int i_pnpubid;
4260  int i_pnnspid;
4261  int i,
4262  j,
4263  ntups;
4264 
4265  if (dopt->no_publications || fout->remoteVersion < 150000)
4266  return;
4267 
4268  query = createPQExpBuffer();
4269 
4270  /* Collect all publication membership info. */
4271  appendPQExpBufferStr(query,
4272  "SELECT tableoid, oid, pnpubid, pnnspid "
4273  "FROM pg_catalog.pg_publication_namespace");
4274  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4275 
4276  ntups = PQntuples(res);
4277 
4278  i_tableoid = PQfnumber(res, "tableoid");
4279  i_oid = PQfnumber(res, "oid");
4280  i_pnpubid = PQfnumber(res, "pnpubid");
4281  i_pnnspid = PQfnumber(res, "pnnspid");
4282 
4283  /* this allocation may be more than we need */
4284  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4285  j = 0;
4286 
4287  for (i = 0; i < ntups; i++)
4288  {
4289  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4290  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4291  PublicationInfo *pubinfo;
4292  NamespaceInfo *nspinfo;
4293 
4294  /*
4295  * Ignore any entries for which we aren't interested in either the
4296  * publication or the rel.
4297  */
4298  pubinfo = findPublicationByOid(pnpubid);
4299  if (pubinfo == NULL)
4300  continue;
4301  nspinfo = findNamespaceByOid(pnnspid);
4302  if (nspinfo == NULL)
4303  continue;
4304 
4305  /*
4306  * We always dump publication namespaces unless the corresponding
4307  * namespace is excluded from the dump.
4308  */
4309  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4310  continue;
4311 
4312  /* OK, make a DumpableObject for this relationship */
4314  pubsinfo[j].dobj.catId.tableoid =
4315  atooid(PQgetvalue(res, i, i_tableoid));
4316  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4317  AssignDumpId(&pubsinfo[j].dobj);
4318  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4319  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4320  pubsinfo[j].publication = pubinfo;
4321  pubsinfo[j].pubschema = nspinfo;
4322 
4323  /* Decide whether we want to dump it */
4324  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4325 
4326  j++;
4327  }
4328 
4329  PQclear(res);
4330  destroyPQExpBuffer(query);
4331 }
4332 
4333 /*
4334  * getPublicationTables
4335  * get information about publication membership for dumpable tables.
4336  */
void
getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
{
	PQExpBuffer query;
	PGresult   *res;
	PublicationRelInfo *pubrinfo;
	DumpOptions *dopt = fout->dopt;
	int			i_tableoid;
	int			i_oid;
	int			i_prpubid;
	int			i_prrelid;
	int			i_prrelqual;
	int			i_prattrs;
	int			i,
				j,
				ntups;

	/* Publications only exist in v10 and later; honor --no-publications */
	if (dopt->no_publications || fout->remoteVersion < 100000)
		return;

	/*
	 * NOTE(review): tblinfo/numTables are not referenced in this body;
	 * lookups go through findTableByOid() instead.
	 */
	query = createPQExpBuffer();

	/*
	 * Collect all publication membership info.  From v15 on we also fetch
	 * the row filter (prqual, rendered via pg_get_expr) and the column list
	 * (prattrs, mapped from attnums to attribute names server-side).
	 */
	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
							 "(CASE\n"
							 "  WHEN pr.prattrs IS NOT NULL THEN\n"
							 "    (SELECT array_agg(attname)\n"
							 "       FROM\n"
							 "         pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
							 "         pg_catalog.pg_attribute\n"
							 "      WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
							 "  ELSE NULL END) prattrs "
							 "FROM pg_catalog.pg_publication_rel pr");
	else
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "NULL AS prrelqual, NULL AS prattrs "
							 "FROM pg_catalog.pg_publication_rel");
	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_prpubid = PQfnumber(res, "prpubid");
	i_prrelid = PQfnumber(res, "prrelid");
	i_prrelqual = PQfnumber(res, "prrelqual");
	i_prattrs = PQfnumber(res, "prattrs");

	/* this allocation may be more than we need */
	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
	j = 0;

	for (i = 0; i < ntups; i++)
	{
		Oid			prpubid = atooid(PQgetvalue(res, i, i_prpubid));
		Oid			prrelid = atooid(PQgetvalue(res, i, i_prrelid));
		PublicationInfo *pubinfo;
		TableInfo  *tbinfo;

		/*
		 * Ignore any entries for which we aren't interested in either the
		 * publication or the rel.
		 */
		pubinfo = findPublicationByOid(prpubid);
		if (pubinfo == NULL)
			continue;
		tbinfo = findTableByOid(prrelid);
		if (tbinfo == NULL)
			continue;

		/*
		 * Ignore publication membership of tables whose definitions are not
		 * to be dumped.
		 */
		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
			continue;

		/* OK, make a DumpableObject for this relationship */
		pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
		pubrinfo[j].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&pubrinfo[j].dobj);
		pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
		pubrinfo[j].dobj.name = tbinfo->dobj.name;
		pubrinfo[j].publication = pubinfo;
		pubrinfo[j].pubtable = tbinfo;
		if (PQgetisnull(res, i, i_prrelqual))
			pubrinfo[j].pubrelqual = NULL;
		else
			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));

		if (!PQgetisnull(res, i, i_prattrs))
		{
			char	  **attnames;
			int			nattnames;
			PQExpBuffer attribs;

			/* Re-quote each attribute name and join with ", " */
			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
							  &attnames, &nattnames))
				pg_fatal("could not parse %s array", "prattrs");
			attribs = createPQExpBuffer();
			for (int k = 0; k < nattnames; k++)
			{
				if (k > 0)
					appendPQExpBufferStr(attribs, ", ");

				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
			}

			/*
			 * Ownership of the string buffer is handed off to pubrinfo;
			 * the PQExpBuffer struct itself is intentionally not destroyed.
			 */
			pubrinfo[j].pubrattrs = attribs->data;
		}
		else
			pubrinfo[j].pubrattrs = NULL;

		/* Decide whether we want to dump it */
		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);

		j++;
	}

	PQclear(res);
	destroyPQExpBuffer(query);
}
4464 
4465 /*
4466  * dumpPublicationNamespace
4467  * dump the definition of the given publication schema mapping.
4468  */
4469 static void
4471 {
4472  DumpOptions *dopt = fout->dopt;
4473  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4474  PublicationInfo *pubinfo = pubsinfo->publication;
4475  PQExpBuffer query;
4476  char *tag;
4477 
4478  /* Do nothing in data-only dump */
4479  if (dopt->dataOnly)
4480  return;
4481 
4482  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4483 
4484  query = createPQExpBuffer();
4485 
4486  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4487  appendPQExpBuffer(query, "ADD TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4488 
4489  /*
4490  * There is no point in creating drop query as the drop is done by schema
4491  * drop.
4492  */
4493  if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4494  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4495  ARCHIVE_OPTS(.tag = tag,
4496  .namespace = schemainfo->dobj.name,
4497  .owner = pubinfo->rolname,
4498  .description = "PUBLICATION TABLES IN SCHEMA",
4499  .section = SECTION_POST_DATA,
4500  .createStmt = query->data));
4501 
4502  /* These objects can't currently have comments or seclabels */
4503 
4504  free(tag);
4505  destroyPQExpBuffer(query);
4506 }
4507 
4508 /*
4509  * dumpPublicationTable
4510  * dump the definition of the given publication table mapping
4511  */
4512 static void
4514 {
4515  DumpOptions *dopt = fout->dopt;
4516  PublicationInfo *pubinfo = pubrinfo->publication;
4517  TableInfo *tbinfo = pubrinfo->pubtable;
4518  PQExpBuffer query;
4519  char *tag;
4520 
4521  /* Do nothing in data-only dump */
4522  if (dopt->dataOnly)
4523  return;
4524 
4525  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4526 
4527  query = createPQExpBuffer();
4528 
4529  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4530  fmtId(pubinfo->dobj.name));
4531  appendPQExpBuffer(query, " %s",
4532  fmtQualifiedDumpable(tbinfo));
4533 
4534  if (pubrinfo->pubrattrs)
4535  appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
4536 
4537  if (pubrinfo->pubrelqual)
4538  {
4539  /*
4540  * It's necessary to add parentheses around the expression because
4541  * pg_get_expr won't supply the parentheses for things like WHERE
4542  * TRUE.
4543  */
4544  appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
4545  }
4546  appendPQExpBufferStr(query, ";\n");
4547 
4548  /*
4549  * There is no point in creating a drop query as the drop is done by table
4550  * drop. (If you think to change this, see also _printTocEntry().)
4551  * Although this object doesn't really have ownership as such, set the
4552  * owner field anyway to ensure that the command is run by the correct
4553  * role at restore time.
4554  */
4555  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4556  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4557  ARCHIVE_OPTS(.tag = tag,
4558  .namespace = tbinfo->dobj.namespace->dobj.name,
4559  .owner = pubinfo->rolname,
4560  .description = "PUBLICATION TABLE",
4561  .section = SECTION_POST_DATA,
4562  .createStmt = query->data));
4563 
4564  /* These objects can't currently have comments or seclabels */
4565 
4566  free(tag);
4567  destroyPQExpBuffer(query);
4568 }
4569 
4570 /*
4571  * Is the currently connected user a superuser?
4572  */
4573 static bool
4575 {
4576  ArchiveHandle *AH = (ArchiveHandle *) fout;
4577  const char *val;
4578 
4579  val = PQparameterStatus(AH->connection, "is_superuser");
4580 
4581  if (val && strcmp(val, "on") == 0)
4582  return true;
4583 
4584  return false;
4585 }
4586 
4587 /*
4588  * getSubscriptions
4589  * get information about subscriptions
4590  */
4591 void
4593 {
4594  DumpOptions *dopt = fout->dopt;
4595  PQExpBuffer query;
4596  PGresult *res;
4597  SubscriptionInfo *subinfo;
4598  int i_tableoid;
4599  int i_oid;
4600  int i_subname;
4601  int i_subowner;
4602  int i_substream;
4603  int i_subtwophasestate;
4604  int i_subdisableonerr;
4605  int i_suborigin;
4606  int i_subconninfo;
4607  int i_subslotname;
4608  int i_subsynccommit;
4609  int i_subpublications;
4610  int i_subbinary;
4611  int i,
4612  ntups;
4613 
4614  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4615  return;
4616 
4617  if (!is_superuser(fout))
4618  {
4619  int n;
4620 
4621  res = ExecuteSqlQuery(fout,
4622  "SELECT count(*) FROM pg_subscription "
4623  "WHERE subdbid = (SELECT oid FROM pg_database"
4624  " WHERE datname = current_database())",
4625  PGRES_TUPLES_OK);
4626  n = atoi(PQgetvalue(res, 0, 0));
4627  if (n > 0)
4628  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4629  PQclear(res);
4630  return;
4631  }
4632 
4633  query = createPQExpBuffer();
4634 
4635  /* Get the subscriptions in current database. */
4636  appendPQExpBufferStr(query,
4637  "SELECT s.tableoid, s.oid, s.subname,\n"
4638  " s.subowner,\n"
4639  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4640  " s.subpublications,\n");
4641 
4642  if (fout->remoteVersion >= 140000)
4643  appendPQExpBufferStr(query, " s.subbinary,\n");
4644  else
4645  appendPQExpBufferStr(query, " false AS subbinary,\n");
4646 
4647  if (fout->remoteVersion >= 140000)
4648  appendPQExpBufferStr(query, " s.substream,\n");
4649  else
4650  appendPQExpBufferStr(query, " 'f' AS substream,\n");
4651 
4652  if (fout->remoteVersion >= 150000)
4653  appendPQExpBufferStr(query,
4654  " s.subtwophasestate,\n"
4655  " s.subdisableonerr,\n");
4656  else
4657  appendPQExpBuffer(query,
4658  " '%c' AS subtwophasestate,\n"
4659  " false AS subdisableonerr,\n",
4661 
4662  if (fout->remoteVersion >= 160000)
4663  appendPQExpBufferStr(query, " s.suborigin\n");
4664  else
4665  appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
4666 
4667  appendPQExpBufferStr(query,
4668  "FROM pg_subscription s\n"
4669  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4670  " WHERE datname = current_database())");
4671 
4672  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4673 
4674  ntups = PQntuples(res);
4675 
4676  /*
4677  * Get subscription fields. We don't include subskiplsn in the dump as
4678  * after restoring the dump this value may no longer be relevant.
4679  */
4680  i_tableoid = PQfnumber(res, "tableoid");
4681  i_oid = PQfnumber(res, "oid");
4682  i_subname = PQfnumber(res, "subname");
4683  i_subowner = PQfnumber(res, "subowner");
4684  i_subconninfo = PQfnumber(res, "subconninfo");
4685  i_subslotname = PQfnumber(res, "subslotname");
4686  i_subsynccommit = PQfnumber(res, "subsynccommit");
4687  i_subpublications = PQfnumber(res, "subpublications");
4688  i_subbinary = PQfnumber(res, "subbinary");
4689  i_substream = PQfnumber(res, "substream");
4690  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4691  i_subdisableonerr = PQfnumber(res, "subdisableonerr");
4692  i_suborigin = PQfnumber(res, "suborigin");
4693 
4694  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4695 
4696  for (i = 0; i < ntups; i++)
4697  {
4698  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4699  subinfo[i].dobj.catId.tableoid =
4700  atooid(PQgetvalue(res, i, i_tableoid));
4701  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4702  AssignDumpId(&subinfo[i].dobj);
4703  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4704  subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
4705  subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
4706  if (PQgetisnull(res, i, i_subslotname))
4707  subinfo[i].subslotname = NULL;
4708  else
4709  subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
4710  subinfo[i].subsynccommit =
4711  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4712  subinfo[i].subpublications =
4713  pg_strdup(PQgetvalue(res, i, i_subpublications));
4714  subinfo[i].subbinary =
4715  pg_strdup(PQgetvalue(res, i, i_subbinary));
4716  subinfo[i].substream =
4717  pg_strdup(PQgetvalue(res, i, i_substream));
4718  subinfo[i].subtwophasestate =
4719  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4720  subinfo[i].subdisableonerr =
4721  pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
4722  subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
4723 
4724  /* Decide whether we want to dump it */
4725  selectDumpableObject(&(subinfo[i].dobj), fout);
4726  }
4727  PQclear(res);
4728 
4729  destroyPQExpBuffer(query);
4730 }
4731 
4732 /*
4733  * dumpSubscription
4734  * dump the definition of the given subscription
4735  */
4736 static void
4738 {
4739  DumpOptions *dopt = fout->dopt;
4740  PQExpBuffer delq;
4741  PQExpBuffer query;
4742  PQExpBuffer publications;
4743  char *qsubname;
4744  char **pubnames = NULL;
4745  int npubnames = 0;
4746  int i;
4747  char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
4748 
4749  /* Do nothing in data-only dump */
4750  if (dopt->dataOnly)
4751  return;
4752 
4753  delq = createPQExpBuffer();
4754  query = createPQExpBuffer();
4755 
4756  qsubname = pg_strdup(fmtId(subinfo->dobj.name));
4757 
4758  appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
4759  qsubname);
4760 
4761  appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
4762  qsubname);
4763  appendStringLiteralAH(query, subinfo->subconninfo, fout);
4764 
4765  /* Build list of quoted publications and append them to query. */
4766  if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
4767  pg_fatal("could not parse %s array", "subpublications");
4768 
4769  publications = createPQExpBuffer();
4770  for (i = 0; i < npubnames; i++)
4771  {
4772  if (i > 0)
4773  appendPQExpBufferStr(publications, ", ");
4774 
4775  appendPQExpBufferStr(publications, fmtId(pubnames[i]));
4776  }
4777 
4778  appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
4779  if (subinfo->subslotname)
4780  appendStringLiteralAH(query, subinfo->subslotname, fout);
4781  else
4782  appendPQExpBufferStr(query, "NONE");
4783 
4784  if (strcmp(subinfo->subbinary, "t") == 0)
4785  appendPQExpBufferStr(query, ", binary = true");
4786 
4787  if (strcmp(subinfo->substream, "t") == 0)
4788  appendPQExpBufferStr(query, ", streaming = on");
4789  else if (strcmp(subinfo->substream, "p") == 0)
4790  appendPQExpBufferStr(query, ", streaming = parallel");
4791 
4792  if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
4793  appendPQExpBufferStr(query, ", two_phase = on");
4794 
4795  if (strcmp(subinfo->subdisableonerr, "t") == 0)
4796  appendPQExpBufferStr(query, ", disable_on_error = true");
4797 
4798  if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
4799  appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);
4800 
4801  if (strcmp(subinfo->subsynccommit, "off") != 0)
4802  appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
4803 
4804  appendPQExpBufferStr(query, ");\n");
4805 
4806  if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4807  ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
4808  ARCHIVE_OPTS(.tag = subinfo->dobj.name,
4809  .owner = subinfo->rolname,
4810  .description = "SUBSCRIPTION",
4811  .section = SECTION_POST_DATA,
4812  .createStmt = query->data,
4813  .dropStmt = delq->data));
4814 
4815  if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4816  dumpComment(fout, "SUBSCRIPTION", qsubname,
4817  NULL, subinfo->rolname,
4818  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4819 
4820  if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4821  dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
4822  NULL, subinfo->rolname,
4823  subinfo->dobj.catId, 0, subinfo->dobj.dumpId);
4824 
4825  destroyPQExpBuffer(publications);
4826  free(pubnames);
4827 
4828  destroyPQExpBuffer(delq);
4829  destroyPQExpBuffer(query);
4830  free(qsubname);
4831 }
4832 
4833 /*
4834  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4835  * the object needs.
4836  */
4837 static void
4839  PQExpBuffer create,
4840  const DumpableObject *dobj,
4841  const char *catalog,
4842  const char *keyword,
4843  const char *objname)
4844 {
4845  if (dobj->depends_on_ext)
4846  {
4847  char *nm;
4848  PGresult *res;
4849  PQExpBuffer query;
4850  int ntups;
4851  int i_extname;
4852  int i;
4853 
4854  /* dodge fmtId() non-reentrancy */
4855  nm = pg_strdup(objname);
4856 
4857  query = createPQExpBuffer();
4858  appendPQExpBuffer(query,
4859  "SELECT e.extname "
4860  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
4861  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
4862  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
4863  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
4864  catalog,
4865  dobj->catId.oid);
4866  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4867  ntups = PQntuples(res);
4868  i_extname = PQfnumber(res, "extname");
4869  for (i = 0; i < ntups; i++)
4870  {
4871  appendPQExpBuffer(create, "ALTER %s %s DEPENDS ON EXTENSION %s;\n",
4872  keyword, nm,
4873  fmtId(PQgetvalue(res, i, i_extname)));
4874  }
4875 
4876  PQclear(res);
4877  destroyPQExpBuffer(query);
4878  pg_free(nm);
4879  }
4880 }
4881 
4882 static Oid
4884 {
4885  /*
4886  * If the old version didn't assign an array type, but the new version
4887  * does, we must select an unused type OID to assign. This currently only
4888  * happens for domains, when upgrading pre-v11 to v11 and up.
4889  *
4890  * Note: local state here is kind of ugly, but we must have some, since we
4891  * mustn't choose the same unused OID more than once.
4892  */
4893  static Oid next_possible_free_oid = FirstNormalObjectId;
4894  PGresult *res;
4895  bool is_dup;
4896 
4897  do
4898  {
4899  ++next_possible_free_oid;
4900  printfPQExpBuffer(upgrade_query,
4901  "SELECT EXISTS(SELECT 1 "
4902  "FROM pg_catalog.pg_type "
4903  "WHERE oid = '%u'::pg_catalog.oid);",
4904  next_possible_free_oid);
4905  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4906  is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
4907  PQclear(res);
4908  } while (is_dup);
4909 
4910  return next_possible_free_oid;
4911 }
4912 
4913 static void
4915  PQExpBuffer upgrade_buffer,
4916  Oid pg_type_oid,
4917  bool force_array_type,
4918  bool include_multirange_type)
4919 {
4920  PQExpBuffer upgrade_query = createPQExpBuffer();
4921  PGresult *res;
4922  Oid pg_type_array_oid;
4923  Oid pg_type_multirange_oid;
4924  Oid pg_type_multirange_array_oid;
4925 
4926  appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
4927  appendPQExpBuffer(upgrade_buffer,
4928  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4929  pg_type_oid);
4930 
4931  appendPQExpBuffer(upgrade_query,
4932  "SELECT typarray "
4933  "FROM pg_catalog.pg_type "
4934  "WHERE oid = '%u'::pg_catalog.oid;",
4935  pg_type_oid);
4936 
4937  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4938 
4939  pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4940 
4941  PQclear(res);
4942 
4943  if (!OidIsValid(pg_type_array_oid) && force_array_type)
4944  pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4945 
4946  if (OidIsValid(pg_type_array_oid))
4947  {
4948  appendPQExpBufferStr(upgrade_buffer,
4949  "\n-- For binary upgrade, must preserve pg_type array oid\n");
4950  appendPQExpBuffer(upgrade_buffer,
4951  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
4952  pg_type_array_oid);
4953  }
4954 
4955  /*
4956  * Pre-set the multirange type oid and its own array type oid.
4957  */
4958  if (include_multirange_type)
4959  {
4960  if (fout->remoteVersion >= 140000)
4961  {
4962  printfPQExpBuffer(upgrade_query,
4963  "SELECT t.oid, t.typarray "
4964  "FROM pg_catalog.pg_type t "
4965  "JOIN pg_catalog.pg_range r "
4966  "ON t.oid = r.rngmultitypid "
4967  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
4968  pg_type_oid);
4969 
4970  res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4971 
4972  pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
4973  pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));
4974 
4975  PQclear(res);
4976  }
4977  else
4978  {
4979  pg_type_multirange_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4980  pg_type_multirange_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
4981  }
4982 
4983  appendPQExpBufferStr(upgrade_buffer,
4984  "\n-- For binary upgrade, must preserve multirange pg_type oid\n");
4985