/*-------------------------------------------------------------------------
 *
 * pg_dump.c
 *      pg_dump is a utility for dumping out a postgres database
 *      into a script file.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * pg_dump will read the system catalogs in a database and dump out a
 * script that reproduces the schema in terms of SQL that is understood
 * by PostgreSQL.
 *
 * Note that pg_dump runs in a transaction-snapshot mode transaction,
 * so it sees a consistent snapshot of the database including system
 * catalogs.  However, it relies in part on various specialized backend
 * functions like pg_get_indexdef(), and those things tend to look at
 * the currently committed state.  So it is possible to get a 'cache
 * lookup failed' error if someone performs DDL changes while a dump is
 * happening.  The window for this sort of thing is from the acquisition
 * of the transaction snapshot to getSchemaData() (when pg_dump acquires
 * AccessShareLock on every table it intends to dump).  It isn't very large,
 * but it can happen.
 *
 * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
 *
 * IDENTIFICATION
 *      src/bin/pg_dump/pg_dump.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <unistd.h>
#include <ctype.h>
#include <limits.h>
#ifdef HAVE_TERMIOS_H
#include <termios.h>
#endif

#include "access/attnum.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "catalog/pg_aggregate_d.h"
#include "catalog/pg_am_d.h"
#include "catalog/pg_attribute_d.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_cast_d.h"
#include "catalog/pg_class_d.h"
#include "catalog/pg_default_acl_d.h"
#include "catalog/pg_largeobject_d.h"
#include "catalog/pg_largeobject_metadata_d.h"
#include "catalog/pg_proc_d.h"
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "common/relpath.h"
#include "compress_io.h"
#include "dumputils.h"
#include "fe_utils/option_utils.h"
#include "fe_utils/string_utils.h"
#include "filter.h"
#include "getopt_long.h"
#include "libpq/libpq-fs.h"
#include "parallel.h"
#include "pg_backup_db.h"
#include "pg_backup_utils.h"
#include "pg_dump.h"
#include "storage/block.h"

typedef struct
{
    Oid         roleoid;        /* role's OID */
    const char *rolename;       /* role's name */
} RoleNameItem;

typedef struct
{
    const char *descr;          /* comment for an object */
    Oid         classoid;       /* object class (catalog OID) */
    Oid         objoid;         /* object OID */
    int         objsubid;       /* subobject (table column #) */
} CommentItem;

typedef struct
{
    const char *provider;       /* label provider of this security label */
    const char *label;          /* security label for an object */
    Oid         classoid;       /* object class (catalog OID) */
    Oid         objoid;         /* object OID */
    int         objsubid;       /* subobject (table column #) */
} SecLabelItem;

typedef enum OidOptions
{
    zeroIsError = 1,
    zeroAsStar = 2,
    zeroAsNone = 4,
} OidOptions;

/* global decls */
static bool dosync = true;      /* Issue fsync() to make dump durable on disk. */

static Oid  g_last_builtin_oid; /* value of the last builtin oid */

/* The specified names/patterns should match at least one entity */
static int  strict_names = 0;

static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;

/*
 * Object inclusion/exclusion lists
 *
 * The string lists record the patterns given by command-line switches,
 * which we then convert to lists of OIDs of matching objects.
 */
static SimpleStringList schema_include_patterns = {NULL, NULL};
static SimpleOidList schema_include_oids = {NULL, NULL};
static SimpleStringList schema_exclude_patterns = {NULL, NULL};
static SimpleOidList schema_exclude_oids = {NULL, NULL};

static SimpleStringList table_include_patterns = {NULL, NULL};
static SimpleStringList table_include_patterns_and_children = {NULL, NULL};
static SimpleOidList table_include_oids = {NULL, NULL};
static SimpleStringList table_exclude_patterns = {NULL, NULL};
static SimpleStringList table_exclude_patterns_and_children = {NULL, NULL};
static SimpleOidList table_exclude_oids = {NULL, NULL};
static SimpleStringList tabledata_exclude_patterns = {NULL, NULL};
static SimpleStringList tabledata_exclude_patterns_and_children = {NULL, NULL};
static SimpleOidList tabledata_exclude_oids = {NULL, NULL};

static SimpleStringList foreign_servers_include_patterns = {NULL, NULL};
static SimpleOidList foreign_servers_include_oids = {NULL, NULL};

static SimpleStringList extension_include_patterns = {NULL, NULL};
static SimpleOidList extension_include_oids = {NULL, NULL};

static SimpleStringList extension_exclude_patterns = {NULL, NULL};
static SimpleOidList extension_exclude_oids = {NULL, NULL};

static const CatalogId nilCatalogId = {0, 0};

/* override for standard extra_float_digits setting */
static bool have_extra_float_digits = false;
static int  extra_float_digits;

/* sorted table of role names */
static RoleNameItem *rolenames = NULL;
static int  nrolenames = 0;

/* sorted table of comments */
static CommentItem *comments = NULL;
static int  ncomments = 0;

/* sorted table of security labels */
static SecLabelItem *seclabels = NULL;
static int  nseclabels = 0;

/*
 * The default number of rows per INSERT when
 * --inserts is specified without --rows-per-insert
 */
#define DUMP_DEFAULT_ROWS_PER_INSERT 1

/*
 * Maximum number of large objects to group into a single ArchiveEntry.
 * At some point we might want to make this user-controllable, but for now
 * a hard-wired setting will suffice.
 */
#define MAX_BLOBS_PER_ARCHIVE_ENTRY 1000
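
/*
 * (Grouping up to 1000 LOs per entry keeps the archive's table of contents
 * from growing by one entry per large object, which matters for databases
 * holding very large numbers of LOs.)
 */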
172 
173 /*
174  * Macro for producing quoted, schema-qualified name of a dumpable object.
175  */
176 #define fmtQualifiedDumpable(obj) \
177  fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
178  (obj)->dobj.name)
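
/*
 * For example, for a table "my table" in schema "public" this yields
 * public."my table"; fmtQualifiedId applies quotes only where needed.
 */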

static void help(const char *progname);
static void setup_connection(Archive *AH,
                             const char *dumpencoding, const char *dumpsnapshot,
                             char *use_role);
static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
static void expand_schema_name_patterns(Archive *fout,
                                        SimpleStringList *patterns,
                                        SimpleOidList *oids,
                                        bool strict_names);
static void expand_extension_name_patterns(Archive *fout,
                                           SimpleStringList *patterns,
                                           SimpleOidList *oids,
                                           bool strict_names);
static void expand_foreign_server_name_patterns(Archive *fout,
                                                SimpleStringList *patterns,
                                                SimpleOidList *oids);
static void expand_table_name_patterns(Archive *fout,
                                       SimpleStringList *patterns,
                                       SimpleOidList *oids,
                                       bool strict_names,
                                       bool with_child_tables);
static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
                                  const char *pattern);

static NamespaceInfo *findNamespace(Oid nsoid);
static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
static const char *getRoleName(const char *roleoid_str);
static void collectRoleNames(Archive *fout);
static void getAdditionalACLs(Archive *fout);
static void dumpCommentExtended(Archive *fout, const char *type,
                                const char *name, const char *namespace,
                                const char *owner, CatalogId catalogId,
                                int subid, DumpId dumpId,
                                const char *initdb_comment);
static inline void dumpComment(Archive *fout, const char *type,
                               const char *name, const char *namespace,
                               const char *owner, CatalogId catalogId,
                               int subid, DumpId dumpId);
static int  findComments(Oid classoid, Oid objoid, CommentItem **items);
static void collectComments(Archive *fout);
static void dumpSecLabel(Archive *fout, const char *type, const char *name,
                         const char *namespace, const char *owner,
                         CatalogId catalogId, int subid, DumpId dumpId);
static int  findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
static void collectSecLabels(Archive *fout);
static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
static void dumpType(Archive *fout, const TypeInfo *tyinfo);
static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
                                         PGresult *res);
static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
static void dumpFunc(Archive *fout, const FuncInfo *finfo);
static void dumpCast(Archive *fout, const CastInfo *cast);
static void dumpTransform(Archive *fout, const TransformInfo *transform);
static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
static void dumpCollation(Archive *fout, const CollInfo *collinfo);
static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
static void dumpRule(Archive *fout, const RuleInfo *rinfo);
static void dumpAgg(Archive *fout, const AggInfo *agginfo);
static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
static void dumpTable(Archive *fout, const TableInfo *tbinfo);
static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
static void dumpUserMappings(Archive *fout,
                             const char *servername, const char *namespace,
                             const char *owner, CatalogId catalogId, DumpId dumpId);
static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);

static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
                      const char *type, const char *name, const char *subname,
                      const char *nspname, const char *tag, const char *owner,
                      const DumpableAcl *dacl);

static void getDependencies(Archive *fout);
static void BuildArchiveDependencies(Archive *fout);
static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
                                     DumpId **dependencies, int *nDeps, int *allocDeps);

static DumpableObject *createBoundaryObjects(void);
static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
                                    DumpableObject *boundaryObjs);

static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
static void buildMatViewRefreshDependencies(Archive *fout);
static void getTableDataFKConstraints(void);
static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
                                       bool is_agg);
static char *format_function_signature(Archive *fout,
                                       const FuncInfo *finfo, bool honor_quotes);
static char *convertRegProcReference(const char *proc);
static char *getFormattedOperatorName(const char *oproid);
static char *convertTSFunction(Archive *fout, Oid funcOid);
static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
static void getLOs(Archive *fout);
static void dumpLO(Archive *fout, const LoInfo *loinfo);
static int  dumpLOs(Archive *fout, const void *arg);
static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
static void dumpDatabase(Archive *fout);
static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
                               const char *dbname, Oid dboid);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
static void dumpSearchPath(Archive *AH);
static void binary_upgrade_set_type_oids_by_type_oid(Archive *fout,
                                                     PQExpBuffer upgrade_buffer,
                                                     Oid pg_type_oid,
                                                     bool force_array_type,
                                                     bool include_multirange_type);
static void binary_upgrade_set_type_oids_by_rel(Archive *fout,
                                                PQExpBuffer upgrade_buffer,
                                                const TableInfo *tbinfo);
static void binary_upgrade_set_pg_class_oids(Archive *fout,
                                             PQExpBuffer upgrade_buffer,
                                             Oid pg_class_oid, bool is_index);
static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
                                            const DumpableObject *dobj,
                                            const char *objtype,
                                            const char *objname,
                                            const char *objnamespace);
static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
static bool nonemptyReloptions(const char *reloptions);
static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
                                    const char *prefix, Archive *fout);
static char *get_synchronized_snapshot(Archive *fout);
static void setupDumpWorker(Archive *AH);
static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
static bool forcePartitionRootLoad(const TableInfo *tbinfo);
static void read_dump_filters(const char *filename, DumpOptions *dopt);


int
main(int argc, char **argv)
{
    int         c;
    const char *filename = NULL;
    const char *format = "p";
    TableInfo  *tblinfo;
    int         numTables;
    DumpableObject **dobjs;
    int         numObjs;
    DumpableObject *boundaryObjs;
    int         i;
    int         optindex;
    RestoreOptions *ropt;
    Archive    *fout;           /* the script file */
    bool        g_verbose = false;
    const char *dumpencoding = NULL;
    const char *dumpsnapshot = NULL;
    char       *use_role = NULL;
    int         numWorkers = 1;
    int         plainText = 0;
    ArchiveFormat archiveFormat = archUnknown;
    ArchiveMode archiveMode;
    pg_compress_specification compression_spec = {0};
    char       *compression_detail = NULL;
    char       *compression_algorithm_str = "none";
    char       *error_detail = NULL;
    bool        user_compression_defined = false;
    DataDirSyncMethod sync_method = DATA_DIR_SYNC_METHOD_FSYNC;

    static DumpOptions dopt;

    static struct option long_options[] = {
        {"data-only", no_argument, NULL, 'a'},
        {"blobs", no_argument, NULL, 'b'},
        {"large-objects", no_argument, NULL, 'b'},
        {"no-blobs", no_argument, NULL, 'B'},
        {"no-large-objects", no_argument, NULL, 'B'},
        {"clean", no_argument, NULL, 'c'},
        {"create", no_argument, NULL, 'C'},
        {"dbname", required_argument, NULL, 'd'},
        {"extension", required_argument, NULL, 'e'},
        {"file", required_argument, NULL, 'f'},
        {"format", required_argument, NULL, 'F'},
        {"host", required_argument, NULL, 'h'},
        {"jobs", required_argument, NULL, 'j'},
        {"no-reconnect", no_argument, NULL, 'R'},
        {"no-owner", no_argument, NULL, 'O'},
        {"port", required_argument, NULL, 'p'},
        {"schema", required_argument, NULL, 'n'},
        {"exclude-schema", required_argument, NULL, 'N'},
        {"schema-only", no_argument, NULL, 's'},
        {"superuser", required_argument, NULL, 'S'},
        {"table", required_argument, NULL, 't'},
        {"exclude-table", required_argument, NULL, 'T'},
        {"no-password", no_argument, NULL, 'w'},
        {"password", no_argument, NULL, 'W'},
        {"username", required_argument, NULL, 'U'},
        {"verbose", no_argument, NULL, 'v'},
        {"no-privileges", no_argument, NULL, 'x'},
        {"no-acl", no_argument, NULL, 'x'},
        {"compress", required_argument, NULL, 'Z'},
        {"encoding", required_argument, NULL, 'E'},
        {"help", no_argument, NULL, '?'},
        {"version", no_argument, NULL, 'V'},

        /*
         * the following options don't have an equivalent short option letter
         */
        {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
        {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
        {"column-inserts", no_argument, &dopt.column_inserts, 1},
        {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
        {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
        {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
        {"exclude-table-data", required_argument, NULL, 4},
        {"extra-float-digits", required_argument, NULL, 8},
        {"if-exists", no_argument, &dopt.if_exists, 1},
        {"inserts", no_argument, NULL, 9},
        {"lock-wait-timeout", required_argument, NULL, 2},
        {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
        {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
        {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
        {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
        {"role", required_argument, NULL, 3},
        {"section", required_argument, NULL, 5},
        {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
        {"snapshot", required_argument, NULL, 6},
        {"strict-names", no_argument, &strict_names, 1},
        {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
        {"no-comments", no_argument, &dopt.no_comments, 1},
        {"no-publications", no_argument, &dopt.no_publications, 1},
        {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
        {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
        {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
        {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
        {"no-sync", no_argument, NULL, 7},
        {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
        {"rows-per-insert", required_argument, NULL, 10},
        {"include-foreign-data", required_argument, NULL, 11},
        {"table-and-children", required_argument, NULL, 12},
        {"exclude-table-and-children", required_argument, NULL, 13},
        {"exclude-table-data-and-children", required_argument, NULL, 14},
        {"sync-method", required_argument, NULL, 15},
        {"filter", required_argument, NULL, 16},
        {"exclude-extension", required_argument, NULL, 17},

        {NULL, 0, NULL, 0}
    };
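
    /*
     * For the long-only options above, the fourth getopt_long field is a
     * small integer code (2 .. 17) rather than a letter; the option-parsing
     * loop below dispatches on those codes in its switch statement.
     */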

    pg_logging_init(argv[0]);
    pg_logging_set_level(PG_LOG_WARNING);
    set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));

    /*
     * Initialize what we need for parallel execution, especially for thread
     * support on Windows.
     */
    init_parallel_dump_utils();

    progname = get_progname(argv[0]);

    if (argc > 1)
    {
        if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
        {
            help(progname);
            exit_nicely(0);
        }
        if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
        {
            puts("pg_dump (PostgreSQL) " PG_VERSION);
            exit_nicely(0);
        }
    }

    InitDumpOptions(&dopt);

    while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
                            long_options, &optindex)) != -1)
    {
        switch (c)
        {
            case 'a':           /* Dump data only */
                dopt.dataOnly = true;
                break;

            case 'b':           /* Dump LOs */
                dopt.outputLOs = true;
                break;

            case 'B':           /* Don't dump LOs */
                dopt.dontOutputLOs = true;
                break;

            case 'c':           /* clean (i.e., drop) schema prior to create */
                dopt.outputClean = 1;
                break;

            case 'C':           /* Create DB */
                dopt.outputCreateDB = 1;
                break;

            case 'd':           /* database name */
                dopt.cparams.dbname = pg_strdup(optarg);
                break;

            case 'e':           /* include extension(s) */
                simple_string_list_append(&extension_include_patterns, optarg);
                dopt.include_everything = false;
                break;

            case 'E':           /* Dump encoding */
                dumpencoding = pg_strdup(optarg);
                break;

            case 'f':
                filename = pg_strdup(optarg);
                break;

            case 'F':
                format = pg_strdup(optarg);
                break;

            case 'h':           /* server host */
                dopt.cparams.pghost = pg_strdup(optarg);
                break;

            case 'j':           /* number of dump jobs */
                if (!option_parse_int(optarg, "-j/--jobs", 1,
                                      PG_MAX_JOBS,
                                      &numWorkers))
                    exit_nicely(1);
                break;

            case 'n':           /* include schema(s) */
                simple_string_list_append(&schema_include_patterns, optarg);
                dopt.include_everything = false;
                break;

            case 'N':           /* exclude schema(s) */
                simple_string_list_append(&schema_exclude_patterns, optarg);
                break;

            case 'O':           /* Don't reconnect to match owner */
                dopt.outputNoOwner = 1;
                break;

            case 'p':           /* server port */
                dopt.cparams.pgport = pg_strdup(optarg);
                break;

            case 'R':
                /* no-op, still accepted for backwards compatibility */
                break;

            case 's':           /* dump schema only */
                dopt.schemaOnly = true;
                break;

            case 'S':           /* Username for superuser in plain text output */
                dopt.outputSuperuser = pg_strdup(optarg);
                break;

            case 't':           /* include table(s) */
                simple_string_list_append(&table_include_patterns, optarg);
                dopt.include_everything = false;
                break;

            case 'T':           /* exclude table(s) */
                simple_string_list_append(&table_exclude_patterns, optarg);
                break;

            case 'U':
                dopt.cparams.username = pg_strdup(optarg);
                break;

            case 'v':           /* verbose */
                g_verbose = true;
                pg_logging_increase_verbosity();
                break;

            case 'w':
                dopt.cparams.promptPassword = TRI_NO;
                break;

            case 'W':
                dopt.cparams.promptPassword = TRI_YES;
                break;

            case 'x':           /* skip ACL dump */
                dopt.aclsSkip = true;
                break;

            case 'Z':           /* Compression */
                parse_compress_options(optarg, &compression_algorithm_str,
                                       &compression_detail);
                user_compression_defined = true;
                break;

            case 0:
                /* This covers the long options. */
                break;

            case 2:             /* lock-wait-timeout */
                dopt.lockWaitTimeout = pg_strdup(optarg);
                break;

            case 3:             /* SET ROLE */
                use_role = pg_strdup(optarg);
                break;

            case 4:             /* exclude table(s) data */
                simple_string_list_append(&tabledata_exclude_patterns, optarg);
                break;

            case 5:             /* section */
                set_dump_section(optarg, &dopt.dumpSections);
                break;

            case 6:             /* snapshot */
                dumpsnapshot = pg_strdup(optarg);
                break;

            case 7:             /* no-sync */
                dosync = false;
                break;

            case 8:             /* extra-float-digits */
                have_extra_float_digits = true;
                if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
                                      &extra_float_digits))
                    exit_nicely(1);
                break;

            case 9:             /* inserts */

                /*
                 * dump_inserts also stores --rows-per-insert, careful not to
                 * overwrite that.
                 */
                if (dopt.dump_inserts == 0)
                    dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;
                break;

            case 10:            /* rows per insert */
                if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
                                      &dopt.dump_inserts))
                    exit_nicely(1);
                break;

            case 11:            /* include foreign data */
                simple_string_list_append(&foreign_servers_include_patterns,
                                          optarg);
                break;

            case 12:            /* include table(s) and their children */
                simple_string_list_append(&table_include_patterns_and_children,
                                          optarg);
                dopt.include_everything = false;
                break;

            case 13:            /* exclude table(s) and their children */
                simple_string_list_append(&table_exclude_patterns_and_children,
                                          optarg);
                break;

            case 14:            /* exclude data of table(s) and children */
                simple_string_list_append(&tabledata_exclude_patterns_and_children,
                                          optarg);
                break;

            case 15:            /* sync method */
                if (!parse_sync_method(optarg, &sync_method))
                    exit_nicely(1);
                break;

            case 16:            /* read object filters from file */
                read_dump_filters(optarg, &dopt);
                break;

            case 17:            /* exclude extension(s) */
                simple_string_list_append(&extension_exclude_patterns,
                                          optarg);
                break;

            default:
                /* getopt_long already emitted a complaint */
                pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                exit_nicely(1);
        }
    }

    /*
     * Non-option argument specifies database name as long as it wasn't
     * already specified with -d / --dbname
     */
    if (optind < argc && dopt.cparams.dbname == NULL)
        dopt.cparams.dbname = argv[optind++];

    /* Complain if any arguments remain */
    if (optind < argc)
    {
        pg_log_error("too many command-line arguments (first is \"%s\")",
                     argv[optind]);
        pg_log_error_hint("Try \"%s --help\" for more information.", progname);
        exit_nicely(1);
    }

    /* --column-inserts implies --inserts */
    if (dopt.column_inserts && dopt.dump_inserts == 0)
        dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;

    /*
     * Binary upgrade mode implies dumping sequence data even in schema-only
     * mode.  This is not exposed as a separate option, but kept separate
     * internally for clarity.
     */
    if (dopt.binary_upgrade)
        dopt.sequence_data = 1;

    if (dopt.dataOnly && dopt.schemaOnly)
        pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");

    if (dopt.schemaOnly && foreign_servers_include_patterns.head != NULL)
        pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");

    if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
        pg_fatal("option --include-foreign-data is not supported with parallel backup");

    if (dopt.dataOnly && dopt.outputClean)
        pg_fatal("options -c/--clean and -a/--data-only cannot be used together");

    if (dopt.if_exists && !dopt.outputClean)
        pg_fatal("option --if-exists requires option -c/--clean");

    /*
     * --inserts are already implied above if --column-inserts or
     * --rows-per-insert were specified.
     */
    if (dopt.do_nothing && dopt.dump_inserts == 0)
        pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");

    /* Identify archive format to emit */
    archiveFormat = parseArchiveFormat(format, &archiveMode);

    /* archiveFormat specific setup */
    if (archiveFormat == archNull)
        plainText = 1;

    /*
     * Custom and directory formats are compressed by default with gzip when
     * available, not the others.  If gzip is not available, no compression is
     * done by default.
     */
    if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
        !user_compression_defined)
    {
#ifdef HAVE_LIBZ
        compression_algorithm_str = "gzip";
#else
        compression_algorithm_str = "none";
#endif
    }
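
    /*
     * (The -Z/--compress argument parsed above takes the documented form
     * METHOD[:DETAIL], e.g. "-Z gzip:9"; a bare integer level such as
     * "-Z 9" is also accepted for backward compatibility.)
     */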

    /*
     * Compression options
     */
    if (!parse_compress_algorithm(compression_algorithm_str,
                                  &compression_algorithm))
        pg_fatal("unrecognized compression algorithm: \"%s\"",
                 compression_algorithm_str);

    parse_compress_specification(compression_algorithm, compression_detail,
                                 &compression_spec);
    error_detail = validate_compress_specification(&compression_spec);
    if (error_detail != NULL)
        pg_fatal("invalid compression specification: %s",
                 error_detail);

    error_detail = supports_compression(compression_spec);
    if (error_detail != NULL)
        pg_fatal("%s", error_detail);

    /*
     * Disable support for zstd workers for now - these are based on
     * threading, and it's unclear how it interacts with parallel dumps on
     * platforms where that relies on threads too (e.g. Windows).
     */
    if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
        pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
                       "workers");

    /*
     * If emitting an archive format, we always want to emit a DATABASE item,
     * in case --create is specified at pg_restore time.
     */
    if (!plainText)
        dopt.outputCreateDB = 1;

    /* Parallel backup only in the directory archive format so far */
    if (archiveFormat != archDirectory && numWorkers > 1)
        pg_fatal("parallel backup only supported by the directory format");

    /* Open the output file */
    fout = CreateArchive(filename, archiveFormat, compression_spec,
                         dosync, archiveMode, setupDumpWorker, sync_method);

    /* Make dump options accessible right away */
    SetArchiveOptions(fout, &dopt, NULL);

    /* Register the cleanup hook */
    on_exit_close_archive(fout);

    /* Let the archiver know how noisy to be */
    fout->verbose = g_verbose;

    /*
     * We allow the server to be back to 9.2, and up to any minor release of
     * our own major version.  (See also version check in pg_dumpall.c.)
     */
    fout->minRemoteVersion = 90200;
    fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
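
    /*
     * For example, with PG_VERSION_NUM = 170000 this yields a
     * maxRemoteVersion of 170099, i.e. any 17.x minor release.
     */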

    fout->numWorkers = numWorkers;

    /*
     * Open the database using the Archiver, so it knows about it.  Errors
     * mean death.
     */
    ConnectDatabase(fout, &dopt.cparams, false);
    setup_connection(fout, dumpencoding, dumpsnapshot, use_role);

    /*
     * On hot standbys, never try to dump unlogged table data, since it will
     * just throw an error.
     */
    if (fout->isStandby)
        dopt.no_unlogged_table_data = true;

    /*
     * Find the last built-in OID, if needed (prior to 8.1)
     *
     * With 8.1 and above, we can just use FirstNormalObjectId - 1.
     */
    g_last_builtin_oid = FirstNormalObjectId - 1;

    pg_log_info("last built-in OID is %u", g_last_builtin_oid);

    /* Expand schema selection patterns into OID lists */
    if (schema_include_patterns.head != NULL)
    {
        expand_schema_name_patterns(fout, &schema_include_patterns,
                                    &schema_include_oids,
                                    strict_names);
        if (schema_include_oids.head == NULL)
            pg_fatal("no matching schemas were found");
    }
    expand_schema_name_patterns(fout, &schema_exclude_patterns,
                                &schema_exclude_oids,
                                false);
    /* non-matching exclusion patterns aren't an error */

    /* Expand table selection patterns into OID lists */
    expand_table_name_patterns(fout, &table_include_patterns,
                               &table_include_oids,
                               strict_names, false);
    expand_table_name_patterns(fout, &table_include_patterns_and_children,
                               &table_include_oids,
                               strict_names, true);
    if ((table_include_patterns.head != NULL ||
         table_include_patterns_and_children.head != NULL) &&
        table_include_oids.head == NULL)
        pg_fatal("no matching tables were found");

    expand_table_name_patterns(fout, &table_exclude_patterns,
                               &table_exclude_oids,
                               false, false);
    expand_table_name_patterns(fout, &table_exclude_patterns_and_children,
                               &table_exclude_oids,
                               false, true);

    expand_table_name_patterns(fout, &tabledata_exclude_patterns,
                               &tabledata_exclude_oids,
                               false, false);
    expand_table_name_patterns(fout, &tabledata_exclude_patterns_and_children,
                               &tabledata_exclude_oids,
                               false, true);

    expand_foreign_server_name_patterns(fout, &foreign_servers_include_patterns,
                                        &foreign_servers_include_oids);

    /* non-matching exclusion patterns aren't an error */

    /* Expand extension selection patterns into OID lists */
    if (extension_include_patterns.head != NULL)
    {
        expand_extension_name_patterns(fout, &extension_include_patterns,
                                       &extension_include_oids,
                                       strict_names);
        if (extension_include_oids.head == NULL)
            pg_fatal("no matching extensions were found");
    }
    expand_extension_name_patterns(fout, &extension_exclude_patterns,
                                   &extension_exclude_oids,
                                   false);
    /* non-matching exclusion patterns aren't an error */

    /*
     * Dumping LOs is the default for dumps where an inclusion switch is not
     * used (an "include everything" dump).  -B can be used to exclude LOs
     * from those dumps.  -b can be used to include LOs even when an
     * inclusion switch is used.
     *
     * -s means "schema only" and LOs are data, not schema, so we never
     * include LOs when -s is used.
     */
    if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputLOs)
        dopt.outputLOs = true;

    /*
     * Collect role names so we can map object owner OIDs to names.
     */
    collectRoleNames(fout);

    /*
     * Now scan the database and create DumpableObject structs for all the
     * objects we intend to dump.
     */
    tblinfo = getSchemaData(fout, &numTables);

    if (!dopt.schemaOnly)
    {
        getTableData(&dopt, tblinfo, numTables, 0);
        buildMatViewRefreshDependencies(fout);
        if (dopt.dataOnly)
            getTableDataFKConstraints();
    }

    if (dopt.schemaOnly && dopt.sequence_data)
        getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);

    /*
     * In binary-upgrade mode, we do not have to worry about the actual LO
     * data or the associated metadata that resides in the pg_largeobject and
     * pg_largeobject_metadata tables, respectively.
     *
     * However, we do need to collect LO information as there may be comments
     * or other information on LOs that we do need to dump out.
     */
    if (dopt.outputLOs || dopt.binary_upgrade)
        getLOs(fout);

    /*
     * Collect dependency data to assist in ordering the objects.
     */
    getDependencies(fout);

    /*
     * Collect ACLs, comments, and security labels, if wanted.
     */
    if (!dopt.aclsSkip)
        getAdditionalACLs(fout);
    if (!dopt.no_comments)
        collectComments(fout);
    if (!dopt.no_security_labels)
        collectSecLabels(fout);

    /* Lastly, create dummy objects to represent the section boundaries */
    boundaryObjs = createBoundaryObjects();

    /* Get pointers to all the known DumpableObjects */
    getDumpableObjects(&dobjs, &numObjs);

    /*
     * Add dummy dependencies to enforce the dump section ordering.
     */
    addBoundaryDependencies(dobjs, numObjs, boundaryObjs);

    /*
     * Sort the objects into a safe dump order (no forward references).
     *
     * We rely on dependency information to help us determine a safe order,
     * so the initial sort is mostly for cosmetic purposes: we sort by name
     * to ensure that logically identical schemas will dump identically.
     */
    sortDumpableObjectsByTypeName(dobjs, numObjs);

    sortDumpableObjects(dobjs, numObjs,
                        boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
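
    /*
     * (boundaryObjs[0] and boundaryObjs[1] are the dummy pre-data and
     * post-data boundary objects created by createBoundaryObjects() above;
     * passing their dump IDs lets the sort pin every object into the
     * correct archive section.)
     */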

    /*
     * Create archive TOC entries for all the objects to be dumped, in a safe
     * order.
     */

    /*
     * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
     */
    dumpEncoding(fout);
    dumpStdStrings(fout);
    dumpSearchPath(fout);

    /* The database items are always next, unless we don't want them at all */
    if (dopt.outputCreateDB)
        dumpDatabase(fout);

    /* Now the rearrangeable objects. */
    for (i = 0; i < numObjs; i++)
        dumpDumpableObject(fout, dobjs[i]);

    /*
     * Set up options info to ensure we dump what we want.
     */
    ropt = NewRestoreOptions();
    ropt->filename = filename;

    /* if you change this list, see dumpOptionsFromRestoreOptions */
    ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
    ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
    ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
    ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
    ropt->cparams.promptPassword = dopt.cparams.promptPassword;
    ropt->dropSchema = dopt.outputClean;
    ropt->dataOnly = dopt.dataOnly;
    ropt->schemaOnly = dopt.schemaOnly;
    ropt->if_exists = dopt.if_exists;
    ropt->column_inserts = dopt.column_inserts;
    ropt->dumpSections = dopt.dumpSections;
    ropt->aclsSkip = dopt.aclsSkip;
    ropt->superuser = dopt.outputSuperuser;
    ropt->createDB = dopt.outputCreateDB;
    ropt->noOwner = dopt.outputNoOwner;
    ropt->noTableAm = dopt.outputNoTableAm;
    ropt->noTablespace = dopt.outputNoTablespaces;
    ropt->disable_triggers = dopt.disable_triggers;
    ropt->use_setsessauth = dopt.use_setsessauth;
    ropt->disable_dollar_quoting = dopt.disable_dollar_quoting;
    ropt->dump_inserts = dopt.dump_inserts;
    ropt->no_comments = dopt.no_comments;
    ropt->no_publications = dopt.no_publications;
    ropt->no_security_labels = dopt.no_security_labels;
    ropt->no_subscriptions = dopt.no_subscriptions;
    ropt->lockWaitTimeout = dopt.lockWaitTimeout;
    ropt->include_everything = dopt.include_everything;
    ropt->enable_row_security = dopt.enable_row_security;
    ropt->sequence_data = dopt.sequence_data;
    ropt->binary_upgrade = dopt.binary_upgrade;

    ropt->compression_spec = compression_spec;

    ropt->suppressDumpWarnings = true;  /* We've already shown them */

    SetArchiveOptions(fout, &dopt, ropt);

    /* Mark which entries should be output */
    ProcessArchiveRestoreOptions(fout);

    /*
     * The archive's TOC entries are now marked as to which ones will
     * actually be output, so we can set up their dependency lists properly.
     * This isn't necessary for plain-text output, though.
     */
    if (!plainText)
        BuildArchiveDependencies(fout);

    /*
     * And finally we can do the actual output.
     *
     * Note: for non-plain-text output formats, the output file is written
     * inside CloseArchive().  This is, um, bizarre; but not worth changing
     * right now.
     */
    if (plainText)
        RestoreArchive(fout);

    CloseArchive(fout);

    exit_nicely(0);
}


static void
help(const char *progname)
{
    printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
    printf(_("Usage:\n"));
    printf(_("  %s [OPTION]... [DBNAME]\n"), progname);

    printf(_("\nGeneral options:\n"));
    printf(_("  -f, --file=FILENAME          output file or directory name\n"));
    printf(_("  -F, --format=c|d|t|p         output file format (custom, directory, tar,\n"
             "                               plain text (default))\n"));
    printf(_("  -j, --jobs=NUM               use this many parallel jobs to dump\n"));
    printf(_("  -v, --verbose                verbose mode\n"));
    printf(_("  -V, --version                output version information, then exit\n"));
    printf(_("  -Z, --compress=METHOD[:DETAIL]\n"
             "                               compress as specified\n"));
    printf(_("  --lock-wait-timeout=TIMEOUT  fail after waiting TIMEOUT for a table lock\n"));
    printf(_("  --no-sync                    do not wait for changes to be written safely to disk\n"));
    printf(_("  --sync-method=METHOD         set method for syncing files to disk\n"));
    printf(_("  -?, --help                   show this help, then exit\n"));

    printf(_("\nOptions controlling the output content:\n"));
    printf(_("  -a, --data-only              dump only the data, not the schema\n"));
    printf(_("  -b, --large-objects          include large objects in dump\n"));
    printf(_("  --blobs                      (same as --large-objects, deprecated)\n"));
    printf(_("  -B, --no-large-objects       exclude large objects in dump\n"));
    printf(_("  --no-blobs                   (same as --no-large-objects, deprecated)\n"));
    printf(_("  -c, --clean                  clean (drop) database objects before recreating\n"));
    printf(_("  -C, --create                 include commands to create database in dump\n"));
    printf(_("  -e, --extension=PATTERN      dump the specified extension(s) only\n"));
    printf(_("  --exclude-extension=PATTERN  do NOT dump the specified extension(s)\n"));
    printf(_("  -E, --encoding=ENCODING      dump the data in encoding ENCODING\n"));
    printf(_("  -n, --schema=PATTERN         dump the specified schema(s) only\n"));
    printf(_("  -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
    printf(_("  -O, --no-owner               skip restoration of object ownership in\n"
             "                               plain-text format\n"));
    printf(_("  -s, --schema-only            dump only the schema, no data\n"));
    printf(_("  -S, --superuser=NAME         superuser user name to use in plain-text format\n"));
    printf(_("  -t, --table=PATTERN          dump only the specified table(s)\n"));
    printf(_("  -T, --exclude-table=PATTERN  do NOT dump the specified table(s)\n"));
    printf(_("  -x, --no-privileges          do not dump privileges (grant/revoke)\n"));
    printf(_("  --binary-upgrade             for use by upgrade utilities only\n"));
    printf(_("  --column-inserts             dump data as INSERT commands with column names\n"));
    printf(_("  --disable-dollar-quoting     disable dollar quoting, use SQL standard quoting\n"));
    printf(_("  --disable-triggers           disable triggers during data-only restore\n"));
    printf(_("  --enable-row-security        enable row security (dump only content user has\n"
             "                               access to)\n"));
    printf(_("  --exclude-table-and-children=PATTERN\n"
             "                               do NOT dump the specified table(s), including\n"
             "                               child and partition tables\n"));
    printf(_("  --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
    printf(_("  --exclude-table-data-and-children=PATTERN\n"
             "                               do NOT dump data for the specified table(s),\n"
             "                               including child and partition tables\n"));
    printf(_("  --extra-float-digits=NUM     override default setting for extra_float_digits\n"));
    printf(_("  --filter=FILENAME            include or exclude objects and data from dump\n"
             "                               based on expressions in FILENAME\n"));
    printf(_("  --if-exists                  use IF EXISTS when dropping objects\n"));
    printf(_("  --include-foreign-data=PATTERN\n"
             "                               include data of foreign tables on foreign\n"
             "                               servers matching PATTERN\n"));
    printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
    printf(_("  --load-via-partition-root    load partitions via the root table\n"));
    printf(_("  --no-comments                do not dump comments\n"));
    printf(_("  --no-publications            do not dump publications\n"));
    printf(_("  --no-security-labels         do not dump security label assignments\n"));
    printf(_("  --no-subscriptions           do not dump subscriptions\n"));
    printf(_("  --no-table-access-method     do not dump table access methods\n"));
    printf(_("  --no-tablespaces             do not dump tablespace assignments\n"));
    printf(_("  --no-toast-compression       do not dump TOAST compression methods\n"));
    printf(_("  --no-unlogged-table-data     do not dump unlogged table data\n"));
    printf(_("  --on-conflict-do-nothing     add ON CONFLICT DO NOTHING to INSERT commands\n"));
    printf(_("  --quote-all-identifiers      quote all identifiers, even if not key words\n"));
    printf(_("  --rows-per-insert=NROWS      number of rows per INSERT; implies --inserts\n"));
    printf(_("  --section=SECTION            dump named section (pre-data, data, or post-data)\n"));
    printf(_("  --serializable-deferrable    wait until the dump can run without anomalies\n"));
    printf(_("  --snapshot=SNAPSHOT          use given snapshot for the dump\n"));
    printf(_("  --strict-names               require table and/or schema include patterns to\n"
             "                               match at least one entity each\n"));
    printf(_("  --table-and-children=PATTERN dump only the specified table(s), including\n"
             "                               child and partition tables\n"));
    printf(_("  --use-set-session-authorization\n"
             "                               use SET SESSION AUTHORIZATION commands instead of\n"
             "                               ALTER OWNER commands to set ownership\n"));

    printf(_("\nConnection options:\n"));
    printf(_("  -d, --dbname=DBNAME      database to dump\n"));
    printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
    printf(_("  -p, --port=PORT          database server port number\n"));
    printf(_("  -U, --username=NAME      connect as specified database user\n"));
    printf(_("  -w, --no-password        never prompt for password\n"));
    printf(_("  -W, --password           force password prompt (should happen automatically)\n"));
    printf(_("  --role=ROLENAME          do SET ROLE before dump\n"));

    printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
             "variable value is used.\n\n"));
    printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
    printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}

static void
setup_connection(Archive *AH, const char *dumpencoding,
                 const char *dumpsnapshot, char *use_role)
{
    DumpOptions *dopt = AH->dopt;
    PGconn     *conn = GetConnection(AH);
    const char *std_strings;

    PQclear(ExecuteSqlQueryForSingleRow(AH, ALWAYS_SECURE_SEARCH_PATH_SQL));

    /*
     * Set the client encoding if requested.
     */
    if (dumpencoding)
    {
        if (PQsetClientEncoding(conn, dumpencoding) < 0)
            pg_fatal("invalid client encoding \"%s\" specified",
                     dumpencoding);
    }

    /*
     * Get the active encoding and the standard_conforming_strings setting,
     * so we know how to escape strings.
     */
    AH->encoding = PQclientEncoding(conn);

    std_strings = PQparameterStatus(conn, "standard_conforming_strings");
    AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);

    /*
     * Set the role if requested.  In a parallel dump worker, we'll be passed
     * use_role == NULL, but AH->use_role is already set (if user specified
     * it originally) and we should use that.
     */
    if (!use_role && AH->use_role)
        use_role = AH->use_role;

    /* Set the role if requested */
    if (use_role)
    {
        PQExpBuffer query = createPQExpBuffer();

        appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
        ExecuteSqlStatement(AH, query->data);
        destroyPQExpBuffer(query);

        /* save it for possible later use by parallel workers */
        if (!AH->use_role)
            AH->use_role = pg_strdup(use_role);
    }

    /* Set the datestyle to ISO to ensure the dump's portability */
    ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");

    /* Likewise, avoid using sql_standard intervalstyle */
    ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");

    /*
     * Use an explicitly specified extra_float_digits if it has been
     * provided.  Otherwise, set extra_float_digits so that we can dump float
     * data exactly (given correctly implemented float I/O code, anyway).
     */
    if (have_extra_float_digits)
    {
        PQExpBuffer q = createPQExpBuffer();

        appendPQExpBuffer(q, "SET extra_float_digits TO %d",
                          extra_float_digits);
        ExecuteSqlStatement(AH, q->data);
        destroyPQExpBuffer(q);
    }
    else
        ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
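
    /*
     * (A setting of 3 historically selected the maximum float precision; on
     * modern servers any positive value makes float output round-trip
     * exactly, so the dumped data reloads without loss.)
     */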

    /*
     * Disable synchronized scanning, to prevent unpredictable changes in row
     * ordering across a dump and reload.
     */
    ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");

    /*
     * Disable timeouts if supported.
     */
    ExecuteSqlStatement(AH, "SET statement_timeout = 0");
    if (AH->remoteVersion >= 90300)
        ExecuteSqlStatement(AH, "SET lock_timeout = 0");
    if (AH->remoteVersion >= 90600)
        ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
    if (AH->remoteVersion >= 170000)
        ExecuteSqlStatement(AH, "SET transaction_timeout = 0");

    /*
     * Quote all identifiers, if requested.
     */
    if (quote_all_identifiers)
        ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");

    /*
     * Adjust row-security mode, if supported.
     */
    if (AH->remoteVersion >= 90500)
    {
        if (dopt->enable_row_security)
            ExecuteSqlStatement(AH, "SET row_security = on");
        else
            ExecuteSqlStatement(AH, "SET row_security = off");
    }

    /*
     * Initialize prepared-query state to "nothing prepared".  We do this
     * here so that a parallel dump worker will have its own state.
     */
    AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));

    /*
     * Start transaction-snapshot mode transaction to dump consistent data.
     */
    ExecuteSqlStatement(AH, "BEGIN");

    /*
     * To support the combination of serializable_deferrable with the jobs
     * option we use REPEATABLE READ for the worker connections that are
     * passed a snapshot.  As long as the snapshot is acquired in a
     * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
     * REPEATABLE READ transaction provides the appropriate integrity
     * guarantees.  This is a kluge, but safe for back-patching.
     */
    if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
        ExecuteSqlStatement(AH,
                            "SET TRANSACTION ISOLATION LEVEL "
                            "SERIALIZABLE, READ ONLY, DEFERRABLE");
    else
        ExecuteSqlStatement(AH,
                            "SET TRANSACTION ISOLATION LEVEL "
                            "REPEATABLE READ, READ ONLY");

    /*
     * If user specified a snapshot to use, select that.  In a parallel dump
     * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
     * is already set (if the server can handle it) and we should use that.
     */
    if (dumpsnapshot)
        AH->sync_snapshot_id = pg_strdup(dumpsnapshot);

    if (AH->sync_snapshot_id)
    {
        PQExpBuffer query = createPQExpBuffer();

        appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
        appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
        ExecuteSqlStatement(AH, query->data);
        destroyPQExpBuffer(query);
    }
    else if (AH->numWorkers > 1)
    {
        if (AH->isStandby && AH->remoteVersion < 100000)
            pg_fatal("parallel dumps from standby servers are not supported by this server version");
        AH->sync_snapshot_id = get_synchronized_snapshot(AH);
    }
}

/* Set up connection for a parallel worker process */
static void
setupDumpWorker(Archive *AH)
{
    /*
     * We want to re-select all the same values the leader connection is
     * using.  We'll have inherited directly-usable values in
     * AH->sync_snapshot_id and AH->use_role, but we need to translate the
     * inherited encoding value back to a string to pass to
     * setup_connection.
     */
    setup_connection(AH,
                     pg_encoding_to_char(AH->encoding),
                     NULL,
                     NULL);
}

static char *
get_synchronized_snapshot(Archive *fout)
{
    char       *query = "SELECT pg_catalog.pg_export_snapshot()";
    char       *result;
    PGresult   *res;

    res = ExecuteSqlQueryForSingleRow(fout, query);
    result = pg_strdup(PQgetvalue(res, 0, 0));
    PQclear(res);

    return result;
}
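
/*
 * (The snapshot name returned by pg_export_snapshot() above is stored in
 * AH->sync_snapshot_id and later imported by each parallel worker via SET
 * TRANSACTION SNAPSHOT in setup_connection(), so all worker connections
 * dump the same database state.)
 */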

static ArchiveFormat
parseArchiveFormat(const char *format, ArchiveMode *mode)
{
    ArchiveFormat archiveFormat;

    *mode = archModeWrite;

    if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
    {
        /* This is used by pg_dumpall, and is not documented */
        archiveFormat = archNull;
        *mode = archModeAppend;
    }
    else if (pg_strcasecmp(format, "c") == 0)
        archiveFormat = archCustom;
    else if (pg_strcasecmp(format, "custom") == 0)
        archiveFormat = archCustom;
    else if (pg_strcasecmp(format, "d") == 0)
        archiveFormat = archDirectory;
    else if (pg_strcasecmp(format, "directory") == 0)
        archiveFormat = archDirectory;
    else if (pg_strcasecmp(format, "p") == 0)
        archiveFormat = archNull;
    else if (pg_strcasecmp(format, "plain") == 0)
        archiveFormat = archNull;
    else if (pg_strcasecmp(format, "t") == 0)
        archiveFormat = archTar;
    else if (pg_strcasecmp(format, "tar") == 0)
        archiveFormat = archTar;
    else
        pg_fatal("invalid output format \"%s\" specified", format);
    return archiveFormat;
}

/*
 * Find the OIDs of all schemas matching the given list of patterns,
 * and append them to the given OID list.
 */
static void
expand_schema_name_patterns(Archive *fout,
                            SimpleStringList *patterns,
                            SimpleOidList *oids,
                            bool strict_names)
{
    PQExpBuffer query;
    PGresult   *res;
    SimpleStringListCell *cell;
    int         i;

    if (patterns->head == NULL)
        return;                 /* nothing to do */

    query = createPQExpBuffer();

    /*
     * The loop below runs multiple SELECTs, which might sometimes result in
     * duplicate entries in the OID list, but we don't care.
     */

    for (cell = patterns->head; cell; cell = cell->next)
    {
        PQExpBufferData dbbuf;
        int         dotcnt;

        appendPQExpBufferStr(query,
                             "SELECT oid FROM pg_catalog.pg_namespace n\n");
        initPQExpBuffer(&dbbuf);
        processSQLNamePattern(GetConnection(fout), query, cell->val, false,
                              false, NULL, "n.nspname", NULL, NULL, &dbbuf,
                              &dotcnt);
        if (dotcnt > 1)
            pg_fatal("improper qualified name (too many dotted names): %s",
                     cell->val);
        else if (dotcnt == 1)
            prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
        termPQExpBuffer(&dbbuf);

        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
        if (strict_names && PQntuples(res) == 0)
            pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);

        for (i = 0; i < PQntuples(res); i++)
        {
            simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
        }

        PQclear(res);
        resetPQExpBuffer(query);
    }

    destroyPQExpBuffer(query);
}

/*
 * Find the OIDs of all extensions matching the given list of patterns,
 * and append them to the given OID list.
 */
static void
expand_extension_name_patterns(Archive *fout,
                               SimpleStringList *patterns,
                               SimpleOidList *oids,
                               bool strict_names)
{
    PQExpBuffer query;
    PGresult   *res;
    SimpleStringListCell *cell;
    int         i;

    if (patterns->head == NULL)
        return;                 /* nothing to do */

    query = createPQExpBuffer();

    /*
     * The loop below runs multiple SELECTs, which might sometimes result in
     * duplicate entries in the OID list, but we don't care.
     */
    for (cell = patterns->head; cell; cell = cell->next)
    {
        int         dotcnt;

        appendPQExpBufferStr(query,
                             "SELECT oid FROM pg_catalog.pg_extension e\n");
        processSQLNamePattern(GetConnection(fout), query, cell->val, false,
                              false, NULL, "e.extname", NULL, NULL, NULL,
                              &dotcnt);
        if (dotcnt > 0)
            pg_fatal("improper qualified name (too many dotted names): %s",
                     cell->val);

        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
        if (strict_names && PQntuples(res) == 0)
            pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);

        for (i = 0; i < PQntuples(res); i++)
        {
            simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
        }

        PQclear(res);
        resetPQExpBuffer(query);
    }

    destroyPQExpBuffer(query);
}

/*
 * Find the OIDs of all foreign servers matching the given list of patterns,
 * and append them to the given OID list.
 */
static void
expand_foreign_server_name_patterns(Archive *fout,
                                    SimpleStringList *patterns,
                                    SimpleOidList *oids)
{
    PQExpBuffer query;
    PGresult   *res;
    SimpleStringListCell *cell;
    int         i;

    if (patterns->head == NULL)
        return;                 /* nothing to do */

    query = createPQExpBuffer();

    /*
     * The loop below runs multiple SELECTs, which might sometimes result in
     * duplicate entries in the OID list, but we don't care.
     */

    for (cell = patterns->head; cell; cell = cell->next)
    {
        int         dotcnt;

        appendPQExpBufferStr(query,
                             "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
        processSQLNamePattern(GetConnection(fout), query, cell->val, false,
                              false, NULL, "s.srvname", NULL, NULL, NULL,
                              &dotcnt);
        if (dotcnt > 0)
            pg_fatal("improper qualified name (too many dotted names): %s",
                     cell->val);

        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
        if (PQntuples(res) == 0)
            pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);

        for (i = 0; i < PQntuples(res); i++)
            simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));

        PQclear(res);
        resetPQExpBuffer(query);
    }

    destroyPQExpBuffer(query);
}

/*
 * Find the OIDs of all tables matching the given list of patterns,
 * and append them to the given OID list.  See also expand_dbname_patterns()
 * in pg_dumpall.c
 */
static void
expand_table_name_patterns(Archive *fout,
                           SimpleStringList *patterns, SimpleOidList *oids,
                           bool strict_names, bool with_child_tables)
{
    PQExpBuffer query;
    PGresult   *res;
    SimpleStringListCell *cell;
    int         i;

    if (patterns->head == NULL)
        return;                 /* nothing to do */

    query = createPQExpBuffer();

    /*
     * This might sometimes result in duplicate entries in the OID list, but
     * we don't care.
     */

    for (cell = patterns->head; cell; cell = cell->next)
    {
        PQExpBufferData dbbuf;
        int         dotcnt;

        /*
         * Query must remain ABSOLUTELY devoid of unqualified names.  This
         * would be unnecessary given a pg_table_is_visible() variant taking
         * a search_path argument.
         *
         * For with_child_tables, we start with the basic query's results and
         * recursively search the inheritance tree to add child tables.
         */
        if (with_child_tables)
        {
            appendPQExpBuffer(query, "WITH RECURSIVE partition_tree (relid) AS (\n");
        }

        appendPQExpBuffer(query,
                          "SELECT c.oid"
                          "\nFROM pg_catalog.pg_class c"
                          "\n     LEFT JOIN pg_catalog.pg_namespace n"
                          "\n     ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
                          "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
                          "\n    (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
                          RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
                          RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
                          RELKIND_PARTITIONED_TABLE);
        initPQExpBuffer(&dbbuf);
        processSQLNamePattern(GetConnection(fout), query, cell->val, true,
                              false, "n.nspname", "c.relname", NULL,
                              "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
                              &dotcnt);
        if (dotcnt > 2)
            pg_fatal("improper relation name (too many dotted names): %s",
                     cell->val);
        else if (dotcnt == 2)
            prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
        termPQExpBuffer(&dbbuf);

        if (with_child_tables)
        {
            appendPQExpBuffer(query, "UNION"
                              "\nSELECT i.inhrelid"
                              "\nFROM partition_tree p"
                              "\n     JOIN pg_catalog.pg_inherits i"
                              "\n     ON p.relid OPERATOR(pg_catalog.=) i.inhparent"
                              "\n)"
                              "\nSELECT relid FROM partition_tree");
        }

        ExecuteSqlStatement(fout, "RESET search_path");
        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
        PQclear(ExecuteSqlQueryForSingleRow(fout,
                                            ALWAYS_SECURE_SEARCH_PATH_SQL));
        if (strict_names && PQntuples(res) == 0)
            pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);

        for (i = 0; i < PQntuples(res); i++)
        {
            simple_oid_list_append(oids, atooid(PQgetvalue(res, i, 0)));
        }

        PQclear(res);
        resetPQExpBuffer(query);
    }

    destroyPQExpBuffer(query);
}
1670 
1671 /*
1672  * Verifies that the connected database name matches the given database name,
1673  * and if not, dies with an error about the given pattern.
1674  *
1675  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1676  */
1677 static void
1678 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1679 {
1680  const char *db;
1681 
1682  db = PQdb(conn);
1683  if (db == NULL)
1684  pg_fatal("You are currently not connected to a database.");
1685 
1686  if (strcmp(db, dbname) != 0)
1687  pg_fatal("cross-database references are not implemented: %s",
1688  pattern);
1689 }
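 /*
  * For example, given a connection to database "db1", a pattern such as
  * "db2.public.tab" parses out the database name "db2", fails the strcmp
  * above, and aborts the dump here.
  */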
1690 
1691 /*
1692  * checkExtensionMembership
1693  * Determine whether object is an extension member, and if so,
1694  * record an appropriate dependency and set the object's dump flag.
1695  *
1696  * It's important to call this for each object that could be an extension
1697  * member. Generally, we integrate this with determining the object's
1698  * to-be-dumped-ness, since extension membership overrides other rules for that.
1699  *
1700  * Returns true if object is an extension member, else false.
1701  */
1702 static bool
1703 checkExtensionMembership(DumpableObject *dobj, Archive *fout)
1704 {
1705  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1706 
1707  if (ext == NULL)
1708  return false;
1709 
1710  dobj->ext_member = true;
1711 
1712  /* Record dependency so that getDependencies needn't deal with that */
1713  addObjectDependency(dobj, ext->dobj.dumpId);
1714 
1715  /*
1716  * In 9.6 and above, mark the member object to have any non-initial ACLs
1717  * dumped. (Any initial ACLs will be removed later, using data from
1718  * pg_init_privs, so that we'll dump only the delta from the extension's
1719  * initial setup.)
1720  *
1721  * Prior to 9.6, we do not include any extension member components.
1722  *
1723  * In binary upgrades, we still dump all components of the members
1724  * individually, since the idea is to exactly reproduce the database
1725  * contents rather than replace the extension contents with something
1726  * different.
1727  *
1728  * Note: it might be interesting someday to implement storage and delta
1729  * dumping of extension members' RLS policies and/or security labels.
1730  * However there is a pitfall for RLS policies: trying to dump them
1731  * requires getting a lock on their tables, and the calling user might not
1732  * have privileges for that. We need no lock to examine a table's ACLs,
1733  * so the current feature doesn't have a problem of that sort.
1734  */
1735  if (fout->dopt->binary_upgrade)
1736  dobj->dump = ext->dobj.dump;
1737  else
1738  {
1739  if (fout->remoteVersion < 90600)
1740  dobj->dump = DUMP_COMPONENT_NONE;
1741  else
1742  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL);
1743  }
1744 
1745  return true;
1746 }
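 /*
  * For example, if a GRANT EXECUTE is issued on an extension's function
  * after CREATE EXTENSION, only that grant (the delta against the
  * pg_init_privs baseline) ends up in the dump; the function definition
  * itself stays with the extension and is not re-emitted.
  */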
1747 
1748 /*
1749  * selectDumpableNamespace: policy-setting subroutine
1750  * Mark a namespace as to be dumped or not
1751  */
1752 static void
1753 selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout)
1754 {
1755  /*
1756  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1757  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1758  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1759  */
1760  nsinfo->create = true;
1761 
1762  /*
1763  * If specific tables are being dumped, do not dump any complete
1764  * namespaces. If specific namespaces are being dumped, dump just those
1765  * namespaces. Otherwise, dump all non-system namespaces.
1766  */
1767  if (table_include_oids.head != NULL)
1768  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1769  else if (schema_include_oids.head != NULL)
1770  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1771  simple_oid_list_member(&schema_include_oids,
1772  nsinfo->dobj.catId.oid) ?
1773  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
1774  else if (fout->remoteVersion >= 90600 &&
1775  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1776  {
1777  /*
1778  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1779  * they are interesting (and not the original ACLs which were set at
1780  * initdb time, see pg_init_privs).
1781  */
1782  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1783  }
1784  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1785  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1786  {
1787  /* Other system schemas don't get dumped */
1788  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1789  }
1790  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1791  {
1792  /*
1793  * The public schema is a strange beast that sits in a sort of
1794  * no-mans-land between being a system object and a user object.
1795  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1796  * a comment and an indication of ownership. If the owner is the
1797  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1798  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1799  */
1800  nsinfo->create = false;
1801  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1802  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1803  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1804  nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL;
1805 
1806  /*
1807  * Also, make like it has a comment even if it doesn't; this is so
1808  * that we'll emit a command to drop the comment, if appropriate.
1809  * (Without this, we'd not call dumpCommentExtended for it.)
1810  */
1811  nsinfo->dobj.components |= DUMP_COMPONENT_COMMENT;
1812  }
1813  else
1814  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1815 
1816  /*
1817  * In any case, a namespace can be excluded by an exclusion switch
1818  */
1819  if (nsinfo->dobj.dump_contains &&
1820  simple_oid_list_member(&schema_exclude_oids,
1821  nsinfo->dobj.catId.oid))
1822  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1823 
1824  /*
1825  * If the schema belongs to an extension, allow extension membership to
1826  * override the dump decision for the schema itself. However, this does
1827  * not change dump_contains, so this won't change what we do with objects
1828  * within the schema. (If they belong to the extension, they'll get
1829  * suppressed by it, otherwise not.)
1830  */
1831  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1832 }
1833 
1834 /*
1835  * selectDumpableTable: policy-setting subroutine
1836  * Mark a table as to be dumped or not
1837  */
1838 static void
1839 selectDumpableTable(TableInfo *tbinfo, Archive *fout)
1840 {
1841  if (checkExtensionMembership(&tbinfo->dobj, fout))
1842  return; /* extension membership overrides all else */
1843 
1844  /*
1845  * If specific tables are being dumped, dump just those tables; else, dump
1846  * according to the parent namespace's dump flag.
1847  */
1848  if (table_include_oids.head != NULL)
1849  tbinfo->dobj.dump = simple_oid_list_member(&table_include_oids,
1850  tbinfo->dobj.catId.oid) ?
1851  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
1852  else
1853  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1854 
1855  /*
1856  * In any case, a table can be excluded by an exclusion switch
1857  */
1858  if (tbinfo->dobj.dump &&
1859  simple_oid_list_member(&table_exclude_oids,
1860  tbinfo->dobj.catId.oid))
1861  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1862 }
1863 
1864 /*
1865  * selectDumpableType: policy-setting subroutine
1866  * Mark a type as to be dumped or not
1867  *
1868  * If it's a table's rowtype or an autogenerated array type, we also apply a
1869  * special type code to facilitate sorting into the desired order. (We don't
1870  * want to consider those to be ordinary types because that would bring tables
1871  * up into the datatype part of the dump order.) We still set the object's
1872  * dump flag; that's not going to cause the dummy type to be dumped, but we
1873  * need it so that casts involving such types will be dumped correctly -- see
1874  * dumpCast. This means the flag should be set the same as for the underlying
1875  * object (the table or base type).
1876  */
1877 static void
1878 selectDumpableType(TypeInfo *tyinfo, Archive *fout)
1879 {
1880  /* skip complex types, except for standalone composite types */
1881  if (OidIsValid(tyinfo->typrelid) &&
1882  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1883  {
1884  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1885 
1886  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1887  if (tytable != NULL)
1888  tyinfo->dobj.dump = tytable->dobj.dump;
1889  else
1890  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1891  return;
1892  }
1893 
1894  /* skip auto-generated array and multirange types */
1895  if (tyinfo->isArray || tyinfo->isMultirange)
1896  {
1897  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1898 
1899  /*
1900  * Fall through to set the dump flag; we assume that the subsequent
1901  * rules will do the same thing as they would for the array's base
1902  * type or multirange's range type. (We cannot reliably look up the
1903  * base type here, since getTypes may not have processed it yet.)
1904  */
1905  }
1906 
1907  if (checkExtensionMembership(&tyinfo->dobj, fout))
1908  return; /* extension membership overrides all else */
1909 
1910  /* Dump based on if the contents of the namespace are being dumped */
1911  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1912 }
1913 
1914 /*
1915  * selectDumpableDefaultACL: policy-setting subroutine
1916  * Mark a default ACL as to be dumped or not
1917  *
1918  * For per-schema default ACLs, dump if the schema is to be dumped.
1919  * Otherwise dump if we are dumping "everything". Note that dataOnly
1920  * and aclsSkip are checked separately.
1921  */
1922 static void
1923 selectDumpableDefaultACL(DefaultACLInfo *dinfo, DumpOptions *dopt)
1924 {
1925  /* Default ACLs can't be extension members */
1926 
1927  if (dinfo->dobj.namespace)
1928  /* default ACLs are considered part of the namespace */
1929  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1930  else
1931  dinfo->dobj.dump = dopt->include_everything ?
1932  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
1933 }
1934 
1935 /*
1936  * selectDumpableCast: policy-setting subroutine
1937  * Mark a cast as to be dumped or not
1938  *
1939  * Casts do not belong to any particular namespace (since they haven't got
1940  * names), nor do they have identifiable owners. To distinguish user-defined
1941  * casts from built-in ones, we must resort to checking whether the cast's
1942  * OID is in the range reserved for initdb.
1943  */
1944 static void
1945 selectDumpableCast(CastInfo *cast, Archive *fout)
1946 {
1947  if (checkExtensionMembership(&cast->dobj, fout))
1948  return; /* extension membership overrides all else */
1949 
1950  /*
1951  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1952  * support ACLs currently.
1953  */
1954  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1955  cast->dobj.dump = DUMP_COMPONENT_NONE;
1956  else
1957  cast->dobj.dump = fout->dopt->include_everything ?
1958  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
1959 }
1960 
1961 /*
1962  * selectDumpableProcLang: policy-setting subroutine
1963  * Mark a procedural language as to be dumped or not
1964  *
1965  * Procedural languages do not belong to any particular namespace. To
1966  * identify built-in languages, we must resort to checking whether the
1967  * language's OID is in the range reserved for initdb.
1968  */
1969 static void
1970 selectDumpableProcLang(ProcLangInfo *plang, Archive *fout)
1971 {
1972  if (checkExtensionMembership(&plang->dobj, fout))
1973  return; /* extension membership overrides all else */
1974 
1975  /*
1976  * Only include procedural languages when we are dumping everything.
1977  *
1978  * For from-initdb procedural languages, only include ACLs, as we do for
1979  * the pg_catalog namespace. We need this because procedural languages do
1980  * not live in any namespace.
1981  */
1982  if (!fout->dopt->include_everything)
1983  plang->dobj.dump = DUMP_COMPONENT_NONE;
1984  else
1985  {
1986  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1987  plang->dobj.dump = fout->remoteVersion < 90600 ?
1988  DUMP_COMPONENT_NONE : DUMP_COMPONENT_ACL;
1989  else
1990  plang->dobj.dump = DUMP_COMPONENT_ALL;
1991  }
1992 }
1993 
1994 /*
1995  * selectDumpableAccessMethod: policy-setting subroutine
1996  * Mark an access method as to be dumped or not
1997  *
1998  * Access methods do not belong to any particular namespace. To identify
1999  * built-in access methods, we must resort to checking whether the
2000  * method's OID is in the range reserved for initdb.
2001  */
2002 static void
2003 selectDumpableAccessMethod(AccessMethodInfo *method, Archive *fout)
2004 {
2005  if (checkExtensionMembership(&method->dobj, fout))
2006  return; /* extension membership overrides all else */
2007 
2008  /*
2009  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
2010  * they do not support ACLs currently.
2011  */
2012  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2013  method->dobj.dump = DUMP_COMPONENT_NONE;
2014  else
2015  method->dobj.dump = fout->dopt->include_everything ?
2016  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
2017 }
2018 
2019 /*
2020  * selectDumpableExtension: policy-setting subroutine
2021  * Mark an extension as to be dumped or not
2022  *
2023  * Built-in extensions should be skipped except for checking ACLs, since we
2024  * assume those will already be installed in the target database. We identify
2025  * such extensions by their having OIDs in the range reserved for initdb.
2026  * We dump all user-added extensions by default. No extensions are dumped
2027  * if include_everything is false (i.e., a --schema or --table switch was
2028  * given), except if --extension specifies a list of extensions to dump.
2029  */
2030 static void
2031 selectDumpableExtension(ExtensionInfo *extinfo, DumpOptions *dopt)
2032 {
2033  /*
2034  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
2035  * change permissions on their member objects, if they wish to, and have
2036  * those changes preserved.
2037  */
2038  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2039  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
2040  else
2041  {
2042  /* check if there is a list of extensions to dump */
2043  if (extension_include_oids.head != NULL)
2044  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2045  simple_oid_list_member(&extension_include_oids,
2046  extinfo->dobj.catId.oid) ?
2047  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
2048  else
2049  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2050  dopt->include_everything ?
2051  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
2052 
2053  /* check that the extension is not explicitly excluded */
2054  if (extinfo->dobj.dump &&
2055  simple_oid_list_member(&extension_exclude_oids,
2056  extinfo->dobj.catId.oid))
2057  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_NONE;
2058  }
2059 }
2060 
2061 /*
2062  * selectDumpablePublicationObject: policy-setting subroutine
2063  * Mark a publication object as to be dumped or not
2064  *
2065  * A publication can have schemas and tables which have schemas, but those are
2066  * ignored in decision making, because publications are only dumped when we are
2067  * dumping everything.
2068  */
2069 static void
2070 selectDumpablePublicationObject(DumpableObject *dobj, Archive *fout)
2071 {
2072  if (checkExtensionMembership(dobj, fout))
2073  return; /* extension membership overrides all else */
2074 
2075  dobj->dump = fout->dopt->include_everything ?
2076  DUMP_COMPONENT_ALL : DUMP_COMPONENT_NONE;
2077 }
2078 
2079 /*
2080  * selectDumpableStatisticsObject: policy-setting subroutine
2081  * Mark an extended statistics object as to be dumped or not
2082  *
2083  * We dump an extended statistics object if the schema it's in and the table
2084  * it's for are being dumped. (This'll need more thought if statistics
2085  * objects ever support cross-table stats.)
2086  */
2087 static void
2088 selectDumpableStatisticsObject(StatsExtInfo *sobj, Archive *fout)
2089 {
2090  if (checkExtensionMembership(&sobj->dobj, fout))
2091  return; /* extension membership overrides all else */
2092 
2093  sobj->dobj.dump = sobj->dobj.namespace->dobj.dump_contains;
2094  if (sobj->stattable == NULL ||
2095  !(sobj->stattable->dobj.dump & DUMP_COMPONENT_DEFINITION))
2096  sobj->dobj.dump = DUMP_COMPONENT_NONE;
2097 }
2098 
2099 /*
2100  * selectDumpableObject: policy-setting subroutine
2101  * Mark a generic dumpable object as to be dumped or not
2102  *
2103  * Use this only for object types without a special-case routine above.
2104  */
2105 static void
2106 selectDumpableObject(DumpableObject *dobj, Archive *fout)
2107 {
2108  if (checkExtensionMembership(dobj, fout))
2109  return; /* extension membership overrides all else */
2110 
2111  /*
2112  * Default policy is to dump if parent namespace is dumpable, or for
2113  * non-namespace-associated items, dump if we're dumping "everything".
2114  */
2115  if (dobj->namespace)
2116  dobj->dump = dobj->namespace->dobj.dump_contains;
2117  else
2118  dobj->dump = fout->dopt->include_everything ?
2120 }
2121 
2122 /*
2123  * Dump a table's contents for loading using the COPY command
2124  * - this routine is called by the Archiver when it wants the table
2125  * to be dumped.
2126  */
2127 static int
2128 dumpTableData_copy(Archive *fout, const void *dcontext)
2129 {
2130  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2131  TableInfo *tbinfo = tdinfo->tdtable;
2132  const char *classname = tbinfo->dobj.name;
2133  PQExpBuffer q = createPQExpBuffer();
2134 
2135  /*
2136  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2137  * which uses it already.
2138  */
2139  PQExpBuffer clistBuf = createPQExpBuffer();
2140  PGconn *conn = GetConnection(fout);
2141  PGresult *res;
2142  int ret;
2143  char *copybuf;
2144  const char *column_list;
2145 
2146  pg_log_info("dumping contents of table \"%s.%s\"",
2147  tbinfo->dobj.namespace->dobj.name, classname);
2148 
2149  /*
2150  * Specify the column list explicitly so that we have no possibility of
2151  * retrieving data in the wrong column order. (The default column
2152  * ordering of COPY will not be what we want in certain corner cases
2153  * involving ADD COLUMN and inheritance.)
2154  */
2155  column_list = fmtCopyColumnList(tbinfo, clistBuf);
2156 
2157  /*
2158  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2159  * a filter condition was specified. For other cases a simple COPY
2160  * suffices.
2161  */
2162  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2163  {
2164  appendPQExpBufferStr(q, "COPY (SELECT ");
2165  /* klugery to get rid of parens in column list */
2166  if (strlen(column_list) > 2)
2167  {
2168  appendPQExpBufferStr(q, column_list + 1);
2169  q->data[q->len - 1] = ' ';
2170  }
2171  else
2172  appendPQExpBufferStr(q, "* ");
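 		/*
 		 * At this point a parenthesized column list such as "(a, b, c)"
 		 * has been appended as "a, b, c " (or as "* " when there was no
 		 * usable list), ready for the FROM clause added below.
 		 */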
2173 
2174  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2175  fmtQualifiedDumpable(tbinfo),
2176  tdinfo->filtercond ? tdinfo->filtercond : "");
2177  }
2178  else
2179  {
2180  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2181  fmtQualifiedDumpable(tbinfo),
2182  column_list);
2183  }
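 	/*
 	 * For illustration, assuming a table "public.t1" with columns a and b,
 	 * the two branches above produce statements roughly like:
 	 *
 	 *   COPY (SELECT a, b FROM public.t1 WHERE a > 0) TO stdout;
 	 *   COPY public.t1 (a, b) TO stdout;
 	 */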
2184  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2185  PQclear(res);
2186  destroyPQExpBuffer(clistBuf);
2187 
2188  for (;;)
2189  {
2190  ret = PQgetCopyData(conn, &copybuf, 0);
2191 
2192  if (ret < 0)
2193  break; /* done or error */
2194 
2195  if (copybuf)
2196  {
2197  WriteData(fout, copybuf, ret);
2198  PQfreemem(copybuf);
2199  }
2200 
2201  /* ----------
2202  * THROTTLE:
2203  *
2204  * There was considerable discussion in late July, 2000 regarding
2205  * slowing down pg_dump when backing up large tables. Users with both
2206  * slow & fast (multi-processor) machines experienced performance
2207  * degradation when doing a backup.
2208  *
2209  * Initial attempts based on sleeping for a number of ms for each ms
2210  * of work were deemed too complex, then a simple 'sleep in each loop'
2211  * implementation was suggested. The latter failed because the loop
2212  * was too tight. Finally, the following was implemented:
2213  *
2214  * If throttle is non-zero, then
2215  * See how long since the last sleep.
2216  * Work out how long to sleep (based on ratio).
2217  * If sleep is more than 100ms, then
2218  * sleep
2219  * reset timer
2220  * EndIf
2221  * EndIf
2222  *
2223  * where the throttle value was the number of ms to sleep per ms of
2224  * work. The calculation was done in each loop.
2225  *
2226  * Most of the hard work is done in the backend, and this solution
2227  * still did not work particularly well: on slow machines, the ratio
2228  * was 50:1, and on medium paced machines, 1:1, and on fast
2229  * multi-processor machines, it had little or no effect, for reasons
2230  * that were unclear.
2231  *
2232  * Further discussion ensued, and the proposal was dropped.
2233  *
2234  * For those people who want this feature, it can be implemented using
2235  * gettimeofday in each loop, calculating the time since last sleep,
2236  * multiplying that by the sleep ratio, then if the result is more
2237  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2238  * function to sleep for a subsecond period, i.e.
2239  *
2240  * select(0, NULL, NULL, NULL, &tvi);
2241  *
2242  * This will return after the interval specified in the structure tvi.
2243  * Finally, call gettimeofday again to save the 'last sleep time'.
2244  * ----------
2245  */
2246  }
2247  archprintf(fout, "\\.\n\n\n");
2248 
2249  if (ret == -2)
2250  {
2251  /* copy data transfer failed */
2252  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2253  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2254  pg_log_error_detail("Command was: %s", q->data);
2255  exit_nicely(1);
2256  }
2257 
2258  /* Check command status and return to normal libpq state */
2259  res = PQgetResult(conn);
2260  if (PQresultStatus(res) != PGRES_COMMAND_OK)
2261  {
2262  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2263  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2264  pg_log_error_detail("Command was: %s", q->data);
2265  exit_nicely(1);
2266  }
2267  PQclear(res);
2268 
2269  /* Do this to ensure we've pumped libpq back to idle state */
2270  if (PQgetResult(conn) != NULL)
2271  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2272  classname);
2273 
2274  destroyPQExpBuffer(q);
2275  return 1;
2276 }
2277 
2278 /*
2279  * Dump table data using INSERT commands.
2280  *
2281  * Caution: when we restore from an archive file direct to database, the
2282  * INSERT commands emitted by this function have to be parsed by
2283  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2284  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2285  */
2286 static int
2287 dumpTableData_insert(Archive *fout, const void *dcontext)
2288 {
2289  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2290  TableInfo *tbinfo = tdinfo->tdtable;
2291  DumpOptions *dopt = fout->dopt;
2292  PQExpBuffer q = createPQExpBuffer();
2293  PQExpBuffer insertStmt = NULL;
2294  char *attgenerated;
2295  PGresult *res;
2296  int nfields,
2297  i;
2298  int rows_per_statement = dopt->dump_inserts;
2299  int rows_this_statement = 0;
2300 
2301  /*
2302  * If we're going to emit INSERTs with column names, the most efficient
2303  * way to deal with generated columns is to exclude them entirely. For
2304  * INSERTs without column names, we have to emit DEFAULT rather than the
2305  * actual column value --- but we can save a few cycles by fetching nulls
2306  * rather than the uninteresting-to-us value.
2307  */
2308  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2309  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2310  nfields = 0;
2311  for (i = 0; i < tbinfo->numatts; i++)
2312  {
2313  if (tbinfo->attisdropped[i])
2314  continue;
2315  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2316  continue;
2317  if (nfields > 0)
2318  appendPQExpBufferStr(q, ", ");
2319  if (tbinfo->attgenerated[i])
2320  appendPQExpBufferStr(q, "NULL");
2321  else
2322  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2323  attgenerated[nfields] = tbinfo->attgenerated[i];
2324  nfields++;
2325  }
2326  /* Servers before 9.4 will complain about zero-column SELECT */
2327  if (nfields == 0)
2328  appendPQExpBufferStr(q, "NULL");
2329  appendPQExpBuffer(q, " FROM ONLY %s",
2330  fmtQualifiedDumpable(tbinfo));
2331  if (tdinfo->filtercond)
2332  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2333 
2334  ExecuteSqlStatement(fout, q->data);
2335 
2336  while (1)
2337  {
2338  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2339  PGRES_TUPLES_OK);
2340 
2341  /* cross-check field count, allowing for dummy NULL if any */
2342  if (nfields != PQnfields(res) &&
2343  !(nfields == 0 && PQnfields(res) == 1))
2344  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2345  tbinfo->dobj.name);
2346 
2347  /*
2348  * First time through, we build as much of the INSERT statement as
2349  * possible in "insertStmt", which we can then just print for each
2350  * statement. If the table happens to have zero dumpable columns then
2351  * this will be a complete statement, otherwise it will end in
2352  * "VALUES" and be ready to have the row's column values printed.
2353  */
2354  if (insertStmt == NULL)
2355  {
2356  TableInfo *targettab;
2357 
2358  insertStmt = createPQExpBuffer();
2359 
2360  /*
2361  * When load-via-partition-root is set or forced, get the root
2362  * table name for the partition table, so that we can reload data
2363  * through the root table.
2364  */
2365  if (tbinfo->ispartition &&
2366  (dopt->load_via_partition_root ||
2367  forcePartitionRootLoad(tbinfo)))
2368  targettab = getRootTableInfo(tbinfo);
2369  else
2370  targettab = tbinfo;
2371 
2372  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2373  fmtQualifiedDumpable(targettab));
2374 
2375  /* corner case for zero-column table */
2376  if (nfields == 0)
2377  {
2378  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2379  }
2380  else
2381  {
2382  /* append the list of column names if required */
2383  if (dopt->column_inserts)
2384  {
2385  appendPQExpBufferChar(insertStmt, '(');
2386  for (int field = 0; field < nfields; field++)
2387  {
2388  if (field > 0)
2389  appendPQExpBufferStr(insertStmt, ", ");
2390  appendPQExpBufferStr(insertStmt,
2391  fmtId(PQfname(res, field)));
2392  }
2393  appendPQExpBufferStr(insertStmt, ") ");
2394  }
2395 
2396  if (tbinfo->needs_override)
2397  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2398 
2399  appendPQExpBufferStr(insertStmt, "VALUES");
2400  }
2401  }
2402 
2403  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2404  {
2405  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2406  if (rows_this_statement == 0)
2407  archputs(insertStmt->data, fout);
2408 
2409  /*
2410  * If it is zero-column table then we've already written the
2411  * complete statement, which will mean we've disobeyed
2412  * --rows-per-insert when it's set greater than 1. We do support
2413  * a way to make this multi-row with: SELECT UNION ALL SELECT
2414  * UNION ALL ... but that's non-standard so we should avoid it
2415  * given that using INSERTs is mostly only ever needed for
2416  * cross-database exports.
2417  */
2418  if (nfields == 0)
2419  continue;
2420 
2421  /* Emit a row heading */
2422  if (rows_per_statement == 1)
2423  archputs(" (", fout);
2424  else if (rows_this_statement > 0)
2425  archputs(",\n\t(", fout);
2426  else
2427  archputs("\n\t(", fout);
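 			/*
 			 * For example, with --rows-per-insert=2 and a hypothetical
 			 * table public.t1, the heading logic above yields output
 			 * shaped like:
 			 *
 			 *   INSERT INTO public.t1 VALUES
 			 *       (1, 'one'),
 			 *       (2, 'two');
 			 */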
2428 
2429  for (int field = 0; field < nfields; field++)
2430  {
2431  if (field > 0)
2432  archputs(", ", fout);
2433  if (attgenerated[field])
2434  {
2435  archputs("DEFAULT", fout);
2436  continue;
2437  }
2438  if (PQgetisnull(res, tuple, field))
2439  {
2440  archputs("NULL", fout);
2441  continue;
2442  }
2443 
2444  /* XXX This code is partially duplicated in ruleutils.c */
2445  switch (PQftype(res, field))
2446  {
2447  case INT2OID:
2448  case INT4OID:
2449  case INT8OID:
2450  case OIDOID:
2451  case FLOAT4OID:
2452  case FLOAT8OID:
2453  case NUMERICOID:
2454  {
2455  /*
2456  * These types are printed without quotes unless
2457  * they contain values that aren't accepted by the
2458  * scanner unquoted (e.g., 'NaN'). Note that
2459  * strtod() and friends might accept NaN, so we
2460  * can't use that to test.
2461  *
2462  * In reality we only need to defend against
2463  * infinity and NaN, so we need not get too crazy
2464  * about pattern matching here.
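 					 *
 					 * For example, 42, -1.5, and 2e10 all satisfy the
 					 * strspn() test below and are printed bare, while NaN
 					 * or Infinity fail it and come out quoted, as 'NaN'.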
2465  */
2466  const char *s = PQgetvalue(res, tuple, field);
2467 
2468  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2469  archputs(s, fout);
2470  else
2471  archprintf(fout, "'%s'", s);
2472  }
2473  break;
2474 
2475  case BITOID:
2476  case VARBITOID:
2477  archprintf(fout, "B'%s'",
2478  PQgetvalue(res, tuple, field));
2479  break;
2480 
2481  case BOOLOID:
2482  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2483  archputs("true", fout);
2484  else
2485  archputs("false", fout);
2486  break;
2487 
2488  default:
2489  /* All other types are printed as string literals. */
2490  resetPQExpBuffer(q);
2491  appendStringLiteralAH(q,
2492  PQgetvalue(res, tuple, field),
2493  fout);
2494  archputs(q->data, fout);
2495  break;
2496  }
2497  }
2498 
2499  /* Terminate the row ... */
2500  archputs(")", fout);
2501 
2502  /* ... and the statement, if the target no. of rows is reached */
2503  if (++rows_this_statement >= rows_per_statement)
2504  {
2505  if (dopt->do_nothing)
2506  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2507  else
2508  archputs(";\n", fout);
2509  /* Reset the row counter */
2510  rows_this_statement = 0;
2511  }
2512  }
2513 
2514  if (PQntuples(res) <= 0)
2515  {
2516  PQclear(res);
2517  break;
2518  }
2519  PQclear(res);
2520  }
2521 
2522  /* Terminate any statements that didn't make the row count. */
2523  if (rows_this_statement > 0)
2524  {
2525  if (dopt->do_nothing)
2526  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2527  else
2528  archputs(";\n", fout);
2529  }
2530 
2531  archputs("\n\n", fout);
2532 
2533  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2534 
2535  destroyPQExpBuffer(q);
2536  if (insertStmt != NULL)
2537  destroyPQExpBuffer(insertStmt);
2538  free(attgenerated);
2539 
2540  return 1;
2541 }
2542 
2543 /*
2544  * getRootTableInfo:
2545  * get the root TableInfo for the given partition table.
2546  */
2547 static TableInfo *
2548 getRootTableInfo(const TableInfo *tbinfo)
2549 {
2550  TableInfo *parentTbinfo;
2551 
2552  Assert(tbinfo->ispartition);
2553  Assert(tbinfo->numParents == 1);
2554 
2555  parentTbinfo = tbinfo->parents[0];
2556  while (parentTbinfo->ispartition)
2557  {
2558  Assert(parentTbinfo->numParents == 1);
2559  parentTbinfo = parentTbinfo->parents[0];
2560  }
2561 
2562  return parentTbinfo;
2563 }
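 /*
  * For example, for a leaf partition whose ancestry is the hypothetical
  * chain t_y2024m01 -> t_y2024 -> t, this walks up two levels and returns
  * the TableInfo for t, the topmost ancestor that is not itself a
  * partition.
  */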
2564 
2565 /*
2566  * forcePartitionRootLoad
2567  * Check if we must force load_via_partition_root for this partition.
2568  *
2569  * This is required if any level of ancestral partitioned table has an
2570  * unsafe partitioning scheme.
2571  */
2572 static bool
2573 forcePartitionRootLoad(const TableInfo *tbinfo)
2574 {
2575  TableInfo *parentTbinfo;
2576 
2577  Assert(tbinfo->ispartition);
2578  Assert(tbinfo->numParents == 1);
2579 
2580  parentTbinfo = tbinfo->parents[0];
2581  if (parentTbinfo->unsafe_partitions)
2582  return true;
2583  while (parentTbinfo->ispartition)
2584  {
2585  Assert(parentTbinfo->numParents == 1);
2586  parentTbinfo = parentTbinfo->parents[0];
2587  if (parentTbinfo->unsafe_partitions)
2588  return true;
2589  }
2590 
2591  return false;
2592 }
2593 
2594 /*
2595  * dumpTableData -
2596  * dump the contents of a single table
2597  *
2598  * Actually, this just makes an ArchiveEntry for the table contents.
2599  */
2600 static void
2601 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2602 {
2603  DumpOptions *dopt = fout->dopt;
2604  TableInfo *tbinfo = tdinfo->tdtable;
2605  PQExpBuffer copyBuf = createPQExpBuffer();
2606  PQExpBuffer clistBuf = createPQExpBuffer();
2607  DataDumperPtr dumpFn;
2608  char *tdDefn = NULL;
2609  char *copyStmt;
2610  const char *copyFrom;
2611 
2612  /* We had better have loaded per-column details about this table */
2613  Assert(tbinfo->interesting);
2614 
2615  /*
2616  * When load-via-partition-root is set or forced, get the root table name
2617  * for the partition table, so that we can reload data through the root
2618  * table. Then construct a comment to be inserted into the TOC entry's
2619  * defn field, so that such cases can be identified reliably.
2620  */
2621  if (tbinfo->ispartition &&
2622  (dopt->load_via_partition_root ||
2623  forcePartitionRootLoad(tbinfo)))
2624  {
2625  TableInfo *parentTbinfo;
2626 
2627  parentTbinfo = getRootTableInfo(tbinfo);
2628  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2629  printfPQExpBuffer(copyBuf, "-- load via partition root %s",
2630  copyFrom);
2631  tdDefn = pg_strdup(copyBuf->data);
2632  }
2633  else
2634  copyFrom = fmtQualifiedDumpable(tbinfo);
2635 
2636  if (dopt->dump_inserts == 0)
2637  {
2638  /* Dump/restore using COPY */
2639  dumpFn = dumpTableData_copy;
2640  /* must use 2 steps here 'cause fmtId is nonreentrant */
2641  printfPQExpBuffer(copyBuf, "COPY %s ",
2642  copyFrom);
2643  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2644  fmtCopyColumnList(tbinfo, clistBuf));
2645  copyStmt = copyBuf->data;
2646  }
2647  else
2648  {
2649  /* Restore using INSERT */
2650  dumpFn = dumpTableData_insert;
2651  copyStmt = NULL;
2652  }
2653 
2654  /*
2655  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2656  * dependency on its table as "special" and pass it to ArchiveEntry now.
2657  * See comments for BuildArchiveDependencies.
2658  */
2659  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2660  {
2661  TocEntry *te;
2662 
2663  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2664  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2665  .namespace = tbinfo->dobj.namespace->dobj.name,
2666  .owner = tbinfo->rolname,
2667  .description = "TABLE DATA",
2668  .section = SECTION_DATA,
2669  .createStmt = tdDefn,
2670  .copyStmt = copyStmt,
2671  .deps = &(tbinfo->dobj.dumpId),
2672  .nDeps = 1,
2673  .dumpFn = dumpFn,
2674  .dumpArg = tdinfo));
2675 
2676  /*
2677  * Set the TocEntry's dataLength in case we are doing a parallel dump
2678  * and want to order dump jobs by table size. We choose to measure
2679  * dataLength in table pages (including TOAST pages) during dump, so
2680  * no scaling is needed.
2681  *
2682  * However, relpages is declared as "integer" in pg_class, and hence
2683  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2684  * Cast so that we get the right interpretation of table sizes
2685  * exceeding INT_MAX pages.
2686  */
2687  te->dataLength = (BlockNumber) tbinfo->relpages;
2688  te->dataLength += (BlockNumber) tbinfo->toastpages;
2689 
2690  /*
2691  * If pgoff_t is only 32 bits wide, the above refinement is useless,
2692  * and instead we'd better worry about integer overflow. Clamp to
2693  * INT_MAX if the correct result exceeds that.
2694  */
2695  if (sizeof(te->dataLength) == 4 &&
2696  (tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2697  te->dataLength < 0))
2698  te->dataLength = INT_MAX;
2699  }
2700 
2701  destroyPQExpBuffer(copyBuf);
2702  destroyPQExpBuffer(clistBuf);
2703 }
2704 
2705 /*
2706  * refreshMatViewData -
2707  * load or refresh the contents of a single materialized view
2708  *
2709  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2710  * statement.
2711  */
2712 static void
2713 refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo)
2714 {
2715  TableInfo *tbinfo = tdinfo->tdtable;
2716  PQExpBuffer q;
2717 
2718  /* If the materialized view is not flagged as populated, skip this. */
2719  if (!tbinfo->relispopulated)
2720  return;
2721 
2722  q = createPQExpBuffer();
2723 
2724  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2725  fmtQualifiedDumpable(tbinfo));
2726 
2727  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2728  ArchiveEntry(fout,
2729  tdinfo->dobj.catId, /* catalog ID */
2730  tdinfo->dobj.dumpId, /* dump ID */
2731  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2732  .namespace = tbinfo->dobj.namespace->dobj.name,
2733  .owner = tbinfo->rolname,
2734  .description = "MATERIALIZED VIEW DATA",
2735  .section = SECTION_POST_DATA,
2736  .createStmt = q->data,
2737  .deps = tdinfo->dobj.dependencies,
2738  .nDeps = tdinfo->dobj.nDeps));
2739 
2740  destroyPQExpBuffer(q);
2741 }
2742 
2743 /*
2744  * getTableData -
2745  * set up dumpable objects representing the contents of tables
2746  */
2747 static void
2748 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2749 {
2750  int i;
2751 
2752  for (i = 0; i < numTables; i++)
2753  {
2754  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2755  (!relkind || tblinfo[i].relkind == relkind))
2756  makeTableDataInfo(dopt, &(tblinfo[i]));
2757  }
2758 }
2759 
2760 /*
2761  * Make a dumpable object for the data of this specific table
2762  *
2763  * Note: we make a TableDataInfo if and only if we are going to dump the
2764  * table data; the "dump" field in such objects isn't very interesting.
2765  */
2766 static void
2767 makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo)
2768 {
2769  TableDataInfo *tdinfo;
2770 
2771  /*
2772  * Nothing to do if we already decided to dump the table. This will
2773  * happen for "config" tables.
2774  */
2775  if (tbinfo->dataObj != NULL)
2776  return;
2777 
2778  /* Skip VIEWs (no data to dump) */
2779  if (tbinfo->relkind == RELKIND_VIEW)
2780  return;
2781  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2782  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
2783  (foreign_servers_include_oids.head == NULL ||
2784  !simple_oid_list_member(&foreign_servers_include_oids,
2785  tbinfo->foreign_server)))
2786  return;
2787  /* Skip partitioned tables (data in partitions) */
2788  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2789  return;
2790 
2791  /* Don't dump data in unlogged tables, if so requested */
2792  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2793  dopt->no_unlogged_table_data)
2794  return;
2795 
2796  /* Check that the data is not explicitly excluded */
2797  if (simple_oid_list_member(&tabledata_exclude_oids,
2798  tbinfo->dobj.catId.oid))
2799  return;
2800 
2801  /* OK, let's dump it */
2802  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2803 
2804  if (tbinfo->relkind == RELKIND_MATVIEW)
2805  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2806  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2807  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2808  else
2809  tdinfo->dobj.objType = DO_TABLE_DATA;
2810 
2811  /*
2812  * Note: use tableoid 0 so that this object won't be mistaken for
2813  * something that pg_depend entries apply to.
2814  */
2815  tdinfo->dobj.catId.tableoid = 0;
2816  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2817  AssignDumpId(&tdinfo->dobj);
2818  tdinfo->dobj.name = tbinfo->dobj.name;
2819  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2820  tdinfo->tdtable = tbinfo;
2821  tdinfo->filtercond = NULL; /* might get set later */
2822  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2823 
2824  /* A TableDataInfo contains data, of course */
2825  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2826 
2827  tbinfo->dataObj = tdinfo;
2828 
2829  /* Make sure that we'll collect per-column info for this table. */
2830  tbinfo->interesting = true;
2831 }
2832 
2833 /*
2834  * The refresh for a materialized view must be dependent on the refresh for
2835  * any materialized view that this one is dependent on.
2836  *
2837  * This must be called after all the objects are created, but before they are
2838  * sorted.
2839  */
2840 static void
2841 buildMatViewRefreshDependencies(Archive *fout)
2842 {
2843  PQExpBuffer query;
2844  PGresult *res;
2845  int ntups,
2846  i;
2847  int i_classid,
2848  i_objid,
2849  i_refobjid;
2850 
2851  /* No Mat Views before 9.3. */
2852  if (fout->remoteVersion < 90300)
2853  return;
2854 
2855  query = createPQExpBuffer();
2856 
2857  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2858  "( "
2859  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2860  "FROM pg_depend d1 "
2861  "JOIN pg_class c1 ON c1.oid = d1.objid "
2862  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2863  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2864  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2865  "AND d2.objid = r1.oid "
2866  "AND d2.refobjid <> d1.objid "
2867  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2868  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2869  CppAsString2(RELKIND_VIEW) ") "
2870  "WHERE d1.classid = 'pg_class'::regclass "
2871  "UNION "
2872  "SELECT w.objid, d3.refobjid, c3.relkind "
2873  "FROM w "
2874  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2875  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2876  "AND d3.objid = r3.oid "
2877  "AND d3.refobjid <> w.refobjid "
2878  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2879  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2880  CppAsString2(RELKIND_VIEW) ") "
2881  ") "
2882  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2883  "FROM w "
2884  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
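 	/*
 	 * For illustration, if materialized view mv2 reads from materialized
 	 * view mv1, directly or through a chain of plain views, this query
 	 * returns a row pairing mv2 with mv1; the loop below then makes mv2's
 	 * refresh depend on mv1's, so that mv1 is refreshed first.
 	 */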
2885 
2886  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2887 
2888  ntups = PQntuples(res);
2889 
2890  i_classid = PQfnumber(res, "classid");
2891  i_objid = PQfnumber(res, "objid");
2892  i_refobjid = PQfnumber(res, "refobjid");
2893 
2894  for (i = 0; i < ntups; i++)
2895  {
2896  CatalogId objId;
2897  CatalogId refobjId;
2898  DumpableObject *dobj;
2899  DumpableObject *refdobj;
2900  TableInfo *tbinfo;
2901  TableInfo *reftbinfo;
2902 
2903  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2904  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2905  refobjId.tableoid = objId.tableoid;
2906  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2907 
2908  dobj = findObjectByCatalogId(objId);
2909  if (dobj == NULL)
2910  continue;
2911 
2912  Assert(dobj->objType == DO_TABLE);
2913  tbinfo = (TableInfo *) dobj;
2914  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2915  dobj = (DumpableObject *) tbinfo->dataObj;
2916  if (dobj == NULL)
2917  continue;
2918  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2919 
2920  refdobj = findObjectByCatalogId(refobjId);
2921  if (refdobj == NULL)
2922  continue;
2923 
2924  Assert(refdobj->objType == DO_TABLE);
2925  reftbinfo = (TableInfo *) refdobj;
2926  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2927  refdobj = (DumpableObject *) reftbinfo->dataObj;
2928  if (refdobj == NULL)
2929  continue;
2930  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2931 
2932  addObjectDependency(dobj, refdobj->dumpId);
2933 
2934  if (!reftbinfo->relispopulated)
2935  tbinfo->relispopulated = false;
2936  }
2937 
2938  PQclear(res);
2939 
2940  destroyPQExpBuffer(query);
2941 }
2942 
2943 /*
2944  * getTableDataFKConstraints -
2945  * add dump-order dependencies reflecting foreign key constraints
2946  *
2947  * This code is executed only in a data-only dump --- in schema+data dumps
2948  * we handle foreign key issues by not creating the FK constraints until
2949  * after the data is loaded. In a data-only dump, however, we want to
2950  * order the table data objects in such a way that a table's referenced
2951  * tables are restored first. (In the presence of circular references or
2952  * self-references this may be impossible; we'll detect and complain about
2953  * that during the dependency sorting step.)
2954  */
2955 static void
2956 getTableDataFKConstraints(void)
2957 {
2958  DumpableObject **dobjs;
2959  int numObjs;
2960  int i;
2961 
2962  /* Search through all the dumpable objects for FK constraints */
2963  getDumpableObjects(&dobjs, &numObjs);
2964  for (i = 0; i < numObjs; i++)
2965  {
2966  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2967  {
2968  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2969  TableInfo *ftable;
2970 
2971  /* Not interesting unless both tables are to be dumped */
2972  if (cinfo->contable == NULL ||
2973  cinfo->contable->dataObj == NULL)
2974  continue;
2975  ftable = findTableByOid(cinfo->confrelid);
2976  if (ftable == NULL ||
2977  ftable->dataObj == NULL)
2978  continue;
2979 
2980  /*
2981  * Okay, make referencing table's TABLE_DATA object depend on the
2982  * referenced table's TABLE_DATA object.
2983  */
2984  addObjectDependency(&cinfo->contable->dataObj->dobj,
2985  ftable->dataObj->dobj.dumpId);
2986  }
2987  }
2988  free(dobjs);
2989 }
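 /*
  * For example, in a data-only dump where a hypothetical orders table has
  * a foreign key referencing customers and both tables are dumped, the
  * loop above makes the orders TABLE DATA entry depend on the customers
  * one, so customers is restored first and the already-existing FK
  * constraint is satisfied.
  */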
2990 
2991 
2992 /*
2993  * dumpDatabase:
2994  * dump the database definition
2995  */
2996 static void
2997 dumpDatabase(Archive *fout)
2998 {
2999  DumpOptions *dopt = fout->dopt;
3000  PQExpBuffer dbQry = createPQExpBuffer();
3001  PQExpBuffer delQry = createPQExpBuffer();
3002  PQExpBuffer creaQry = createPQExpBuffer();
3003  PQExpBuffer labelq = createPQExpBuffer();
3004  PGconn *conn = GetConnection(fout);
3005  PGresult *res;
3006  int i_tableoid,
3007  i_oid,
3008  i_datname,
3009  i_datdba,
3010  i_encoding,
3011  i_datlocprovider,
3012  i_collate,
3013  i_ctype,
3014  i_datlocale,
3015  i_daticurules,
3016  i_frozenxid,
3017  i_minmxid,
3018  i_datacl,
3019  i_acldefault,
3020  i_datistemplate,
3021  i_datconnlimit,
3022  i_datcollversion,
3023  i_tablespace;
3024  CatalogId dbCatId;
3025  DumpId dbDumpId;
3026  DumpableAcl dbdacl;
3027  const char *datname,
3028  *dba,
3029  *encoding,
3030  *datlocprovider,
3031  *collate,
3032  *ctype,
3033  *locale,
3034  *icurules,
3035  *datistemplate,
3036  *datconnlimit,
3037  *tablespace;
3038  uint32 frozenxid,
3039  minmxid;
3040  char *qdatname;
3041 
3042  pg_log_info("saving database definition");
3043 
3044  /*
3045  * Fetch the database-level properties for this database.
3046  */
3047  appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
3048  "datdba, "
3049  "pg_encoding_to_char(encoding) AS encoding, "
3050  "datcollate, datctype, datfrozenxid, "
3051  "datacl, acldefault('d', datdba) AS acldefault, "
3052  "datistemplate, datconnlimit, ");
3053  if (fout->remoteVersion >= 90300)
3054  appendPQExpBufferStr(dbQry, "datminmxid, ");
3055  else
3056  appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
3057  if (fout->remoteVersion >= 170000)
3058  appendPQExpBufferStr(dbQry, "datlocprovider, datlocale, datcollversion, ");
3059  else if (fout->remoteVersion >= 150000)
3060  appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale AS datlocale, datcollversion, ");
3061  else
3062  appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS datlocale, NULL AS datcollversion, ");
3063  if (fout->remoteVersion >= 160000)
3064  appendPQExpBufferStr(dbQry, "daticurules, ");
3065  else
3066  appendPQExpBufferStr(dbQry, "NULL AS daticurules, ");
3067  appendPQExpBufferStr(dbQry,
3068  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
3069  "shobj_description(oid, 'pg_database') AS description "
3070  "FROM pg_database "
3071  "WHERE datname = current_database()");
3072 
3073  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
3074 
3075  i_tableoid = PQfnumber(res, "tableoid");
3076  i_oid = PQfnumber(res, "oid");
3077  i_datname = PQfnumber(res, "datname");
3078  i_datdba = PQfnumber(res, "datdba");
3079  i_encoding = PQfnumber(res, "encoding");
3080  i_datlocprovider = PQfnumber(res, "datlocprovider");
3081  i_collate = PQfnumber(res, "datcollate");
3082  i_ctype = PQfnumber(res, "datctype");
3083  i_datlocale = PQfnumber(res, "datlocale");
3084  i_daticurules = PQfnumber(res, "daticurules");
3085  i_frozenxid = PQfnumber(res, "datfrozenxid");
3086  i_minmxid = PQfnumber(res, "datminmxid");
3087  i_datacl = PQfnumber(res, "datacl");
3088  i_acldefault = PQfnumber(res, "acldefault");
3089  i_datistemplate = PQfnumber(res, "datistemplate");
3090  i_datconnlimit = PQfnumber(res, "datconnlimit");
3091  i_datcollversion = PQfnumber(res, "datcollversion");
3092  i_tablespace = PQfnumber(res, "tablespace");
3093 
3094  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
3095  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
3096  datname = PQgetvalue(res, 0, i_datname);
3097  dba = getRoleName(PQgetvalue(res, 0, i_datdba));
3098  encoding = PQgetvalue(res, 0, i_encoding);
3099  datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
3100  collate = PQgetvalue(res, 0, i_collate);
3101  ctype = PQgetvalue(res, 0, i_ctype);
3102  if (!PQgetisnull(res, 0, i_datlocale))
3103  locale = PQgetvalue(res, 0, i_datlocale);
3104  else
3105  locale = NULL;
3106  if (!PQgetisnull(res, 0, i_daticurules))
3107  icurules = PQgetvalue(res, 0, i_daticurules);
3108  else
3109  icurules = NULL;
3110  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
3111  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
3112  dbdacl.acl = PQgetvalue(res, 0, i_datacl);
3113  dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
3114  datistemplate = PQgetvalue(res, 0, i_datistemplate);
3115  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
3116  tablespace = PQgetvalue(res, 0, i_tablespace);
3117 
3118  qdatname = pg_strdup(fmtId(datname));
3119 
3120  /*
3121  * Prepare the CREATE DATABASE command. We must specify OID (if we want
3122  * to preserve that), as well as the encoding, locale, and tablespace
3123  * since those can't be altered later. Other DB properties are left to
3124  * the DATABASE PROPERTIES entry, so that they can be applied after
3125  * reconnecting to the target DB.
3126  */
3127  if (dopt->binary_upgrade)
3128  {
3129  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0 OID = %u",
3130  qdatname, dbCatId.oid);
3131  }
3132  else
3133  {
3134  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
3135  qdatname);
3136  }
3137  if (strlen(encoding) > 0)
3138  {
3139  appendPQExpBufferStr(creaQry, " ENCODING = ");
3140  appendStringLiteralAH(creaQry, encoding, fout);
3141  }
3142 
3143  appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
3144  if (datlocprovider[0] == 'b')
3145  appendPQExpBufferStr(creaQry, "builtin");
3146  else if (datlocprovider[0] == 'c')
3147  appendPQExpBufferStr(creaQry, "libc");
3148  else if (datlocprovider[0] == 'i')
3149  appendPQExpBufferStr(creaQry, "icu");
3150  else
3151  pg_fatal("unrecognized locale provider: %s",
3152  datlocprovider);
3153 
3154  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
3155  {
3156  appendPQExpBufferStr(creaQry, " LOCALE = ");
3157  appendStringLiteralAH(creaQry, collate, fout);
3158  }
3159  else
3160  {
3161  if (strlen(collate) > 0)
3162  {
3163  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
3164  appendStringLiteralAH(creaQry, collate, fout);
3165  }
3166  if (strlen(ctype) > 0)
3167  {
3168  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
3169  appendStringLiteralAH(creaQry, ctype, fout);
3170  }
3171  }
3172  if (locale)
3173  {
3174  if (datlocprovider[0] == 'b')
3175  appendPQExpBufferStr(creaQry, " BUILTIN_LOCALE = ");
3176  else
3177  appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
3178 
3179  appendStringLiteralAH(creaQry, locale, fout);
3180  }
3181 
3182  if (icurules)
3183  {
3184  appendPQExpBufferStr(creaQry, " ICU_RULES = ");
3185  appendStringLiteralAH(creaQry, icurules, fout);
3186  }
3187 
3188  /*
3189  * For binary upgrade, carry over the collation version. For normal
3190  * dump/restore, omit the version, so that it is computed upon restore.
3191  */
3192  if (dopt->binary_upgrade)
3193  {
3194  if (!PQgetisnull(res, 0, i_datcollversion))
3195  {
3196  appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
3197  appendStringLiteralAH(creaQry,
3198  PQgetvalue(res, 0, i_datcollversion),
3199  fout);
3200  }
3201  }
3202 
3203  /*
3204  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
3205  * thing; the decision whether to specify a tablespace should be left till
3206  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
3207  * label the DATABASE entry with the tablespace and let the normal
3208  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
3209  * attention to default_tablespace, so that won't work.
3210  */
3211  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
3212  !dopt->outputNoTablespaces)
3213  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
3214  fmtId(tablespace));
3215  appendPQExpBufferStr(creaQry, ";\n");
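 	/*
 	 * For illustration, a database using the libc provider with matching
 	 * collate/ctype settings might produce something like (name, encoding,
 	 * and locale here are hypothetical):
 	 *
 	 *   CREATE DATABASE mydb WITH TEMPLATE = template0 ENCODING = 'UTF8'
 	 *       LOCALE_PROVIDER = libc LOCALE = 'en_US.UTF-8';
 	 */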
3216 
3217  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3218  qdatname);
3219 
3220  dbDumpId = createDumpId();
3221 
3222  ArchiveEntry(fout,
3223  dbCatId, /* catalog ID */
3224  dbDumpId, /* dump ID */
3225  ARCHIVE_OPTS(.tag = datname,
3226  .owner = dba,
3227  .description = "DATABASE",
3228  .section = SECTION_PRE_DATA,
3229  .createStmt = creaQry->data,
3230  .dropStmt = delQry->data));
3231 
3232  /* Compute correct tag for archive entry */
3233  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3234 
3235  /* Dump DB comment if any */
3236  {
3237  /*
3238  * 8.2 and up keep comments on shared objects in a shared table, so we
3239  * cannot use the dumpComment() code used for other database objects.
3240  * Be careful that the ArchiveEntry parameters match that function.
3241  */
3242  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3243 
3244  if (comment && *comment && !dopt->no_comments)
3245  {
3246  resetPQExpBuffer(dbQry);
3247 
3248  /*
3249  * Generates warning when loaded into a differently-named
3250  * database.
3251  */
3252  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3253  appendStringLiteralAH(dbQry, comment, fout);
3254  appendPQExpBufferStr(dbQry, ";\n");
3255 
3256  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3257  ARCHIVE_OPTS(.tag = labelq->data,
3258  .owner = dba,
3259  .description = "COMMENT",
3260  .section = SECTION_NONE,
3261  .createStmt = dbQry->data,
3262  .deps = &dbDumpId,
3263  .nDeps = 1));
3264  }
3265  }
3266 
3267  /* Dump DB security label, if enabled */
3268  if (!dopt->no_security_labels)
3269  {
3270  PGresult *shres;
3271  PQExpBuffer seclabelQry;
3272 
3273  seclabelQry = createPQExpBuffer();
3274 
3275  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3276  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3277  resetPQExpBuffer(seclabelQry);
3278  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3279  if (seclabelQry->len > 0)
3280  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3281  ARCHIVE_OPTS(.tag = labelq->data,
3282  .owner = dba,
3283  .description = "SECURITY LABEL",
3284  .section = SECTION_NONE,
3285  .createStmt = seclabelQry->data,
3286  .deps = &dbDumpId,
3287  .nDeps = 1));
3288  destroyPQExpBuffer(seclabelQry);
3289  PQclear(shres);
3290  }
3291 
3292  /*
3293  * Dump ACL if any. Note that we do not support initial privileges
3294  * (pg_init_privs) on databases.
3295  */
3296  dbdacl.privtype = 0;
3297  dbdacl.initprivs = NULL;
3298 
3299  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3300  qdatname, NULL, NULL,
3301  NULL, dba, &dbdacl);
3302 
3303  /*
3304  * Now construct a DATABASE PROPERTIES archive entry to restore any
3305  * non-default database-level properties. (The reason this must be
3306  * separate is that we cannot put any additional commands into the TOC
3307  * entry that has CREATE DATABASE. pg_restore would execute such a group
3308  * in an implicit transaction block, and the backend won't allow CREATE
3309  * DATABASE in that context.)
3310  */
3311  resetPQExpBuffer(creaQry);
3312  resetPQExpBuffer(delQry);
3313 
3314  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3315  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3316  qdatname, datconnlimit);
3317 
3318  if (strcmp(datistemplate, "t") == 0)
3319  {
3320  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3321  qdatname);
3322 
3323  /*
3324  * The backend won't accept DROP DATABASE on a template database. We
3325  * can deal with that by removing the template marking before the DROP
3326  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3327  * since no such command is currently supported, fake it with a direct
3328  * UPDATE on pg_database.
3329  */
3330  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3331  "SET datistemplate = false WHERE datname = ");
3332  appendStringLiteralAH(delQry, datname, fout);
3333  appendPQExpBufferStr(delQry, ";\n");
3334  }
3335 
3336  /*
3337  * We do not restore pg_database.dathasloginevt because it is set
3338  * automatically on login event trigger creation.
3339  */
3340 
3341  /* Add database-specific SET options */
3342  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3343 
3344  /*
3345  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3346  * entry, too, for lack of a better place.
3347  */
3348  if (dopt->binary_upgrade)
3349  {
3350  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3351  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3352  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3353  "WHERE datname = ",
3354  frozenxid, minmxid);
3355  appendStringLiteralAH(creaQry, datname, fout);
3356  appendPQExpBufferStr(creaQry, ";\n");
3357  }
3358 
3359  if (creaQry->len > 0)
3360  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3361  ARCHIVE_OPTS(.tag = datname,
3362  .owner = dba,
3363  .description = "DATABASE PROPERTIES",
3364  .section = SECTION_PRE_DATA,
3365  .createStmt = creaQry->data,
3366  .dropStmt = delQry->data,
3367  .deps = &dbDumpId));
3368 
3369  /*
3370  * pg_largeobject comes from the old system intact, so set its
3371  * relfrozenxids, relminmxids and relfilenode.
3372  */
3373  if (dopt->binary_upgrade)
3374  {
3375  PGresult *lo_res;
3376  PQExpBuffer loFrozenQry = createPQExpBuffer();
3377  PQExpBuffer loOutQry = createPQExpBuffer();
3378  PQExpBuffer loHorizonQry = createPQExpBuffer();
3379  int ii_relfrozenxid,
3380  ii_relfilenode,
3381  ii_oid,
3382  ii_relminmxid;
3383 
3384  /*
3385  * pg_largeobject
3386  */
3387  if (fout->remoteVersion >= 90300)
3388  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
3389  "FROM pg_catalog.pg_class\n"
3390  "WHERE oid IN (%u, %u);\n",
3391  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3392  else
3393  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
3394  "FROM pg_catalog.pg_class\n"
3395  "WHERE oid IN (%u, %u);\n",
3396  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3397 
3398  lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);
3399 
3400  ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3401  ii_relminmxid = PQfnumber(lo_res, "relminmxid");
3402  ii_relfilenode = PQfnumber(lo_res, "relfilenode");
3403  ii_oid = PQfnumber(lo_res, "oid");
3404 
3405  appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3406  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
3407  for (int i = 0; i < PQntuples(lo_res); ++i)
3408  {
3409  Oid oid;
3410  RelFileNumber relfilenumber;
3411 
3412  appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
3413  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3414  "WHERE oid = %u;\n",
3415  atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
3416  atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
3417  atooid(PQgetvalue(lo_res, i, ii_oid)));
3418 
3419  oid = atooid(PQgetvalue(lo_res, i, ii_oid));
3420  relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));
3421 
3422  if (oid == LargeObjectRelationId)
3423  appendPQExpBuffer(loOutQry,
3424  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
3425  relfilenumber);
3426  else if (oid == LargeObjectLOidPNIndexId)
3427  appendPQExpBuffer(loOutQry,
3428  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
3429  relfilenumber);
3430  }
3431 
3432  appendPQExpBufferStr(loOutQry,
3433  "TRUNCATE pg_catalog.pg_largeobject;\n");
3434  appendPQExpBufferStr(loOutQry, loHorizonQry->data);
3435 
3436  ArchiveEntry(fout, nilCatalogId, createDumpId(),
3437  ARCHIVE_OPTS(.tag = "pg_largeobject",
3438  .description = "pg_largeobject",
3439  .section = SECTION_PRE_DATA,
3440  .createStmt = loOutQry->data));
3441 
3442  PQclear(lo_res);
3443 
3444  destroyPQExpBuffer(loFrozenQry);
3445  destroyPQExpBuffer(loHorizonQry);
3446  destroyPQExpBuffer(loOutQry);
3447  }
3448 
3449  PQclear(res);
3450 
3451  free(qdatname);
3452  destroyPQExpBuffer(dbQry);
3453  destroyPQExpBuffer(delQry);
3454  destroyPQExpBuffer(creaQry);
3455  destroyPQExpBuffer(labelq);
3456 }
3457 
3458 /*
3459  * Collect any database-specific or role-and-database-specific SET options
3460  * for this database, and append them to outbuf.
3461  */
3462 static void
3463 dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
3464  const char *dbname, Oid dboid)
3465 {
3466  PGconn *conn = GetConnection(AH);
3467  PQExpBuffer buf = createPQExpBuffer();
3468  PGresult *res;
3469 
3470  /* First collect database-specific options */
3471  printfPQExpBuffer(buf, "SELECT unnest(setconfig) FROM pg_db_role_setting "
3472  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3473  dboid);
3474 
3475  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3476 
3477  for (int i = 0; i < PQntuples(res); i++)
3478  makeAlterConfigCommand(conn, PQgetvalue(res, i, 0),
3479  "DATABASE", dbname, NULL, NULL,
3480  outbuf);
3481 
3482  PQclear(res);
3483 
3484  /* Now look for role-and-database-specific options */
3485  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
3486  "FROM pg_db_role_setting s, pg_roles r "
3487  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3488  dboid);
3489 
3490  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3491 
3492  for (int i = 0; i < PQntuples(res); i++)
3493  makeAlterConfigCommand(conn, PQgetvalue(res, i, 1),
3494  "ROLE", PQgetvalue(res, i, 0),
3495  "DATABASE", dbname,
3496  outbuf);
3497 
3498  PQclear(res);
3499 
3500  destroyPQExpBuffer(buf);
3501 }
3502 
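/*
 * Illustrative sketch (example only, not part of this file): for a
 * hypothetical database "mydb" with one database-level setting and one
 * role-specific setting, dumpDatabaseConfig() would accumulate commands
 * of roughly this shape into outbuf (names and values invented here):
 *
 *   ALTER DATABASE mydb SET work_mem TO '64MB';
 *   ALTER ROLE alice IN DATABASE mydb SET search_path TO 'app';
 */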
3503 /*
3504  * dumpEncoding: put the correct encoding into the archive
3505  */
3506 static void
3507 dumpEncoding(Archive *AH)
3508 {
3509  const char *encname = pg_encoding_to_char(AH->encoding);
3510  PQExpBuffer qry = createPQExpBuffer();
3511 
3512  pg_log_info("saving encoding = %s", encname);
3513 
3514  appendPQExpBufferStr(qry, "SET client_encoding = ");
3515  appendStringLiteralAH(qry, encname, AH);
3516  appendPQExpBufferStr(qry, ";\n");
3517 
3518  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3519  ARCHIVE_OPTS(.tag = "ENCODING",
3520  .description = "ENCODING",
3521  .section = SECTION_PRE_DATA,
3522  .createStmt = qry->data));
3523 
3524  destroyPQExpBuffer(qry);
3525 }
3526 
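/*
 * Example of the resulting ENCODING entry's createStmt, assuming a UTF8
 * source database (the actual value tracks AH->encoding):
 *
 *   SET client_encoding = 'UTF8';
 */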
3527 
3528 /*
3529  * dumpStdStrings: put the correct escape string behavior into the archive
3530  */
3531 static void
3532 dumpStdStrings(Archive *AH)
3533 {
3534  const char *stdstrings = AH->std_strings ? "on" : "off";
3535  PQExpBuffer qry = createPQExpBuffer();
3536 
3537  pg_log_info("saving standard_conforming_strings = %s",
3538  stdstrings);
3539 
3540  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3541  stdstrings);
3542 
3543  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3544  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3545  .description = "STDSTRINGS",
3546  .section = SECTION_PRE_DATA,
3547  .createStmt = qry->data));
3548 
3549  destroyPQExpBuffer(qry);
3550 }
3551 
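/*
 * Example STDSTRINGS createStmt, assuming the source connection reports
 * standard_conforming_strings = on:
 *
 *   SET standard_conforming_strings = 'on';
 */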
3552 /*
3553  * dumpSearchPath: record the active search_path in the archive
3554  */
3555 static void
3556 dumpSearchPath(Archive *AH)
3557 {
3558  PQExpBuffer qry = createPQExpBuffer();
3559  PQExpBuffer path = createPQExpBuffer();
3560  PGresult *res;
3561  char **schemanames = NULL;
3562  int nschemanames = 0;
3563  int i;
3564 
3565  /*
3566  * We use the result of current_schemas(), not the search_path GUC,
3567  * because that might contain wildcards such as "$user", which won't
3568  * necessarily have the same value during restore. Also, this way avoids
3569  * listing schemas that may appear in search_path but not actually exist,
3570  * which seems like a prudent exclusion.
3571  */
3572  res = ExecuteSqlQueryForSingleRow(AH,
3573  "SELECT pg_catalog.current_schemas(false)");
3574 
3575  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3576  pg_fatal("could not parse result of current_schemas()");
3577 
3578  /*
3579  * We use set_config(), not a simple "SET search_path" command, because
3580  * the latter has less-clean behavior if the search path is empty. While
3581  * that's likely to get fixed at some point, it seems like a good idea to
3582  * be as backwards-compatible as possible in what we put into archives.
3583  */
3584  for (i = 0; i < nschemanames; i++)
3585  {
3586  if (i > 0)
3587  appendPQExpBufferStr(path, ", ");
3588  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3589  }
3590 
3591  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3592  appendStringLiteralAH(qry, path->data, AH);
3593  appendPQExpBufferStr(qry, ", false);\n");
3594 
3595  pg_log_info("saving search_path = %s", path->data);
3596 
3597  ArchiveEntry(AH, nilCatalogId, createDumpId(),
3598  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3599  .description = "SEARCHPATH",
3600  .section = SECTION_PRE_DATA,
3601  .createStmt = qry->data));
3602 
3603  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3604  AH->searchpath = pg_strdup(qry->data);
3605 
3606  free(schemanames);
3607  PQclear(res);
3608  destroyPQExpBuffer(qry);
3609  destroyPQExpBuffer(path);
3610 }
3611 
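/*
 * Example SEARCHPATH createStmt, assuming current_schemas(false)
 * returned just {public}:
 *
 *   SELECT pg_catalog.set_config('search_path', 'public', false);
 */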
3612 
3613 /*
3614  * getLOs:
3615  * Collect schema-level data about large objects
3616  */
3617 static void
3618 getLOs(Archive *fout)
3619 {
3620  DumpOptions *dopt = fout->dopt;
3621  PQExpBuffer loQry = createPQExpBuffer();
3622  PGresult *res;
3623  int ntups;
3624  int i;
3625  int n;
3626  int i_oid;
3627  int i_lomowner;
3628  int i_lomacl;
3629  int i_acldefault;
3630 
3631  pg_log_info("reading large objects");
3632 
3633  /*
3634  * Fetch LO OIDs and owner/ACL data. Order the data so that all the blobs
3635  * with the same owner/ACL appear together.
3636  */
3637  appendPQExpBufferStr(loQry,
3638  "SELECT oid, lomowner, lomacl, "
3639  "acldefault('L', lomowner) AS acldefault "
3640  "FROM pg_largeobject_metadata "
3641  "ORDER BY lomowner, lomacl::pg_catalog.text, oid");
3642 
3643  res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);
3644 
3645  i_oid = PQfnumber(res, "oid");
3646  i_lomowner = PQfnumber(res, "lomowner");
3647  i_lomacl = PQfnumber(res, "lomacl");
3648  i_acldefault = PQfnumber(res, "acldefault");
3649 
3650  ntups = PQntuples(res);
3651 
3652  /*
3653  * Group the blobs into suitably-sized groups that have the same owner and
3654  * ACL setting, and build a metadata and a data DumpableObject for each
3655  * group. (If we supported initprivs for blobs, we'd have to insist that
3656  * groups also share initprivs settings, since the DumpableObject only has
3657  * room for one.) i is the index of the first tuple in the current group,
3658  * and n is the number of tuples we include in the group.
3659  */
3660  for (i = 0; i < ntups; i += n)
3661  {
3662  Oid thisoid = atooid(PQgetvalue(res, i, i_oid));
3663  char *thisowner = PQgetvalue(res, i, i_lomowner);
3664  char *thisacl = PQgetvalue(res, i, i_lomacl);
3665  LoInfo *loinfo;
3666  DumpableObject *lodata;
3667  char namebuf[64];
3668 
3669  /* Scan to find first tuple not to be included in group */
3670  n = 1;
3671  while (n < MAX_BLOBS_PER_ARCHIVE_ENTRY && i + n < ntups)
3672  {
3673  if (strcmp(thisowner, PQgetvalue(res, i + n, i_lomowner)) != 0 ||
3674  strcmp(thisacl, PQgetvalue(res, i + n, i_lomacl)) != 0)
3675  break;
3676  n++;
3677  }
3678 
3679  /* Build the metadata DumpableObject */
3680  loinfo = (LoInfo *) pg_malloc(offsetof(LoInfo, looids) + n * sizeof(Oid));
3681 
3682  loinfo->dobj.objType = DO_LARGE_OBJECT;
3683  loinfo->dobj.catId.tableoid = LargeObjectRelationId;
3684  loinfo->dobj.catId.oid = thisoid;
3685  AssignDumpId(&loinfo->dobj);
3686 
3687  if (n > 1)
3688  snprintf(namebuf, sizeof(namebuf), "%u..%u", thisoid,
3689  atooid(PQgetvalue(res, i + n - 1, i_oid)));
3690  else
3691  snprintf(namebuf, sizeof(namebuf), "%u", thisoid);
3692  loinfo->dobj.name = pg_strdup(namebuf);
3693  loinfo->dacl.acl = pg_strdup(thisacl);
3694  loinfo->dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3695  loinfo->dacl.privtype = 0;
3696  loinfo->dacl.initprivs = NULL;
3697  loinfo->rolname = getRoleName(thisowner);
3698  loinfo->numlos = n;
3699  loinfo->looids[0] = thisoid;
3700  /* Collect OIDs of the remaining blobs in this group */
3701  for (int k = 1; k < n; k++)
3702  {
3703  CatalogId extraID;
3704 
3705  loinfo->looids[k] = atooid(PQgetvalue(res, i + k, i_oid));
3706 
3707  /* Make sure we can look up loinfo by any of the blobs' OIDs */
3708  extraID.tableoid = LargeObjectRelationId;
3709  extraID.oid = loinfo->looids[k];
3710  recordAdditionalCatalogID(extraID, &loinfo->dobj);
3711  }
3712 
3713  /* LOs have data */
3714  loinfo->dobj.components |= DUMP_COMPONENT_DATA;
3715 
3716  /* Mark whether LO group has a non-empty ACL */
3717  if (!PQgetisnull(res, i, i_lomacl))
3718  loinfo->dobj.components |= DUMP_COMPONENT_ACL;
3719 
3720  /*
3721  * In binary-upgrade mode for LOs, we do *not* dump out the LO data,
3722  * as it will be copied by pg_upgrade, which simply copies the
3723  * pg_largeobject table. We *do* however dump out anything but the
3724  * data, as pg_upgrade copies just pg_largeobject, but not
3725  * pg_largeobject_metadata, after the dump is restored.
3726  */
3727  if (dopt->binary_upgrade)
3728  loinfo->dobj.dump &= ~DUMP_COMPONENT_DATA;
3729 
3730  /*
3731  * Create a "BLOBS" data item for the group, too. This is just a
3732  * placeholder for sorting; it carries no data now.
3733  */
3734  lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3735  lodata->objType = DO_LARGE_OBJECT_DATA;
3736  lodata->catId = nilCatalogId;
3737  AssignDumpId(lodata);
3738  lodata->name = pg_strdup(namebuf);
3739  lodata->components |= DUMP_COMPONENT_DATA;
3740  /* Set up explicit dependency from data to metadata */
3741  lodata->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
3742  lodata->dependencies[0] = loinfo->dobj.dumpId;
3743  lodata->nDeps = lodata->allocDeps = 1;
3744  }
3745 
3746  PQclear(res);
3747  destroyPQExpBuffer(loQry);
3748 }
3749 
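/*
 * Illustrative grouping (OIDs invented): five blobs 1001..1005 sharing
 * one owner and ACL yield a single LoInfo named "1001..1005", while a
 * sixth blob with a different ACL starts a new group; the ORDER BY
 * (lomowner, lomacl, oid) above is what keeps each group contiguous.
 */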
3750 /*
3751  * dumpLO
3752  *
3753  * dump the definition (metadata) of the given large object group
3754  */
3755 static void
3756 dumpLO(Archive *fout, const LoInfo *loinfo)
3757 {
3758  PQExpBuffer cquery = createPQExpBuffer();
3759 
3760  /*
3761  * The "definition" is just a newline-separated list of OIDs. We need to
3762  * put something into the dropStmt too, but it can just be a comment.
3763  */
3764  for (int i = 0; i < loinfo->numlos; i++)
3765  appendPQExpBuffer(cquery, "%u\n", loinfo->looids[i]);
3766 
3767  if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3768  ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3769  ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3770  .owner = loinfo->rolname,
3771  .description = "BLOB METADATA",
3772  .section = SECTION_DATA,
3773  .createStmt = cquery->data,
3774  .dropStmt = "-- dummy"));
3775 
3776  /*
3777  * Dump per-blob comments and seclabels if any. We assume these are rare
3778  * enough that it's okay to generate retail TOC entries for them.
3779  */
3780  if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
3781  DUMP_COMPONENT_SECLABEL))
3782  {
3783  for (int i = 0; i < loinfo->numlos; i++)
3784  {
3785  CatalogId catId;
3786  char namebuf[32];
3787 
3788  /* Build identifying info for this blob */
3789  catId.tableoid = loinfo->dobj.catId.tableoid;
3790  catId.oid = loinfo->looids[i];
3791  snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[i]);
3792 
3793  if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3794  dumpComment(fout, "LARGE OBJECT", namebuf,
3795  NULL, loinfo->rolname,
3796  catId, 0, loinfo->dobj.dumpId);
3797 
3798  if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3799  dumpSecLabel(fout, "LARGE OBJECT", namebuf,
3800  NULL, loinfo->rolname,
3801  catId, 0, loinfo->dobj.dumpId);
3802  }
3803  }
3804 
3805  /*
3806  * Dump the ACLs if any (remember that all blobs in the group will have
3807  * the same ACL). If there's just one blob, dump a simple ACL entry; if
3808  * there's more, make a "LARGE OBJECTS" entry that really contains only
3809  * the ACL for the first blob. _printTocEntry() will be cued by the tag
3810  * string to emit a mutated version for each blob.
3811  */
3812  if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3813  {
3814  char namebuf[32];
3815 
3816  /* Build identifying info for the first blob */
3817  snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[0]);
3818 
3819  if (loinfo->numlos > 1)
3820  {
3821  char tagbuf[64];
3822 
3823  snprintf(tagbuf, sizeof(tagbuf), "LARGE OBJECTS %u..%u",
3824  loinfo->looids[0], loinfo->looids[loinfo->numlos - 1]);
3825 
3826  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3827  "LARGE OBJECT", namebuf, NULL, NULL,
3828  tagbuf, loinfo->rolname, &loinfo->dacl);
3829  }
3830  else
3831  {
3832  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3833  "LARGE OBJECT", namebuf, NULL, NULL,
3834  NULL, loinfo->rolname, &loinfo->dacl);
3835  }
3836  }
3837 
3838  destroyPQExpBuffer(cquery);
3839 }
3840 
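/*
 * Example BLOB METADATA createStmt for a hypothetical two-blob group:
 *
 *   1001
 *   1002
 *
 * i.e. just the newline-separated OID list; the restore side expands it
 * into one lo_create() call per listed OID.
 */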
3841 /*
3842  * dumpLOs:
3843  * dump the data contents of the large objects in the given group
3844  */
3845 static int
3846 dumpLOs(Archive *fout, const void *arg)
3847 {
3848  const LoInfo *loinfo = (const LoInfo *) arg;
3849  PGconn *conn = GetConnection(fout);
3850  char buf[LOBBUFSIZE];
3851 
3852  pg_log_info("saving large objects \"%s\"", loinfo->dobj.name);
3853 
3854  for (int i = 0; i < loinfo->numlos; i++)
3855  {
3856  Oid loOid = loinfo->looids[i];
3857  int loFd;
3858  int cnt;
3859 
3860  /* Open the LO */
3861  loFd = lo_open(conn, loOid, INV_READ);
3862  if (loFd == -1)
3863  pg_fatal("could not open large object %u: %s",
3864  loOid, PQerrorMessage(conn));
3865 
3866  StartLO(fout, loOid);
3867 
3868  /* Now read it in chunks, sending data to archive */
3869  do
3870  {
3871  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3872  if (cnt < 0)
3873  pg_fatal("error reading large object %u: %s",
3874  loOid, PQerrorMessage(conn));
3875 
3876  WriteData(fout, buf, cnt);
3877  } while (cnt > 0);
3878 
3879  lo_close(conn, loFd);
3880 
3881  EndLO(fout, loOid);
3882  }
3883 
3884  return 1;
3885 }
3886 
3887 /*
3888  * getPolicies
3889  * get information about all RLS policies on dumpable tables.
3890  */
3891 void
3892 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3893 {
3894  PQExpBuffer query;
3895  PQExpBuffer tbloids;
3896  PGresult *res;
3897  PolicyInfo *polinfo;
3898  int i_oid;
3899  int i_tableoid;
3900  int i_polrelid;
3901  int i_polname;
3902  int i_polcmd;
3903  int i_polpermissive;
3904  int i_polroles;
3905  int i_polqual;
3906  int i_polwithcheck;
3907  int i,
3908  j,
3909  ntups;
3910 
3911  /* No policies before 9.5 */
3912  if (fout->remoteVersion < 90500)
3913  return;
3914 
3915  query = createPQExpBuffer();
3916  tbloids = createPQExpBuffer();
3917 
3918  /*
3919  * Identify tables of interest, and check which ones have RLS enabled.
3920  */
3921  appendPQExpBufferChar(tbloids, '{');
3922  for (i = 0; i < numTables; i++)
3923  {
3924  TableInfo *tbinfo = &tblinfo[i];
3925 
3926  /* Ignore row security on tables not to be dumped */
3927  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3928  continue;
3929 
3930  /* It can't have RLS or policies if it's not a table */
3931  if (tbinfo->relkind != RELKIND_RELATION &&
3932  tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
3933  continue;
3934 
3935  /* Add it to the list of table OIDs to be probed below */
3936  if (tbloids->len > 1) /* do we have more than the '{'? */
3937  appendPQExpBufferChar(tbloids, ',');
3938  appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
3939 
3940  /* Is RLS enabled? (That's separate from whether it has policies) */
3941  if (tbinfo->rowsec)
3942  {
3943  tbinfo->dobj.components |= DUMP_COMPONENT_POLICY;
3944 
3945  /*
3946  * We represent RLS being enabled on a table by creating a
3947  * PolicyInfo object with null polname.
3948  *
3949  * Note: use tableoid 0 so that this object won't be mistaken for
3950  * something that pg_depend entries apply to.
3951  */
3952  polinfo = pg_malloc(sizeof(PolicyInfo));
3953  polinfo->dobj.objType = DO_POLICY;
3954  polinfo->dobj.catId.tableoid = 0;
3955  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3956  AssignDumpId(&polinfo->dobj);
3957  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3958  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3959  polinfo->poltable = tbinfo;
3960  polinfo->polname = NULL;
3961  polinfo->polcmd = '\0';
3962  polinfo->polpermissive = 0;
3963  polinfo->polroles = NULL;
3964  polinfo->polqual = NULL;
3965  polinfo->polwithcheck = NULL;
3966  }
3967  }
3968  appendPQExpBufferChar(tbloids, '}');
3969 
3970  /*
3971  * Now, read all RLS policies belonging to the tables of interest, and
3972  * create PolicyInfo objects for them. (Note that we must filter the
3973  * results server-side not locally, because we dare not apply pg_get_expr
3974  * to tables we don't have lock on.)
3975  */
3976  pg_log_info("reading row-level security policies");
3977 
3978  printfPQExpBuffer(query,
3979  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3980  if (fout->remoteVersion >= 100000)
3981  appendPQExpBufferStr(query, "pol.polpermissive, ");
3982  else
3983  appendPQExpBufferStr(query, "'t' as polpermissive, ");
3984  appendPQExpBuffer(query,
3985  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3986  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3987  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3988  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3989  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
3990  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
3991  tbloids->data);
3992 
3993  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3994 
3995  ntups = PQntuples(res);
3996  if (ntups > 0)
3997  {
3998  i_oid = PQfnumber(res, "oid");
3999  i_tableoid = PQfnumber(res, "tableoid");
4000  i_polrelid = PQfnumber(res, "polrelid");
4001  i_polname = PQfnumber(res, "polname");
4002  i_polcmd = PQfnumber(res, "polcmd");
4003  i_polpermissive = PQfnumber(res, "polpermissive");
4004  i_polroles = PQfnumber(res, "polroles");
4005  i_polqual = PQfnumber(res, "polqual");
4006  i_polwithcheck = PQfnumber(res, "polwithcheck");
4007 
4008  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
4009 
4010  for (j = 0; j < ntups; j++)
4011  {
4012  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
4013  TableInfo *tbinfo = findTableByOid(polrelid);
4014 
4015  tbinfo->dobj.components |= DUMP_COMPONENT_POLICY;
4016 
4017  polinfo[j].dobj.objType = DO_POLICY;
4018  polinfo[j].dobj.catId.tableoid =
4019  atooid(PQgetvalue(res, j, i_tableoid));
4020  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
4021  AssignDumpId(&polinfo[j].dobj);
4022  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4023  polinfo[j].poltable = tbinfo;
4024  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
4025  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
4026 
4027  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
4028  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
4029 
4030  if (PQgetisnull(res, j, i_polroles))
4031  polinfo[j].polroles = NULL;
4032  else
4033  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
4034 
4035  if (PQgetisnull(res, j, i_polqual))
4036  polinfo[j].polqual = NULL;
4037  else
4038  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
4039 
4040  if (PQgetisnull(res, j, i_polwithcheck))
4041  polinfo[j].polwithcheck = NULL;
4042  else
4043  polinfo[j].polwithcheck
4044  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
4045  }
4046  }
4047 
4048  PQclear(res);
4049 
4050  destroyPQExpBuffer(query);
4051  destroyPQExpBuffer(tbloids);
4052 }
4053 
4054 /*
4055  * dumpPolicy
4056  * dump the definition of the given policy
4057  */
4058 static void
4059 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
4060 {
4061  DumpOptions *dopt = fout->dopt;
4062  TableInfo *tbinfo = polinfo->poltable;
4063  PQExpBuffer query;
4064  PQExpBuffer delqry;
4065  PQExpBuffer polprefix;
4066  char *qtabname;
4067  const char *cmd;
4068  char *tag;
4069 
4070  /* Do nothing in data-only dump */
4071  if (dopt->dataOnly)
4072  return;
4073 
4074  /*
4075  * If polname is NULL, then this record is just indicating that ROW LEVEL
4076  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
4077  * ROW LEVEL SECURITY.
4078  */
4079  if (polinfo->polname == NULL)
4080  {
4081  query = createPQExpBuffer();
4082 
4083  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
4084  fmtQualifiedDumpable(tbinfo));
4085 
4086  /*
4087  * We must emit the ROW SECURITY object's dependency on its table
4088  * explicitly, because it will not match anything in pg_depend (unlike
4089  * the case for other PolicyInfo objects).
4090  */
4091  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4092  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
4093  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
4094  .namespace = polinfo->dobj.namespace->dobj.name,
4095  .owner = tbinfo->rolname,
4096  .description = "ROW SECURITY",
4097  .section = SECTION_POST_DATA,
4098  .createStmt = query->data,
4099  .deps = &(tbinfo->dobj.dumpId),
4100  .nDeps = 1));
4101 
4102  destroyPQExpBuffer(query);
4103  return;
4104  }
4105 
4106  if (polinfo->polcmd == '*')
4107  cmd = "";
4108  else if (polinfo->polcmd == 'r')
4109  cmd = " FOR SELECT";
4110  else if (polinfo->polcmd == 'a')
4111  cmd = " FOR INSERT";
4112  else if (polinfo->polcmd == 'w')
4113  cmd = " FOR UPDATE";
4114  else if (polinfo->polcmd == 'd')
4115  cmd = " FOR DELETE";
4116  else
4117  pg_fatal("unexpected policy command type: %c",
4118  polinfo->polcmd);
4119 
4120  query = createPQExpBuffer();
4121  delqry = createPQExpBuffer();
4122  polprefix = createPQExpBuffer();
4123 
4124  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
4125 
4126  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
4127 
4128  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
4129  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
4130 
4131  if (polinfo->polroles != NULL)
4132  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
4133 
4134  if (polinfo->polqual != NULL)
4135  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
4136 
4137  if (polinfo->polwithcheck != NULL)
4138  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
4139 
4140  appendPQExpBufferStr(query, ";\n");
4141 
4142  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
4143  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
4144 
4145  appendPQExpBuffer(polprefix, "POLICY %s ON",
4146  fmtId(polinfo->polname));
4147 
4148  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
4149 
4150  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4151  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
4152  ARCHIVE_OPTS(.tag = tag,
4153  .namespace = polinfo->dobj.namespace->dobj.name,
4154  .owner = tbinfo->rolname,
4155  .description = "POLICY",
4156  .section = SECTION_POST_DATA,
4157  .createStmt = query->data,
4158  .dropStmt = delqry->data));
4159 
4160  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4161  dumpComment(fout, polprefix->data, qtabname,
4162  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
4163  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
4164 
4165  free(tag);
4166  destroyPQExpBuffer(query);
4167  destroyPQExpBuffer(delqry);
4168  destroyPQExpBuffer(polprefix);
4169  free(qtabname);
4170 }
4171 
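/*
 * Example of a generated policy definition (table, policy, role, and
 * expression invented for illustration):
 *
 *   CREATE POLICY p1 ON public.t1 FOR SELECT TO alice USING ((owner = CURRENT_USER));
 *
 * with the matching dropStmt:
 *
 *   DROP POLICY p1 ON public.t1;
 */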
4172 /*
4173  * getPublications
4174  * get information about publications
4175  */
4176 PublicationInfo *
4177 getPublications(Archive *fout, int *numPublications)
4178 {
4179  DumpOptions *dopt = fout->dopt;
4180  PQExpBuffer query;
4181  PGresult *res;
4182  PublicationInfo *pubinfo;
4183  int i_tableoid;
4184  int i_oid;
4185  int i_pubname;
4186  int i_pubowner;
4187  int i_puballtables;
4188  int i_pubinsert;
4189  int i_pubupdate;
4190  int i_pubdelete;
4191  int i_pubtruncate;
4192  int i_pubviaroot;
4193  int i,
4194  ntups;
4195 
4196  if (dopt->no_publications || fout->remoteVersion < 100000)
4197  {
4198  *numPublications = 0;
4199  return NULL;
4200  }
4201 
4202  query = createPQExpBuffer();
4203 
4204  resetPQExpBuffer(query);
4205 
4206  /* Get the publications. */
4207  if (fout->remoteVersion >= 130000)
4208  appendPQExpBufferStr(query,
4209  "SELECT p.tableoid, p.oid, p.pubname, "
4210  "p.pubowner, "
4211  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
4212  "FROM pg_publication p");
4213  else if (fout->remoteVersion >= 110000)
4214  appendPQExpBufferStr(query,
4215  "SELECT p.tableoid, p.oid, p.pubname, "
4216  "p.pubowner, "
4217  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
4218  "FROM pg_publication p");
4219  else
4220  appendPQExpBufferStr(query,
4221  "SELECT p.tableoid, p.oid, p.pubname, "
4222  "p.pubowner, "
4223  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
4224  "FROM pg_publication p");
4225 
4226  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4227 
4228  ntups = PQntuples(res);
4229 
4230  i_tableoid = PQfnumber(res, "tableoid");
4231  i_oid = PQfnumber(res, "oid");
4232  i_pubname = PQfnumber(res, "pubname");
4233  i_pubowner = PQfnumber(res, "pubowner");
4234  i_puballtables = PQfnumber(res, "puballtables");
4235  i_pubinsert = PQfnumber(res, "pubinsert");
4236  i_pubupdate = PQfnumber(res, "pubupdate");
4237  i_pubdelete = PQfnumber(res, "pubdelete");
4238  i_pubtruncate = PQfnumber(res, "pubtruncate");
4239  i_pubviaroot = PQfnumber(res, "pubviaroot");
4240 
4241  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4242 
4243  for (i = 0; i < ntups; i++)
4244  {
4245  pubinfo[i].dobj.objType = DO_PUBLICATION;
4246  pubinfo[i].dobj.catId.tableoid =
4247  atooid(PQgetvalue(res, i, i_tableoid));
4248  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4249  AssignDumpId(&pubinfo[i].dobj);
4250  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4251  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4252  pubinfo[i].puballtables =
4253  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4254  pubinfo[i].pubinsert =
4255  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4256  pubinfo[i].pubupdate =
4257  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4258  pubinfo[i].pubdelete =
4259  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4260  pubinfo[i].pubtruncate =
4261  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4262  pubinfo[i].pubviaroot =
4263  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4264 
4265  /* Decide whether we want to dump it */
4266  selectDumpableObject(&(pubinfo[i].dobj), fout);
4267  }
4268  PQclear(res);
4269 
4270  destroyPQExpBuffer(query);
4271 
4272  *numPublications = ntups;
4273  return pubinfo;
4274 }
4275 
4276 /*
4277  * dumpPublication
4278  * dump the definition of the given publication
4279  */
4280 static void
4281 dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
4282 {
4283  DumpOptions *dopt = fout->dopt;
4284  PQExpBuffer delq;
4285  PQExpBuffer query;
4286  char *qpubname;
4287  bool first = true;
4288 
4289  /* Do nothing in data-only dump */
4290  if (dopt->dataOnly)
4291  return;
4292 
4293  delq = createPQExpBuffer();
4294  query = createPQExpBuffer();
4295 
4296  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4297 
4298  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4299  qpubname);
4300 
4301  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4302  qpubname);
4303 
4304  if (pubinfo->puballtables)
4305  appendPQExpBufferStr(query, " FOR ALL TABLES");
4306 
4307  appendPQExpBufferStr(query, " WITH (publish = '");
4308  if (pubinfo->pubinsert)
4309  {
4310  appendPQExpBufferStr(query, "insert");
4311  first = false;
4312  }
4313 
4314  if (pubinfo->pubupdate)
4315  {
4316  if (!first)
4317  appendPQExpBufferStr(query, ", ");
4318 
4319  appendPQExpBufferStr(query, "update");
4320  first = false;
4321  }
4322 
4323  if (pubinfo->pubdelete)
4324  {
4325  if (!first)
4326  appendPQExpBufferStr(query, ", ");
4327 
4328  appendPQExpBufferStr(query, "delete");
4329  first = false;
4330  }
4331 
4332  if (pubinfo->pubtruncate)
4333  {
4334  if (!first)
4335  appendPQExpBufferStr(query, ", ");
4336 
4337  appendPQExpBufferStr(query, "truncate");
4338  first = false;
4339  }
4340 
4341  appendPQExpBufferChar(query, '\'');
4342 
4343  if (pubinfo->pubviaroot)
4344  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4345 
4346  appendPQExpBufferStr(query, ");\n");
4347 
4348  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4349  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4350  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4351  .owner = pubinfo->rolname,
4352  .description = "PUBLICATION",
4353  .section = SECTION_POST_DATA,
4354  .createStmt = query->data,
4355  .dropStmt = delq->data));
4356 
4357  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4358  dumpComment(fout, "PUBLICATION", qpubname,
4359  NULL, pubinfo->rolname,
4360  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4361 
4362  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4363  dumpSecLabel(fout, "PUBLICATION", qpubname,
4364  NULL, pubinfo->rolname,
4365  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4366 
4367  destroyPQExpBuffer(delq);
4368  destroyPQExpBuffer(query);
4369  free(qpubname);
4370 }
4371 
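/*
 * Example createStmt as assembled above (publication name invented),
 * for a publication of inserts and updates on all tables, published via
 * partition roots:
 *
 *   CREATE PUBLICATION pub1 FOR ALL TABLES WITH (publish = 'insert, update', publish_via_partition_root = true);
 */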
4372 /*
4373  * getPublicationNamespaces
4374  * get information about publication membership for dumpable schemas.
4375  */
4376 void
4377 getPublicationNamespaces(Archive *fout)
4378 {
4379  PQExpBuffer query;
4380  PGresult *res;
4381  PublicationSchemaInfo *pubsinfo;
4382  DumpOptions *dopt = fout->dopt;
4383  int i_tableoid;
4384  int i_oid;
4385  int i_pnpubid;
4386  int i_pnnspid;
4387  int i,
4388  j,
4389  ntups;
4390 
4391  if (dopt->no_publications || fout->remoteVersion < 150000)
4392  return;
4393 
4394  query = createPQExpBuffer();
4395 
4396  /* Collect all publication membership info. */
4397  appendPQExpBufferStr(query,
4398  "SELECT tableoid, oid, pnpubid, pnnspid "
4399  "FROM pg_catalog.pg_publication_namespace");
4400  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4401 
4402  ntups = PQntuples(res);
4403 
4404  i_tableoid = PQfnumber(res, "tableoid");
4405  i_oid = PQfnumber(res, "oid");
4406  i_pnpubid = PQfnumber(res, "pnpubid");
4407  i_pnnspid = PQfnumber(res, "pnnspid");
4408 
4409  /* this allocation may be more than we need */
4410  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4411  j = 0;
4412 
4413  for (i = 0; i < ntups; i++)
4414  {
4415  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4416  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4417  PublicationInfo *pubinfo;
4418  NamespaceInfo *nspinfo;
4419 
4420  /*
4421  * Ignore any entries for which we aren't interested in either the
4422  * publication or the rel.
4423  */
4424  pubinfo = findPublicationByOid(pnpubid);
4425  if (pubinfo == NULL)
4426  continue;
4427  nspinfo = findNamespaceByOid(pnnspid);
4428  if (nspinfo == NULL)
4429  continue;
4430 
4431  /*
4432  * We always dump publication namespaces unless the corresponding
4433  * namespace is excluded from the dump.
4434  */
4435  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4436  continue;
4437 
4438  /* OK, make a DumpableObject for this relationship */
4439  pubsinfo[j].dobj.objType = DO_PUBLICATION_TABLE_IN_SCHEMA;
4440  pubsinfo[j].dobj.catId.tableoid =
4441  atooid(PQgetvalue(res, i, i_tableoid));
4442  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4443  AssignDumpId(&pubsinfo[j].dobj);
4444  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4445  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4446  pubsinfo[j].publication = pubinfo;
4447  pubsinfo[j].pubschema = nspinfo;
4448 
4449  /* Decide whether we want to dump it */
4450  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4451 
4452  j++;
4453  }
4454 
4455  PQclear(res);
4456  destroyPQExpBuffer(query);
4457 }
4458 
4459 /*
4460  * getPublicationTables
4461  * get information about publication membership for dumpable tables.
4462  */
4463 void
4464 getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
4465 {
4466  PQExpBuffer query;
4467  PGresult *res;
4468  PublicationRelInfo *pubrinfo;
4469  DumpOptions *dopt = fout->dopt;
4470  int i_tableoid;
4471  int i_oid;
4472  int i_prpubid;
4473  int i_prrelid;
4474  int i_prrelqual;
4475  int i_prattrs;
4476  int i,
4477  j,
4478  ntups;
4479 
4480  if (dopt->no_publications || fout->remoteVersion < 100000)
4481  return;
4482 
4483  query = createPQExpBuffer();
4484 
4485  /* Collect all publication membership info. */
4486  if (fout->remoteVersion >= 150000)
4487  appendPQExpBufferStr(query,
4488  "SELECT tableoid, oid, prpubid, prrelid, "
4489  "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
4490  "(CASE\n"
4491  " WHEN pr.prattrs IS NOT NULL THEN\n"
4492  " (SELECT array_agg(attname)\n"
4493  " FROM\n"
4494  " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
4495  " pg_catalog.pg_attribute\n"
4496  " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
4497  " ELSE NULL END) prattrs "
4498  "FROM pg_catalog.pg_publication_rel pr");
4499  else
4500  appendPQExpBufferStr(query,
4501  "SELECT tableoid, oid, prpubid, prrelid, "
4502  "NULL AS prrelqual, NULL AS prattrs "
4503  "FROM pg_catalog.pg_publication_rel");
4504  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4505 
4506  ntups = PQntuples(res);
4507 
4508  i_tableoid = PQfnumber(res, "tableoid");
4509  i_oid = PQfnumber(res, "oid");
4510  i_prpubid = PQfnumber(res, "prpubid");
4511  i_prrelid = PQfnumber(res, "prrelid");
4512  i_prrelqual = PQfnumber(res, "prrelqual");
4513  i_prattrs = PQfnumber(res, "prattrs");
4514 
4515  /* this allocation may be more than we need */
4516  pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
4517  j = 0;
4518 
4519  for (i = 0; i < ntups; i++)
4520  {
4521  Oid prpubid = atooid(PQgetvalue(res, i, i_prpubid));
4522  Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid));
4523  PublicationInfo *pubinfo;
4524  TableInfo *tbinfo;
4525 
4526  /*
4527  * Ignore any entries for which we aren't interested in either the
4528  * publication or the rel.
4529  */
4530  pubinfo = findPublicationByOid(prpubid);
4531  if (pubinfo == NULL)
4532  continue;
4533  tbinfo = findTableByOid(prrelid);
4534  if (tbinfo == NULL)
4535  continue;
4536 
4537  /*
4538  * Ignore publication membership of tables whose definitions are not
4539  * to be dumped.
4540  */
4541  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
4542  continue;
4543 
4544  /* OK, make a DumpableObject for this relationship */
4545  pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
4546  pubrinfo[j].dobj.catId.tableoid =
4547  atooid(PQgetvalue(res, i, i_tableoid));
4548  pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4549  AssignDumpId(&pubrinfo[j].dobj);
4550  pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4551  pubrinfo[j].dobj.name = tbinfo->dobj.name;
4552  pubrinfo[j].publication = pubinfo;
4553  pubrinfo[j].pubtable = tbinfo;
4554  if (PQgetisnull(res, i, i_prrelqual))
4555  pubrinfo[j].pubrelqual = NULL;
4556  else
4557  pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
4558 
4559  if (!PQgetisnull(res, i, i_prattrs))
4560  {
4561  char **attnames;
4562  int nattnames;
4563  PQExpBuffer attribs;
4564 
4565  if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
4566  &attnames, &nattnames))
4567  pg_fatal("could not parse %s array", "prattrs");
4568  attribs = createPQExpBuffer();
4569  for (int k = 0; k < nattnames; k++)
4570  {
4571  if (k > 0)
4572  appendPQExpBufferStr(attribs, ", ");
4573 
4574  appendPQExpBufferStr(attribs, fmtId(attnames[k]));
4575  }
4576  pubrinfo[j].pubrattrs = attribs->data;
4577  }
4578  else
4579  pubrinfo[j].pubrattrs = NULL;
4580 
4581  /* Decide whether we want to dump it */
4582  selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
4583 
4584  j++;
4585  }
4586 
4587  PQclear(res);
4588  destroyPQExpBuffer(query);
4589 }
4590 
4591 /*
4592  * dumpPublicationNamespace
4593  * dump the definition of the given publication schema mapping.
4594  */
4595 static void
4596 dumpPublicationNamespace(Archive *fout, const PublicationSchemaInfo *pubsinfo)
4597 {
4598  DumpOptions *dopt = fout->dopt;
4599  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4600  PublicationInfo *pubinfo = pubsinfo->publication;
4601  PQExpBuffer query;
4602  char *tag;
4603 
4604  /* Do nothing in data-only dump */
4605  if (dopt->dataOnly)
4606  return;
4607 
4608  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4609 
4610  query = createPQExpBuffer();
4611 
4612  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4613  appendPQExpBuffer(query, "ADD TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4614 
4615  /*
4616  * There is no point in creating a drop query as the drop is done by
4617  * schema drop.
4618  */
4619  if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4620  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4621  ARCHIVE_OPTS(.tag = tag,
4622  .namespace = schemainfo->dobj.name,
4623  .owner = pubinfo->rolname,
4624  .description = "PUBLICATION TABLES IN SCHEMA",
4625  .section = SECTION_POST_DATA,
4626  .createStmt = query->data));
4627 
4628  /* These objects can't currently have comments or seclabels */
4629 
4630  free(tag);
4631  destroyPQExpBuffer(query);
4632 }
4633 
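/*
 * Example createStmt for a schema-level membership (names invented):
 *
 *   ALTER PUBLICATION pub1 ADD TABLES IN SCHEMA app;
 */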
4634 /*
4635  * dumpPublicationTable
4636  * dump the definition of the given publication table mapping
4637  */
4638 static void
4639 dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
4640 {
4641  DumpOptions *dopt = fout->dopt;
4642  PublicationInfo *pubinfo = pubrinfo->publication;
4643  TableInfo *tbinfo = pubrinfo->pubtable;
4644  PQExpBuffer query;
4645  char *tag;
4646 
4647  /* Do nothing in data-only dump */
4648  if (dopt->dataOnly)
4649  return;
4650 
4651  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4652 
4653  query = createPQExpBuffer();
4654 
4655  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4656  fmtId(pubinfo->dobj.name));
4657  appendPQExpBuffer(query, " %s",
4658  fmtQualifiedDumpable(tbinfo));
4659 
4660  if (pubrinfo->pubrattrs)
4661  appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
4662 
4663  if (pubrinfo->pubrelqual)
4664  {
4665  /*
4666  * It's necessary to add parentheses around the expression because
4667  * pg_get_expr won't supply the parentheses for things like WHERE
4668  * TRUE.
4669  */
4670  appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
4671  }
4672  appendPQExpBufferStr(query, ";\n");
4673 
4674  /*
4675  * There is no point in creating a drop query as the drop is done by table
4676  * drop. (If you think to change this, see also _printTocEntry().)
4677  * Although this object doesn't really have ownership as such, set the
4678  * owner field anyway to ensure that the command is run by the correct
4679  * role at restore time.
4680  */
4681  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4682  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4683  ARCHIVE_OPTS(.tag = tag,
4684  .namespace = tbinfo->dobj.namespace->dobj.name,
4685  .owner = pubinfo->rolname,
4686  .description = "PUBLICATION TABLE",
4687  .section = SECTION_POST_DATA,
4688  .createStmt = query->data));
4689 
4690  /* These objects can't currently have comments or seclabels */
4691 
4692  free(tag);
4693  destroyPQExpBuffer(query);
4694 }
4695 
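/*
 * Example createStmt for a table-level membership with a column list
 * and row filter (names and filter invented):
 *
 *   ALTER PUBLICATION pub1 ADD TABLE ONLY public.t1 (id, val) WHERE ((id > 0));
 */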
4696 /*
4697  * Is the currently connected user a superuser?
4698  */
4699 static bool
4700 is_superuser(Archive *fout)
4701 {
4702  ArchiveHandle *AH = (ArchiveHandle *) fout;
4703  const char *val;
4704 
4705  val = PQparameterStatus(AH->connection, "is_superuser");
4706 
4707  if (val && strcmp(val, "on") == 0)
4708  return true;
4709 
4710  return false;
4711 }
4712 
4713 /*
4714  * getSubscriptions
4715  * get information about subscriptions
4716  */
4717 void
4718 getSubscriptions(Archive *fout)
4719 {
4720  DumpOptions *dopt = fout->dopt;
4721  PQExpBuffer query;
4722  PGresult *res;
4723  SubscriptionInfo *subinfo;
4724  int i_tableoid;
4725  int i_oid;
4726  int i_subname;
4727  int i_subowner;
4728  int i_subbinary;
4729  int i_substream;
4730  int i_subtwophasestate;
4731  int i_subdisableonerr;
4732  int i_subpasswordrequired;
4733  int i_subrunasowner;
4734  int i_subconninfo;
4735  int i_subslotname;
4736  int i_subsynccommit;
4737  int i_subpublications;
4738  int i_suborigin;
4739  int i_suboriginremotelsn;
4740  int i_subenabled;
4741  int i_subfailover;
4742  int i,
4743  ntups;
4744 
4745  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4746  return;
4747 
4748  if (!is_superuser(fout))
4749  {
4750  int n;
4751 
4752  res = ExecuteSqlQuery(fout,
4753  "SELECT count(*) FROM pg_subscription "
4754  "WHERE subdbid = (SELECT oid FROM pg_database"
4755  " WHERE datname = current_database())",
4756  PGRES_TUPLES_OK);
4757  n = atoi(PQgetvalue(res, 0, 0));
4758  if (n > 0)
4759  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4760  PQclear(res);
4761  return;
4762  }
4763 
4764  query = createPQExpBuffer();
4765 
4766  /* Get the subscriptions in current database. */
4767  appendPQExpBufferStr(query,
4768  "SELECT s.tableoid, s.oid, s.subname,\n"
4769  " s.subowner,\n"
4770  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4771  " s.subpublications,\n");
4772 
4773  if (fout->remoteVersion >= 140000)
4774  appendPQExpBufferStr(query, " s.subbinary,\n");
4775  else
4776  appendPQExpBufferStr(query, " false AS subbinary,\n");
4777 
4778  if (fout->remoteVersion >= 140000)
4779  appendPQExpBufferStr(query, " s.substream,\n");
4780  else
4781  appendPQExpBufferStr(query, " 'f' AS substream,\n");
4782 
4783  if (fout->remoteVersion >= 150000)
4784  appendPQExpBufferStr(query,
4785  " s.subtwophasestate,\n"
4786  " s.subdisableonerr,\n");
4787  else
4788  appendPQExpBuffer(query,
4789  " '%c' AS subtwophasestate,\n"
4790  " false AS subdisableonerr,\n",
4791  LOGICALREP_TWOPHASE_STATE_DISABLED);
4792 
4793  if (fout->remoteVersion >= 160000)
4794  appendPQExpBufferStr(query,
4795  " s.subpasswordrequired,\n"
4796  " s.subrunasowner,\n"
4797  " s.suborigin,\n");
4798  else
4799  appendPQExpBuffer(query,
4800  " 't' AS subpasswordrequired,\n"
4801  " 't' AS subrunasowner,\n"
4802  " '%s' AS suborigin,\n",
4803  LOGICALREP_ORIGIN_ANY);
4804 
4805  if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
4806  appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
4807  " s.subenabled,\n"
4808  " s.subfailover\n");
4809  else
4810  appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
4811  " false AS subenabled,\n"
4812  " false AS subfailover\n");
4813 
4814  appendPQExpBufferStr(query,
4815  "FROM pg_subscription s\n");
4816 
4817  if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
4818  appendPQExpBufferStr(query,
4819  "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
4820  " ON o.external_id = 'pg_' || s.oid::text \n");
4821 
4822  appendPQExpBufferStr(query,
4823  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4824  " WHERE datname = current_database())");
4825 
4826  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4827 
4828  ntups = PQntuples(res);
4829 
4830  /*
4831  * Get subscription fields. We don't include subskiplsn in the dump as
4832  * after restoring the dump this value may no longer be relevant.
4833  */
4834  i_tableoid = PQfnumber(res, "tableoid");
4835  i_oid = PQfnumber(res, "oid");
4836  i_subname = PQfnumber(res, "subname");
4837  i_subowner = PQfnumber(res, "subowner");
4838  i_subbinary = PQfnumber(res, "subbinary");
4839  i_substream = PQfnumber(res, "substream");
4840  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4841  i_subdisableonerr = PQfnumber(res, "subdisableonerr");
4842  i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
4843  i_subrunasowner = PQfnumber(res, "subrunasowner");
4844  i_subconninfo = PQfnumber(res, "subconninfo");
4845  i_subslotname = PQfnumber(res, "subslotname");
4846  i_subsynccommit = PQfnumber(res, "subsynccommit");
4847  i_subpublications = PQfnumber(res, "subpublications");
4848  i_suborigin = PQfnumber(res, "suborigin");
4849  i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
4850  i_subenabled = PQfnumber(res, "subenabled");
4851  i_subfailover = PQfnumber(res, "subfailover");
4852 
4853  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4854 
4855  for (i = 0; i < ntups; i++)
4856  {
4857  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4858  subinfo[i].dobj.catId.tableoid =
4859  atooid(PQgetvalue(res, i, i_tableoid));
4860  subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4861  AssignDumpId(&subinfo[i].dobj);
4862  subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
4863  subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
4864 
4865  subinfo[i].subbinary =
4866  pg_strdup(PQgetvalue(res, i, i_subbinary));
4867  subinfo[i].substream =
4868  pg_strdup(PQgetvalue(res, i, i_substream));
4869  subinfo[i].subtwophasestate =
4870  pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
4871  subinfo[i].subdisableonerr =
4872  pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
4873  subinfo[i].subpasswordrequired =
4874  pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
4875  subinfo[i].subrunasowner =
4876  pg_strdup(PQgetvalue(res, i, i_subrunasowner));
4877  subinfo[i].subconninfo =
4878  pg_strdup(PQgetvalue(res, i, i_subconninfo));
4879  if (PQgetisnull(res, i, i_subslotname))
4880  subinfo[i].subslotname = NULL;
4881  else
4882  subinfo[i].subslotname =
4883  pg_strdup(PQgetvalue(res, i, i_subslotname));
4884  subinfo[i].subsynccommit =
4885  pg_strdup(PQgetvalue(res, i, i_subsynccommit));
4886  subinfo[i].subpublications =
4887  pg_strdup(PQgetvalue(res, i, i_subpublications));
4888  subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
4889  if (PQgetisnull(res, i, i_suboriginremotelsn))
4890  subinfo[i].suboriginremotelsn = NULL;
4891  else
4892  subinfo[i].suboriginremotelsn =
4893  pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
4894  subinfo[i].subenabled =
4895  pg_strdup(PQgetvalue(res, i, i_subenabled));
4896  subinfo[i].subfailover =
4897  pg_strdup(PQgetvalue(res, i, i_subfailover));
4898 
4899  /* Decide whether we want to dump it */
4900  selectDumpableObject(&(subinfo[i].dobj), fout);
4901  }
4902  PQclear(res);
4903 
4904  destroyPQExpBuffer(query);
4905 }
4906 
4907 /*
4908  * getSubscriptionTables
4909  * Get information about subscription membership for dumpable tables. This
4910  * will be used only in binary-upgrade mode for PG17 or later versions.
4911  */
4912 void
4913 getSubscriptionTables(Archive *fout)
4914 {
4915  DumpOptions *dopt = fout->dopt;
4916  SubscriptionInfo *subinfo = NULL;
4917  SubRelInfo *subrinfo;
4918  PGresult *res;
4919  int i_srsubid;
4920  int i_srrelid;
4921  int i_srsubstate;
4922  int i_srsublsn;
4923  int ntups;
4924  Oid last_srsubid = InvalidOid;
4925 
4926  if (dopt->no_subscriptions || !dopt->binary_upgrade ||
4927  fout->remoteVersion < 170000)
4928  return;
4929 
4930  res = ExecuteSqlQuery(fout,
4931  "SELECT srsubid, srrelid, srsubstate, srsublsn "
4932  "FROM pg_catalog.pg_subscription_rel "
4933  "ORDER BY srsubid",
4934  PGRES_TUPLES_OK);
4935  ntups = PQntuples(res);
4936  if (ntups == 0)
4937  goto cleanup;
4938 
4939  /* Get pg_subscription_rel attributes */
4940  i_srsubid = PQfnumber(res, "srsubid");
4941  i_srrelid = PQfnumber(res, "srrelid");
4942  i_srsubstate = PQfnumber(res, "srsubstate");
4943  i_srsublsn = PQfnumber(res, "srsublsn");
4944 
4945  subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
4946  for (int i = 0; i < ntups; i++)
4947  {
4948  Oid cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
4949  Oid relid = atooid(PQgetvalue(res, i, i_srrelid));
4950  TableInfo *tblinfo;
4951 
4952  /*
4953  * If we switched to a new subscription, check if the subscription
4954  * exists.
4955  */
4956  if (cur_srsubid != last_srsubid)
4957  {
4958  subinfo = findSubscriptionByOid(cur_srsubid);
4959  if (subinfo == NULL)
4960  pg_fatal("subscription with OID %u does not exist", cur_srsubid);
4961 
4962  last_srsubid = cur_srsubid;