PostgreSQL source code (git master) — pg_dump.c.
This text was extracted from the generated (doxygen) documentation page for the file;
the leading number on each line below is the original source line number.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "common/relpath.h"
59 #include "dumputils.h"
60 #include "fe_utils/option_utils.h"
61 #include "fe_utils/string_utils.h"
62 #include "getopt_long.h"
63 #include "libpq/libpq-fs.h"
64 #include "parallel.h"
65 #include "pg_backup_db.h"
66 #include "pg_backup_utils.h"
67 #include "pg_dump.h"
68 #include "storage/block.h"
69 
/*
 * Entry in the sorted "rolenames" table: maps a role OID to its name.
 * Built by collectRoleNames() and searched by getRoleName() so that
 * object owner OIDs can be translated to role names without extra queries.
 */
70 typedef struct
71 {
72  Oid roleoid; /* role's OID */
73  const char *rolename; /* role's name */
74 } RoleNameItem;
75 
/*
 * Entry in the sorted "comments" table: one collected comment keyed by
 * (classoid, objoid, objsubid).  Built by collectComments() and searched
 * by findComments().
 */
76 typedef struct
77 {
78  const char *descr; /* comment for an object */
79  Oid classoid; /* object class (catalog OID) */
80  Oid objoid; /* object OID */
81  int objsubid; /* subobject (table column #) */
82 } CommentItem;
83 
/*
 * Entry in the sorted "seclabels" table: one collected security label keyed
 * by (classoid, objoid, objsubid) plus its label provider.  Built by
 * collectSecLabels() and searched by findSecLabels().
 */
84 typedef struct
85 {
86  const char *provider; /* label provider of this security label */
87  const char *label; /* security label for an object */
88  Oid classoid; /* object class (catalog OID) */
89  Oid objoid; /* object OID */
90  int objsubid; /* subobject (table column #) */
91 } SecLabelItem;
92 
93 typedef enum OidOptions
94 {
97  zeroAsNone = 4
99 
100 /* global decls */
/*
 * NOTE(review): gaps in the embedded line numbering below (108, 116, 118,
 * 121, 123, 125, 127-128, 130, 137, ...) indicate that several declarations
 * — apparently the SimpleStringList pattern lists that feed these OID lists,
 * which main() references later — are missing from this extraction.  Consult
 * the original src/bin/pg_dump/pg_dump.c before relying on this copy.
 */
101 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
102 
103 static Oid g_last_builtin_oid; /* value of the last builtin oid */
104 
105 /* The specified names/patterns should match at least one entity */
106 static int strict_names = 0;
107 
109 
110 /*
111  * Object inclusion/exclusion lists
112  *
113  * The string lists record the patterns given by command-line switches,
114  * which we then convert to lists of OIDs of matching objects.
115  */
117 static SimpleOidList schema_include_oids = {NULL, NULL};
119 static SimpleOidList schema_exclude_oids = {NULL, NULL};
120 
122 static SimpleOidList table_include_oids = {NULL, NULL};
124 static SimpleOidList table_exclude_oids = {NULL, NULL};
126 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
129 
131 static SimpleOidList extension_include_oids = {NULL, NULL};
132 
133 static const CatalogId nilCatalogId = {0, 0};
134 
135 /* override for standard extra_float_digits setting */
136 static bool have_extra_float_digits = false;
138 
139 /* sorted table of role names */
140 static RoleNameItem *rolenames = NULL;
141 static int nrolenames = 0;
142 
143 /* sorted table of comments */
144 static CommentItem *comments = NULL;
145 static int ncomments = 0;
146 
147 /* sorted table of security labels */
148 static SecLabelItem *seclabels = NULL;
149 static int nseclabels = 0;
150 
151 /*
152  * The default number of rows per INSERT when
153  * --inserts is specified without --rows-per-insert
154  */
155 #define DUMP_DEFAULT_ROWS_PER_INSERT 1
156 
157 /*
158  * Macro for producing quoted, schema-qualified name of a dumpable object.
159  */
160 #define fmtQualifiedDumpable(obj) \
161  fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
162  (obj)->dobj.name)
163 
/*
 * Forward declarations for the static helper functions of this file.
 *
 * NOTE(review): several prototypes here are truncated by the extraction —
 * e.g. the declarations whose first line (doxygen lines 168, 177, 299, 304)
 * is missing, leaving only parameter lines.  Do not treat this list as
 * complete; see the original src/bin/pg_dump/pg_dump.c.
 */
164 static void help(const char *progname);
165 static void setup_connection(Archive *AH,
166  const char *dumpencoding, const char *dumpsnapshot,
167  char *use_role);
169 static void expand_schema_name_patterns(Archive *fout,
170  SimpleStringList *patterns,
171  SimpleOidList *oids,
172  bool strict_names);
173 static void expand_extension_name_patterns(Archive *fout,
174  SimpleStringList *patterns,
175  SimpleOidList *oids,
176  bool strict_names);
178  SimpleStringList *patterns,
179  SimpleOidList *oids);
180 static void expand_table_name_patterns(Archive *fout,
181  SimpleStringList *patterns,
182  SimpleOidList *oids,
183  bool strict_names);
184 static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
185  const char *pattern);
186 
187 static NamespaceInfo *findNamespace(Oid nsoid);
188 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
189 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
190 static const char *getRoleName(const char *roleoid_str);
191 static void collectRoleNames(Archive *fout);
192 static void getAdditionalACLs(Archive *fout);
193 static void dumpCommentExtended(Archive *fout, const char *type,
194  const char *name, const char *namespace,
195  const char *owner, CatalogId catalogId,
196  int subid, DumpId dumpId,
197  const char *initdb_comment);
198 static inline void dumpComment(Archive *fout, const char *type,
199  const char *name, const char *namespace,
200  const char *owner, CatalogId catalogId,
201  int subid, DumpId dumpId);
202 static int findComments(Oid classoid, Oid objoid, CommentItem **items);
203 static void collectComments(Archive *fout);
204 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
205  const char *namespace, const char *owner,
206  CatalogId catalogId, int subid, DumpId dumpId);
207 static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
208 static void collectSecLabels(Archive *fout);
209 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
210 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
211 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
212 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
213 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
214 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
215 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
216 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
217 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
218 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
219 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
220  PGresult *res);
221 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
222 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
223 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
224 static void dumpCast(Archive *fout, const CastInfo *cast);
225 static void dumpTransform(Archive *fout, const TransformInfo *transform);
226 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
227 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
228 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
229 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
230 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
231 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
232 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
233 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
234 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
235 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
236 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
237 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
238 static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
239 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
240 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
241 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
242 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
243 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
244 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
245 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
246 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
247 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
248 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
249 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
250 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
251 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
252 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
253 static void dumpUserMappings(Archive *fout,
254  const char *servername, const char *namespace,
255  const char *owner, CatalogId catalogId, DumpId dumpId);
256 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
257 
258 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
259  const char *type, const char *name, const char *subname,
260  const char *nspname, const char *owner,
261  const DumpableAcl *dacl);
262 
263 static void getDependencies(Archive *fout);
264 static void BuildArchiveDependencies(Archive *fout);
265 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
266  DumpId **dependencies, int *nDeps, int *allocDeps);
267 
269 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
270  DumpableObject *boundaryObjs);
271 
272 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
273 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
274 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
275 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
276 static void buildMatViewRefreshDependencies(Archive *fout);
277 static void getTableDataFKConstraints(void);
278 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
279  bool is_agg);
280 static char *format_function_signature(Archive *fout,
281  const FuncInfo *finfo, bool honor_quotes);
282 static char *convertRegProcReference(const char *proc);
283 static char *getFormattedOperatorName(const char *oproid);
284 static char *convertTSFunction(Archive *fout, Oid funcOid);
285 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
286 static void getLOs(Archive *fout);
287 static void dumpLO(Archive *fout, const LoInfo *binfo);
288 static int dumpLOs(Archive *fout, const void *arg);
289 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
290 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
291 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
292 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
293 static void dumpDatabase(Archive *fout);
294 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
295  const char *dbname, Oid dboid);
296 static void dumpEncoding(Archive *AH);
297 static void dumpStdStrings(Archive *AH);
298 static void dumpSearchPath(Archive *AH);
300  PQExpBuffer upgrade_buffer,
301  Oid pg_type_oid,
302  bool force_array_type,
303  bool include_multirange_type);
305  PQExpBuffer upgrade_buffer,
306  const TableInfo *tbinfo);
307 static void binary_upgrade_set_pg_class_oids(Archive *fout,
308  PQExpBuffer upgrade_buffer,
309  Oid pg_class_oid, bool is_index);
310 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
311  const DumpableObject *dobj,
312  const char *objtype,
313  const char *objname,
314  const char *objnamespace);
315 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
316 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
317 static bool nonemptyReloptions(const char *reloptions);
318 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
319  const char *prefix, Archive *fout);
320 static char *get_synchronized_snapshot(Archive *fout);
321 static void setupDumpWorker(Archive *AH);
322 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
324 
/*
 * main --- entry point for pg_dump.
 *
 * Parses command-line options, opens the output archive, connects to the
 * source database, collects DumpableObject structs for everything to be
 * dumped, sorts them into a dependency-safe order, and emits the archive
 * (writing the plain-text script directly when the "p" format is chosen).
 *
 * NOTE(review): this copy was extracted from a rendered documentation page;
 * gaps in the embedded line numbering (e.g. 429, 436, 486, 495, 499, 514,
 * 519, 539, 543, 548, 552, 557, 561, 565, 583, 591, 595, 607, 609, 620,
 * 630, 659, 672, 702, 706, 735, 795, 802-809, 816-831, 838-839, 871-873,
 * 959, 974, 978, 981-982, 993, 1001) show that many statements are missing,
 * so several case bodies below appear truncated.  Consult the original
 * src/bin/pg_dump/pg_dump.c before relying on this copy.
 */
325 int
326 main(int argc, char **argv)
327 {
328  int c;
329  const char *filename = NULL;
330  const char *format = "p";
331  TableInfo *tblinfo;
332  int numTables;
333  DumpableObject **dobjs;
334  int numObjs;
335  DumpableObject *boundaryObjs;
336  int i;
337  int optindex;
338  RestoreOptions *ropt;
339  Archive *fout; /* the script file */
340  bool g_verbose = false;
341  const char *dumpencoding = NULL;
342  const char *dumpsnapshot = NULL;
343  char *use_role = NULL;
344  int numWorkers = 1;
345  int plainText = 0;
346  ArchiveFormat archiveFormat = archUnknown;
347  ArchiveMode archiveMode;
348  pg_compress_specification compression_spec = {0};
349  char *compression_detail = NULL;
350  char *compression_algorithm_str = "none";
351  char *error_detail = NULL;
352  bool user_compression_defined = false;
353 
354  static DumpOptions dopt;
355 
356  static struct option long_options[] = {
357  {"data-only", no_argument, NULL, 'a'},
358  {"blobs", no_argument, NULL, 'b'},
359  {"large-objects", no_argument, NULL, 'b'},
360  {"no-blobs", no_argument, NULL, 'B'},
361  {"no-large-objects", no_argument, NULL, 'B'},
362  {"clean", no_argument, NULL, 'c'},
363  {"create", no_argument, NULL, 'C'},
364  {"dbname", required_argument, NULL, 'd'},
365  {"extension", required_argument, NULL, 'e'},
366  {"file", required_argument, NULL, 'f'},
367  {"format", required_argument, NULL, 'F'},
368  {"host", required_argument, NULL, 'h'},
369  {"jobs", 1, NULL, 'j'},
370  {"no-reconnect", no_argument, NULL, 'R'},
371  {"no-owner", no_argument, NULL, 'O'},
372  {"port", required_argument, NULL, 'p'},
373  {"schema", required_argument, NULL, 'n'},
374  {"exclude-schema", required_argument, NULL, 'N'},
375  {"schema-only", no_argument, NULL, 's'},
376  {"superuser", required_argument, NULL, 'S'},
377  {"table", required_argument, NULL, 't'},
378  {"exclude-table", required_argument, NULL, 'T'},
379  {"no-password", no_argument, NULL, 'w'},
380  {"password", no_argument, NULL, 'W'},
381  {"username", required_argument, NULL, 'U'},
382  {"verbose", no_argument, NULL, 'v'},
383  {"no-privileges", no_argument, NULL, 'x'},
384  {"no-acl", no_argument, NULL, 'x'},
385  {"compress", required_argument, NULL, 'Z'},
386  {"encoding", required_argument, NULL, 'E'},
387  {"help", no_argument, NULL, '?'},
388  {"version", no_argument, NULL, 'V'},
389 
390  /*
391  * the following options don't have an equivalent short option letter
392  */
393  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
394  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
395  {"column-inserts", no_argument, &dopt.column_inserts, 1},
396  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
397  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
398  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
399  {"exclude-table-data", required_argument, NULL, 4},
400  {"extra-float-digits", required_argument, NULL, 8},
401  {"if-exists", no_argument, &dopt.if_exists, 1},
402  {"inserts", no_argument, NULL, 9},
403  {"lock-wait-timeout", required_argument, NULL, 2},
404  {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
405  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
406  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
407  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
408  {"role", required_argument, NULL, 3},
409  {"section", required_argument, NULL, 5},
410  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
411  {"snapshot", required_argument, NULL, 6},
412  {"strict-names", no_argument, &strict_names, 1},
413  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
414  {"no-comments", no_argument, &dopt.no_comments, 1},
415  {"no-publications", no_argument, &dopt.no_publications, 1},
416  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
417  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
418  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
419  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
420  {"no-sync", no_argument, NULL, 7},
421  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
422  {"rows-per-insert", required_argument, NULL, 10},
423  {"include-foreign-data", required_argument, NULL, 11},
424 
425  {NULL, 0, NULL, 0}
426  };
427 
428  pg_logging_init(argv[0]);
430  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
431 
432  /*
433  * Initialize what we need for parallel execution, especially for thread
434  * support on Windows.
435  */
437 
438  progname = get_progname(argv[0]);
439 
  /* Handle --help / --version before any other processing. */
440  if (argc > 1)
441  {
442  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
443  {
444  help(progname);
445  exit_nicely(0);
446  }
447  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
448  {
449  puts("pg_dump (PostgreSQL) " PG_VERSION);
450  exit_nicely(0);
451  }
452  }
453 
454  InitDumpOptions(&dopt);
455 
456  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
457  long_options, &optindex)) != -1)
458  {
459  switch (c)
460  {
461  case 'a': /* Dump data only */
462  dopt.dataOnly = true;
463  break;
464 
465  case 'b': /* Dump LOs */
466  dopt.outputLOs = true;
467  break;
468 
469  case 'B': /* Don't dump LOs */
470  dopt.dontOutputLOs = true;
471  break;
472 
473  case 'c': /* clean (i.e., drop) schema prior to create */
474  dopt.outputClean = 1;
475  break;
476 
477  case 'C': /* Create DB */
478  dopt.outputCreateDB = 1;
479  break;
480 
481  case 'd': /* database name */
482  dopt.cparams.dbname = pg_strdup(optarg);
483  break;
484 
485  case 'e': /* include extension(s) */
487  dopt.include_everything = false;
488  break;
489 
490  case 'E': /* Dump encoding */
491  dumpencoding = pg_strdup(optarg);
492  break;
493 
494  case 'f':
496  break;
497 
498  case 'F':
500  break;
501 
502  case 'h': /* server host */
503  dopt.cparams.pghost = pg_strdup(optarg);
504  break;
505 
506  case 'j': /* number of dump jobs */
507  if (!option_parse_int(optarg, "-j/--jobs", 1,
508  PG_MAX_JOBS,
509  &numWorkers))
510  exit_nicely(1);
511  break;
512 
513  case 'n': /* include schema(s) */
515  dopt.include_everything = false;
516  break;
517 
518  case 'N': /* exclude schema(s) */
520  break;
521 
522  case 'O': /* Don't reconnect to match owner */
523  dopt.outputNoOwner = 1;
524  break;
525 
526  case 'p': /* server port */
527  dopt.cparams.pgport = pg_strdup(optarg);
528  break;
529 
530  case 'R':
531  /* no-op, still accepted for backwards compatibility */
532  break;
533 
534  case 's': /* dump schema only */
535  dopt.schemaOnly = true;
536  break;
537 
538  case 'S': /* Username for superuser in plain text output */
540  break;
541 
542  case 't': /* include table(s) */
544  dopt.include_everything = false;
545  break;
546 
547  case 'T': /* exclude table(s) */
549  break;
550 
551  case 'U':
553  break;
554 
555  case 'v': /* verbose */
556  g_verbose = true;
558  break;
559 
560  case 'w':
562  break;
563 
564  case 'W':
566  break;
567 
568  case 'x': /* skip ACL dump */
569  dopt.aclsSkip = true;
570  break;
571 
572  case 'Z': /* Compression */
573  parse_compress_options(optarg, &compression_algorithm_str,
574  &compression_detail);
575  user_compression_defined = true;
576  break;
577 
578  case 0:
579  /* This covers the long options. */
580  break;
581 
582  case 2: /* lock-wait-timeout */
584  break;
585 
586  case 3: /* SET ROLE */
587  use_role = pg_strdup(optarg);
588  break;
589 
590  case 4: /* exclude table(s) data */
592  break;
593 
594  case 5: /* section */
596  break;
597 
598  case 6: /* snapshot */
599  dumpsnapshot = pg_strdup(optarg);
600  break;
601 
602  case 7: /* no-sync */
603  dosync = false;
604  break;
605 
606  case 8:
608  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
610  exit_nicely(1);
611  break;
612 
613  case 9: /* inserts */
614 
615  /*
616  * dump_inserts also stores --rows-per-insert, careful not to
617  * overwrite that.
618  */
619  if (dopt.dump_inserts == 0)
621  break;
622 
623  case 10: /* rows per insert */
624  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
625  &dopt.dump_inserts))
626  exit_nicely(1);
627  break;
628 
629  case 11: /* include foreign data */
631  optarg);
632  break;
633 
634  default:
635  /* getopt_long already emitted a complaint */
636  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
637  exit_nicely(1);
638  }
639  }
640 
641  /*
642  * Non-option argument specifies database name as long as it wasn't
643  * already specified with -d / --dbname
644  */
645  if (optind < argc && dopt.cparams.dbname == NULL)
646  dopt.cparams.dbname = argv[optind++];
647 
648  /* Complain if any arguments remain */
649  if (optind < argc)
650  {
651  pg_log_error("too many command-line arguments (first is \"%s\")",
652  argv[optind]);
653  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
654  exit_nicely(1);
655  }
656 
657  /* --column-inserts implies --inserts */
658  if (dopt.column_inserts && dopt.dump_inserts == 0)
660 
661  /*
662  * Binary upgrade mode implies dumping sequence data even in schema-only
663  * mode. This is not exposed as a separate option, but kept separate
664  * internally for clarity.
665  */
666  if (dopt.binary_upgrade)
667  dopt.sequence_data = 1;
668 
  /* Reject mutually-exclusive option combinations. */
669  if (dopt.dataOnly && dopt.schemaOnly)
670  pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
671 
673  pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
674 
675  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
676  pg_fatal("option --include-foreign-data is not supported with parallel backup");
677 
678  if (dopt.dataOnly && dopt.outputClean)
679  pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
680 
681  if (dopt.if_exists && !dopt.outputClean)
682  pg_fatal("option --if-exists requires option -c/--clean");
683 
684  /*
685  * --inserts are already implied above if --column-inserts or
686  * --rows-per-insert were specified.
687  */
688  if (dopt.do_nothing && dopt.dump_inserts == 0)
689  pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
690 
691  /* Identify archive format to emit */
692  archiveFormat = parseArchiveFormat(format, &archiveMode);
693 
694  /* archiveFormat specific setup */
695  if (archiveFormat == archNull)
696  plainText = 1;
697 
698  /*
699  * Compression options
700  */
701  if (!parse_compress_algorithm(compression_algorithm_str,
703  pg_fatal("unrecognized compression algorithm: \"%s\"",
704  compression_algorithm_str);
705 
707  &compression_spec);
708  error_detail = validate_compress_specification(&compression_spec);
709  if (error_detail != NULL)
710  pg_fatal("invalid compression specification: %s",
711  error_detail);
712 
713  switch (compression_algorithm)
714  {
715  case PG_COMPRESSION_NONE:
716  /* fallthrough */
717  case PG_COMPRESSION_GZIP:
718  break;
719  case PG_COMPRESSION_ZSTD:
720  pg_fatal("compression with %s is not yet supported", "ZSTD");
721  break;
722  case PG_COMPRESSION_LZ4:
723  pg_fatal("compression with %s is not yet supported", "LZ4");
724  break;
725  }
726 
727  /*
728  * Custom and directory formats are compressed by default with gzip when
729  * available, not the others.
730  */
731  if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
732  !user_compression_defined)
733  {
734 #ifdef HAVE_LIBZ
736  &compression_spec);
737 #else
738  /* Nothing to do in the default case */
739 #endif
740  }
741 
742  /*
743  * If emitting an archive format, we always want to emit a DATABASE item,
744  * in case --create is specified at pg_restore time.
745  */
746  if (!plainText)
747  dopt.outputCreateDB = 1;
748 
749  /* Parallel backup only in the directory archive format so far */
750  if (archiveFormat != archDirectory && numWorkers > 1)
751  pg_fatal("parallel backup only supported by the directory format");
752 
753  /* Open the output file */
754  fout = CreateArchive(filename, archiveFormat, compression_spec,
755  dosync, archiveMode, setupDumpWorker);
756 
757  /* Make dump options accessible right away */
758  SetArchiveOptions(fout, &dopt, NULL);
759 
760  /* Register the cleanup hook */
761  on_exit_close_archive(fout);
762 
763  /* Let the archiver know how noisy to be */
764  fout->verbose = g_verbose;
765 
766 
767  /*
768  * We allow the server to be back to 9.2, and up to any minor release of
769  * our own major version. (See also version check in pg_dumpall.c.)
770  */
771  fout->minRemoteVersion = 90200;
772  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
773 
774  fout->numWorkers = numWorkers;
775 
776  /*
777  * Open the database using the Archiver, so it knows about it. Errors mean
778  * death.
779  */
780  ConnectDatabase(fout, &dopt.cparams, false);
781  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
782 
783  /*
784  * On hot standbys, never try to dump unlogged table data, since it will
785  * just throw an error.
786  */
787  if (fout->isStandby)
788  dopt.no_unlogged_table_data = true;
789 
790  /*
791  * Find the last built-in OID, if needed (prior to 8.1)
792  *
793  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
794  */
796 
797  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
798 
799  /* Expand schema selection patterns into OID lists */
800  if (schema_include_patterns.head != NULL)
801  {
804  strict_names);
805  if (schema_include_oids.head == NULL)
806  pg_fatal("no matching schemas were found");
807  }
810  false);
811  /* non-matching exclusion patterns aren't an error */
812 
813  /* Expand table selection patterns into OID lists */
814  if (table_include_patterns.head != NULL)
815  {
818  strict_names);
819  if (table_include_oids.head == NULL)
820  pg_fatal("no matching tables were found");
821  }
824  false);
825 
828  false);
829 
832 
833  /* non-matching exclusion patterns aren't an error */
834 
835  /* Expand extension selection patterns into OID lists */
836  if (extension_include_patterns.head != NULL)
837  {
840  strict_names);
841  if (extension_include_oids.head == NULL)
842  pg_fatal("no matching extensions were found");
843  }
844 
845  /*
846  * Dumping LOs is the default for dumps where an inclusion switch is not
847  * used (an "include everything" dump). -B can be used to exclude LOs
848  * from those dumps. -b can be used to include LOs even when an
849  * inclusion switch is used.
850  *
851  * -s means "schema only" and LOs are data, not schema, so we never
852  * include LOs when -s is used.
853  */
854  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputLOs)
855  dopt.outputLOs = true;
856 
857  /*
858  * Collect role names so we can map object owner OIDs to names.
859  */
860  collectRoleNames(fout);
861 
862  /*
863  * Now scan the database and create DumpableObject structs for all the
864  * objects we intend to dump.
865  */
866  tblinfo = getSchemaData(fout, &numTables);
867 
868  if (!dopt.schemaOnly)
869  {
870  getTableData(&dopt, tblinfo, numTables, 0);
872  if (dopt.dataOnly)
874  }
875 
876  if (dopt.schemaOnly && dopt.sequence_data)
877  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
878 
879  /*
880  * In binary-upgrade mode, we do not have to worry about the actual LO
881  * data or the associated metadata that resides in the pg_largeobject and
882  * pg_largeobject_metadata tables, respectively.
883  *
884  * However, we do need to collect LO information as there may be
885  * comments or other information on LOs that we do need to dump out.
886  */
887  if (dopt.outputLOs || dopt.binary_upgrade)
888  getLOs(fout);
889 
890  /*
891  * Collect dependency data to assist in ordering the objects.
892  */
893  getDependencies(fout);
894 
895  /*
896  * Collect ACLs, comments, and security labels, if wanted.
897  */
898  if (!dopt.aclsSkip)
899  getAdditionalACLs(fout);
900  if (!dopt.no_comments)
901  collectComments(fout);
902  if (!dopt.no_security_labels)
903  collectSecLabels(fout);
904 
905  /* Lastly, create dummy objects to represent the section boundaries */
906  boundaryObjs = createBoundaryObjects();
907 
908  /* Get pointers to all the known DumpableObjects */
909  getDumpableObjects(&dobjs, &numObjs);
910 
911  /*
912  * Add dummy dependencies to enforce the dump section ordering.
913  */
914  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
915 
916  /*
917  * Sort the objects into a safe dump order (no forward references).
918  *
919  * We rely on dependency information to help us determine a safe order, so
920  * the initial sort is mostly for cosmetic purposes: we sort by name to
921  * ensure that logically identical schemas will dump identically.
922  */
923  sortDumpableObjectsByTypeName(dobjs, numObjs);
924 
925  sortDumpableObjects(dobjs, numObjs,
926  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
927 
928  /*
929  * Create archive TOC entries for all the objects to be dumped, in a safe
930  * order.
931  */
932 
933  /*
934  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
935  */
936  dumpEncoding(fout);
937  dumpStdStrings(fout);
938  dumpSearchPath(fout);
939 
940  /* The database items are always next, unless we don't want them at all */
941  if (dopt.outputCreateDB)
942  dumpDatabase(fout);
943 
944  /* Now the rearrangeable objects. */
945  for (i = 0; i < numObjs; i++)
946  dumpDumpableObject(fout, dobjs[i]);
947 
948  /*
949  * Set up options info to ensure we dump what we want.
950  */
951  ropt = NewRestoreOptions();
952  ropt->filename = filename;
953 
954  /* if you change this list, see dumpOptionsFromRestoreOptions */
955  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
956  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
957  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
958  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
960  ropt->dropSchema = dopt.outputClean;
961  ropt->dataOnly = dopt.dataOnly;
962  ropt->schemaOnly = dopt.schemaOnly;
963  ropt->if_exists = dopt.if_exists;
964  ropt->column_inserts = dopt.column_inserts;
965  ropt->dumpSections = dopt.dumpSections;
966  ropt->aclsSkip = dopt.aclsSkip;
967  ropt->superuser = dopt.outputSuperuser;
968  ropt->createDB = dopt.outputCreateDB;
969  ropt->noOwner = dopt.outputNoOwner;
970  ropt->noTableAm = dopt.outputNoTableAm;
971  ropt->noTablespace = dopt.outputNoTablespaces;
972  ropt->disable_triggers = dopt.disable_triggers;
973  ropt->use_setsessauth = dopt.use_setsessauth;
975  ropt->dump_inserts = dopt.dump_inserts;
976  ropt->no_comments = dopt.no_comments;
977  ropt->no_publications = dopt.no_publications;
979  ropt->no_subscriptions = dopt.no_subscriptions;
980  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
983  ropt->sequence_data = dopt.sequence_data;
984  ropt->binary_upgrade = dopt.binary_upgrade;
985 
986  ropt->compression_spec = compression_spec;
987 
988  ropt->suppressDumpWarnings = true; /* We've already shown them */
989 
990  SetArchiveOptions(fout, &dopt, ropt);
991 
992  /* Mark which entries should be output */
994 
995  /*
996  * The archive's TOC entries are now marked as to which ones will actually
997  * be output, so we can set up their dependency lists properly. This isn't
998  * necessary for plain-text output, though.
999  */
1000  if (!plainText)
1002 
1003  /*
1004  * And finally we can do the actual output.
1005  *
1006  * Note: for non-plain-text output formats, the output file is written
1007  * inside CloseArchive(). This is, um, bizarre; but not worth changing
1008  * right now.
1009  */
1010  if (plainText)
1011  RestoreArchive(fout);
1012 
1013  CloseArchive(fout);
1014 
1015  exit_nicely(0);
1016 }
1017 
1018 
/*
 * help
 *		Print pg_dump's command-line usage summary on stdout.
 *
 * progname is the invocation name substituted into the usage examples.
 * All user-visible text is wrapped in _() for NLS translation; option
 * descriptions here must be kept in sync with the getopt_long table and
 * the SGML documentation.
 */
1019 static void
1020 help(const char *progname)
1021 {
1022  printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
1023  printf(_("Usage:\n"));
1024  printf(_(" %s [OPTION]... [DBNAME]\n"), progname);
1025 
1026  printf(_("\nGeneral options:\n"));
1027  printf(_(" -f, --file=FILENAME output file or directory name\n"));
1028  printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
1029  " plain text (default))\n"));
1030  printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
1031  printf(_(" -v, --verbose verbose mode\n"));
1032  printf(_(" -V, --version output version information, then exit\n"));
1033  printf(_(" -Z, --compress=METHOD[:LEVEL]\n"
1034  " compress as specified\n"));
1035  printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
1036  printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
1037  printf(_(" -?, --help show this help, then exit\n"));
1038 
1039  printf(_("\nOptions controlling the output content:\n"));
1040  printf(_(" -a, --data-only dump only the data, not the schema\n"));
1041  printf(_(" -b, --large-objects, --blobs\n"
1042  " include large objects in dump\n"));
1043  printf(_(" -B, --no-large-objects, --no-blobs\n"
1044  " exclude large objects in dump\n"));
1045  printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
1046  printf(_(" -C, --create include commands to create database in dump\n"));
1047  printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
1048  printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
1049  printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
1050  printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
1051  printf(_(" -O, --no-owner skip restoration of object ownership in\n"
1052  " plain-text format\n"));
1053  printf(_(" -s, --schema-only dump only the schema, no data\n"));
1054  printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
1055  printf(_(" -t, --table=PATTERN dump the specified table(s) only\n"));
1056  printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
1057  printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
1058  printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
1059  printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
1060  printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
1061  printf(_(" --disable-triggers disable triggers during data-only restore\n"));
1062  printf(_(" --enable-row-security enable row security (dump only content user has\n"
1063  " access to)\n"));
1064  printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
1065  printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
1066  printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
1067  printf(_(" --include-foreign-data=PATTERN\n"
1068  " include data of foreign tables on foreign\n"
1069  " servers matching PATTERN\n"));
1070  printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
1071  printf(_(" --load-via-partition-root load partitions via the root table\n"));
1072  printf(_(" --no-comments do not dump comments\n"));
1073  printf(_(" --no-publications do not dump publications\n"));
1074  printf(_(" --no-security-labels do not dump security label assignments\n"));
1075  printf(_(" --no-subscriptions do not dump subscriptions\n"));
1076  printf(_(" --no-table-access-method do not dump table access methods\n"));
1077  printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
1078  printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
1079  printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
1080  printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
1081  printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
1082  printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
1083  printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
1084  printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
1085  printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
1086  printf(_(" --strict-names require table and/or schema include patterns to\n"
1087  " match at least one entity each\n"));
1088  printf(_(" --use-set-session-authorization\n"
1089  " use SET SESSION AUTHORIZATION commands instead of\n"
1090  " ALTER OWNER commands to set ownership\n"));
1091 
1092  printf(_("\nConnection options:\n"));
1093  printf(_(" -d, --dbname=DBNAME database to dump\n"));
1094  printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
1095  printf(_(" -p, --port=PORT database server port number\n"));
1096  printf(_(" -U, --username=NAME connect as specified database user\n"));
1097  printf(_(" -w, --no-password never prompt for password\n"));
1098  printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1099  printf(_(" --role=ROLENAME do SET ROLE before dump\n"));
1100 
1101  printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
1102  "variable value is used.\n\n"));
1103  printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1104  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
1105 }
1106 
/*
 * setup_connection
 *		Configure a freshly opened connection so the dump is consistent,
 *		portable, and deterministic.
 *
 * AH: archive whose connection is being configured.
 * dumpencoding: client encoding requested with -E, or NULL.
 * dumpsnapshot: snapshot name requested with --snapshot, or NULL.
 * use_role: role requested with --role, or NULL (parallel workers pass
 * NULL and rely on AH->use_role saved by the leader).
 *
 * Sets session GUCs (DateStyle, IntervalStyle, extra_float_digits,
 * synchronize_seqscans, timeouts, etc.), then opens the serializable /
 * repeatable-read READ ONLY transaction the rest of the dump runs in.
 *
 * NOTE(review): several statements in this extract appear truncated
 * (missing source lines); verify against the full upstream file before
 * modifying any code here.
 */
1107 static void
1108 setup_connection(Archive *AH, const char *dumpencoding,
1109  const char *dumpsnapshot, char *use_role)
1110 {
1111  DumpOptions *dopt = AH->dopt;
1112  PGconn *conn = GetConnection(AH);
1113  const char *std_strings;
1114 
1116 
1117  /*
1118  * Set the client encoding if requested.
1119  */
1120  if (dumpencoding)
1121  {
1122  if (PQsetClientEncoding(conn, dumpencoding) < 0)
1123  pg_fatal("invalid client encoding \"%s\" specified",
1124  dumpencoding);
1125  }
1126 
1127  /*
1128  * Get the active encoding and the standard_conforming_strings setting, so
1129  * we know how to escape strings.
1130  */
1132 
1133  std_strings = PQparameterStatus(conn, "standard_conforming_strings");
1134  AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
1135 
1136  /*
1137  * Set the role if requested. In a parallel dump worker, we'll be passed
1138  * use_role == NULL, but AH->use_role is already set (if user specified it
1139  * originally) and we should use that.
1140  */
1141  if (!use_role && AH->use_role)
1142  use_role = AH->use_role;
1143 
1144  /* Set the role if requested */
1145  if (use_role)
1146  {
1147  PQExpBuffer query = createPQExpBuffer();
1148 
1149  appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
1150  ExecuteSqlStatement(AH, query->data);
1151  destroyPQExpBuffer(query);
1152 
1153  /* save it for possible later use by parallel workers */
1154  if (!AH->use_role)
1155  AH->use_role = pg_strdup(use_role);
1156  }
1157 
1158  /* Set the datestyle to ISO to ensure the dump's portability */
1159  ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");
1160 
1161  /* Likewise, avoid using sql_standard intervalstyle */
1162  ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");
1163 
1164  /*
1165  * Use an explicitly specified extra_float_digits if it has been provided.
1166  * Otherwise, set extra_float_digits so that we can dump float data
1167  * exactly (given correctly implemented float I/O code, anyway).
1168  */
1170  {
1172 
1173  appendPQExpBuffer(q, "SET extra_float_digits TO %d",
1175  ExecuteSqlStatement(AH, q->data);
1176  destroyPQExpBuffer(q);
1177  }
1178  else
1179  ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
1180 
1181  /*
1182  * Disable synchronized scanning, to prevent unpredictable changes in row
1183  * ordering across a dump and reload.
1184  */
1185  ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");
1186 
1187  /*
1188  * Disable timeouts if supported.
1189  */
1190  ExecuteSqlStatement(AH, "SET statement_timeout = 0");
1191  if (AH->remoteVersion >= 90300)
1192  ExecuteSqlStatement(AH, "SET lock_timeout = 0");
1193  if (AH->remoteVersion >= 90600)
1194  ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
1195 
1196  /*
1197  * Quote all identifiers, if requested.
1198  */
1200  ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
1201 
1202  /*
1203  * Adjust row-security mode, if supported.
1204  */
1205  if (AH->remoteVersion >= 90500)
1206  {
1207  if (dopt->enable_row_security)
1208  ExecuteSqlStatement(AH, "SET row_security = on");
1209  else
1210  ExecuteSqlStatement(AH, "SET row_security = off");
1211  }
1212 
1213  /*
1214  * Initialize prepared-query state to "nothing prepared". We do this here
1215  * so that a parallel dump worker will have its own state.
1216  */
1217  AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));
1218 
1219  /*
1220  * Start transaction-snapshot mode transaction to dump consistent data.
1221  */
1222  ExecuteSqlStatement(AH, "BEGIN");
1223 
1224  /*
1225  * To support the combination of serializable_deferrable with the jobs
1226  * option we use REPEATABLE READ for the worker connections that are
1227  * passed a snapshot. As long as the snapshot is acquired in a
1228  * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
1229  * REPEATABLE READ transaction provides the appropriate integrity
1230  * guarantees. This is a kluge, but safe for back-patching.
1231  */
1232  if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
1234  "SET TRANSACTION ISOLATION LEVEL "
1235  "SERIALIZABLE, READ ONLY, DEFERRABLE");
1236  else
1238  "SET TRANSACTION ISOLATION LEVEL "
1239  "REPEATABLE READ, READ ONLY");
1240 
1241  /*
1242  * If user specified a snapshot to use, select that. In a parallel dump
1243  * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
1244  * is already set (if the server can handle it) and we should use that.
1245  */
1246  if (dumpsnapshot)
1247  AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
1248 
1249  if (AH->sync_snapshot_id)
1250  {
1251  PQExpBuffer query = createPQExpBuffer();
1252 
1253  appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
1255  ExecuteSqlStatement(AH, query->data);
1256  destroyPQExpBuffer(query);
1257  }
1258  else if (AH->numWorkers > 1)
1259  {
1260  if (AH->isStandby && AH->remoteVersion < 100000)
1261  pg_fatal("parallel dumps from standby servers are not supported by this server version");
1263  }
1264 }
1265 
/*
 * Set up connection for a parallel worker process.
 *
 * Simply re-runs setup_connection() on the worker's own connection with
 * NULL for the snapshot and role arguments; those are re-derived from the
 * values the leader stashed in AH (sync_snapshot_id, use_role).
 * NOTE(review): the function's signature line is missing from this
 * extract; confirm the declared name/parameters against upstream.
 */
1266 /* Set up connection for a parallel worker process */
1267 static void
1269 {
1270  /*
1271  * We want to re-select all the same values the leader connection is
1272  * using. We'll have inherited directly-usable values in
1273  * AH->sync_snapshot_id and AH->use_role, but we need to translate the
1274  * inherited encoding value back to a string to pass to setup_connection.
1275  */
1276  setup_connection(AH,
1278  NULL,
1279  NULL);
1280 }
1281 
/*
 * get_synchronized_snapshot
 *		Export the current transaction's snapshot on the server and return
 *		its identifier.
 *
 * Runs pg_catalog.pg_export_snapshot() and returns the resulting snapshot
 * name as a pg_strdup'd string; the caller owns (and must eventually free)
 * the returned memory.  Used so parallel workers can attach to the
 * leader's snapshot.
 */
1282 static char *
1284 {
1285  char *query = "SELECT pg_catalog.pg_export_snapshot()";
1286  char *result;
1287  PGresult *res;
1288 
1289  res = ExecuteSqlQueryForSingleRow(fout, query);
1290  result = pg_strdup(PQgetvalue(res, 0, 0));
1291  PQclear(res);
1292 
1293  return result;
1294 }
1295 
/*
 * parseArchiveFormat
 *		Translate the -F/--format option string into an ArchiveFormat code,
 *		also setting *mode to the archive open mode.
 *
 * Accepts single-letter and long spellings (case-insensitively via
 * pg_strcasecmp): c/custom, d/directory, t/tar, p/plain, plus the
 * undocumented a/append used by pg_dumpall.  *mode defaults to
 * archModeWrite; only "append" switches it to archModeAppend.
 * Exits via pg_fatal() on an unrecognized format string.
 */
1296 static ArchiveFormat
1298 {
1299  ArchiveFormat archiveFormat;
1300 
1301  *mode = archModeWrite;
1302 
1303  if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
1304  {
1305  /* This is used by pg_dumpall, and is not documented */
1306  archiveFormat = archNull;
1307  *mode = archModeAppend;
1308  }
1309  else if (pg_strcasecmp(format, "c") == 0)
1310  archiveFormat = archCustom;
1311  else if (pg_strcasecmp(format, "custom") == 0)
1312  archiveFormat = archCustom;
1313  else if (pg_strcasecmp(format, "d") == 0)
1314  archiveFormat = archDirectory;
1315  else if (pg_strcasecmp(format, "directory") == 0)
1316  archiveFormat = archDirectory;
1317  else if (pg_strcasecmp(format, "p") == 0)
1318  archiveFormat = archNull;
1319  else if (pg_strcasecmp(format, "plain") == 0)
1320  archiveFormat = archNull;
1321  else if (pg_strcasecmp(format, "t") == 0)
1322  archiveFormat = archTar;
1323  else if (pg_strcasecmp(format, "tar") == 0)
1324  archiveFormat = archTar;
1325  else
1326  pg_fatal("invalid output format \"%s\" specified", format);
1327  return archiveFormat;
1328 }
1329 
1330 /*
1331  * Find the OIDs of all schemas matching the given list of patterns,
1332  * and append them to the given OID list.
1333  *
 * With strict_names, it is an error for any pattern to match nothing.
 * A one-level qualification (db.schema) is allowed only if it names the
 * connected database; more dots are rejected.
1333  */
1334 static void
1336  SimpleStringList *patterns,
1337  SimpleOidList *oids,
1338  bool strict_names)
1339 {
1340  PQExpBuffer query;
1341  PGresult *res;
1342  SimpleStringListCell *cell;
1343  int i;
1344 
1345  if (patterns->head == NULL)
1346  return; /* nothing to do */
1347 
1348  query = createPQExpBuffer();
1349 
1350  /*
1351  * The loop below runs multiple SELECTs, which might sometimes result in
1352  * duplicate entries in the OID list, but we don't care.
1353  */
1354 
1355  for (cell = patterns->head; cell; cell = cell->next)
1356  {
1357  PQExpBufferData dbbuf;
1358  int dotcnt;
1359 
1360  appendPQExpBufferStr(query,
1361  "SELECT oid FROM pg_catalog.pg_namespace n\n");
1362  initPQExpBuffer(&dbbuf);
1363  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1364  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
1365  &dotcnt);
1366  if (dotcnt > 1)
1367  pg_fatal("improper qualified name (too many dotted names): %s",
1368  cell->val);
1369  else if (dotcnt == 1)
1370  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1371  termPQExpBuffer(&dbbuf);
1372 
1373  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1374  if (strict_names && PQntuples(res) == 0)
1375  pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1376 
1377  for (i = 0; i < PQntuples(res); i++)
1378  {
1380  }
1381 
1382  PQclear(res);
1383  resetPQExpBuffer(query);
1384  }
1385 
1386  destroyPQExpBuffer(query);
1387 }
1388 
1389 /*
1390  * Find the OIDs of all extensions matching the given list of patterns,
1391  * and append them to the given OID list.
1392  *
 * Extension names are unqualified, so any dotted pattern is an error.
 * With strict_names, a pattern that matches nothing is fatal.
1392  */
1393 static void
1395  SimpleStringList *patterns,
1396  SimpleOidList *oids,
1397  bool strict_names)
1398 {
1399  PQExpBuffer query;
1400  PGresult *res;
1401  SimpleStringListCell *cell;
1402  int i;
1403 
1404  if (patterns->head == NULL)
1405  return; /* nothing to do */
1406 
1407  query = createPQExpBuffer();
1408 
1409  /*
1410  * The loop below runs multiple SELECTs, which might sometimes result in
1411  * duplicate entries in the OID list, but we don't care.
1412  */
1413  for (cell = patterns->head; cell; cell = cell->next)
1414  {
1415  int dotcnt;
1416 
1417  appendPQExpBufferStr(query,
1418  "SELECT oid FROM pg_catalog.pg_extension e\n");
1419  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1420  false, NULL, "e.extname", NULL, NULL, NULL,
1421  &dotcnt);
1422  if (dotcnt > 0)
1423  pg_fatal("improper qualified name (too many dotted names): %s",
1424  cell->val);
1425 
1426  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1427  if (strict_names && PQntuples(res) == 0)
1428  pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1429 
1430  for (i = 0; i < PQntuples(res); i++)
1431  {
1433  }
1434 
1435  PQclear(res);
1436  resetPQExpBuffer(query);
1437  }
1438 
1439  destroyPQExpBuffer(query);
1440 }
1441 
1442 /*
1443  * Find the OIDs of all foreign servers matching the given list of patterns,
1444  * and append them to the given OID list.
1445  *
 * Server names are unqualified, so dotted patterns are rejected.  Unlike
 * the schema/table variants, a non-matching pattern is always fatal here
 * (no strict_names flag): --include-foreign-data must name real servers.
1445  */
1446 static void
1448  SimpleStringList *patterns,
1449  SimpleOidList *oids)
1450 {
1451  PQExpBuffer query;
1452  PGresult *res;
1453  SimpleStringListCell *cell;
1454  int i;
1455 
1456  if (patterns->head == NULL)
1457  return; /* nothing to do */
1458 
1459  query = createPQExpBuffer();
1460 
1461  /*
1462  * The loop below runs multiple SELECTs, which might sometimes result in
1463  * duplicate entries in the OID list, but we don't care.
1464  */
1465 
1466  for (cell = patterns->head; cell; cell = cell->next)
1467  {
1468  int dotcnt;
1469 
1470  appendPQExpBufferStr(query,
1471  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1472  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1473  false, NULL, "s.srvname", NULL, NULL, NULL,
1474  &dotcnt);
1475  if (dotcnt > 0)
1476  pg_fatal("improper qualified name (too many dotted names): %s",
1477  cell->val);
1478 
1479  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1480  if (PQntuples(res) == 0)
1481  pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1482 
1483  for (i = 0; i < PQntuples(res); i++)
1485 
1486  PQclear(res);
1487  resetPQExpBuffer(query);
1488  }
1489 
1490  destroyPQExpBuffer(query);
1491 }
1492 
1493 /*
1494  * Find the OIDs of all tables matching the given list of patterns,
1495  * and append them to the given OID list. See also expand_dbname_patterns()
1496  * in pg_dumpall.c
1497  *
 * "Tables" here means every dumpable relkind: plain tables, sequences,
 * views, matviews, foreign tables, and partitioned tables.  Patterns may
 * be schema-qualified; a db.schema.table pattern is allowed only when it
 * names the connected database.  With strict_names, a pattern matching
 * nothing is fatal.
1497  */
1498 static void
1500  SimpleStringList *patterns, SimpleOidList *oids,
1501  bool strict_names)
1502 {
1503  PQExpBuffer query;
1504  PGresult *res;
1505  SimpleStringListCell *cell;
1506  int i;
1507 
1508  if (patterns->head == NULL)
1509  return; /* nothing to do */
1510 
1511  query = createPQExpBuffer();
1512 
1513  /*
1514  * this might sometimes result in duplicate entries in the OID list, but
1515  * we don't care.
1516  */
1517 
1518  for (cell = patterns->head; cell; cell = cell->next)
1519  {
1520  PQExpBufferData dbbuf;
1521  int dotcnt;
1522 
1523  /*
1524  * Query must remain ABSOLUTELY devoid of unqualified names. This
1525  * would be unnecessary given a pg_table_is_visible() variant taking a
1526  * search_path argument.
1527  */
1528  appendPQExpBuffer(query,
1529  "SELECT c.oid"
1530  "\nFROM pg_catalog.pg_class c"
1531  "\n LEFT JOIN pg_catalog.pg_namespace n"
1532  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1533  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1534  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1535  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1536  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1537  RELKIND_PARTITIONED_TABLE);
1538  initPQExpBuffer(&dbbuf);
1539  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1540  false, "n.nspname", "c.relname", NULL,
1541  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
1542  &dotcnt);
1543  if (dotcnt > 2)
1544  pg_fatal("improper relation name (too many dotted names): %s",
1545  cell->val);
1546  else if (dotcnt == 2)
1547  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1548  termPQExpBuffer(&dbbuf);
1549 
 /* RESET so pg_table_is_visible() can't be fooled by search_path */
1550  ExecuteSqlStatement(fout, "RESET search_path");
1551  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1554  if (strict_names && PQntuples(res) == 0)
1555  pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);
1556 
1557  for (i = 0; i < PQntuples(res); i++)
1558  {
1560  }
1561 
1562  PQclear(res);
1563  resetPQExpBuffer(query);
1564  }
1565 
1566  destroyPQExpBuffer(query);
1567 }
1568 
1569 /*
1570  * Verifies that the connected database name matches the given database name,
1571  * and if not, dies with an error about the given pattern.
1572  *
1573  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1574  */
1575 static void
1576 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1577 {
1578  const char *db;
1579 
1580  db = PQdb(conn);
1581  if (db == NULL)
1582  pg_fatal("You are currently not connected to a database.");
1583 
1584  if (strcmp(db, dbname) != 0)
1585  pg_fatal("cross-database references are not implemented: %s",
1586  pattern);
1587 }
1588 
1589 /*
1590  * checkExtensionMembership
1591  * Determine whether object is an extension member, and if so,
1592  * record an appropriate dependency and set the object's dump flag.
1593  *
1594  * It's important to call this for each object that could be an extension
1595  * member. Generally, we integrate this with determining the object's
1596  * to-be-dumped-ness, since extension membership overrides other rules for that.
1597  *
1598  * Returns true if object is an extension member, else false.
1599  */
1600 static bool
1602 {
1603  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1604 
 /* Not listed in pg_depend as belonging to any extension */
1605  if (ext == NULL)
1606  return false;
1607 
1608  dobj->ext_member = true;
1609 
1610  /* Record dependency so that getDependencies needn't deal with that */
1611  addObjectDependency(dobj, ext->dobj.dumpId);
1612 
1613  /*
1614  * In 9.6 and above, mark the member object to have any non-initial ACL,
1615  * policies, and security labels dumped.
1616  *
1617  * Note that any initial ACLs (see pg_init_privs) will be removed when we
1618  * extract the information about the object. We don't provide support for
1619  * initial policies and security labels and it seems unlikely for those to
1620  * ever exist, but we may have to revisit this later.
1621  *
1622  * Prior to 9.6, we do not include any extension member components.
1623  *
1624  * In binary upgrades, we still dump all components of the members
1625  * individually, since the idea is to exactly reproduce the database
1626  * contents rather than replace the extension contents with something
1627  * different.
1628  */
1629  if (fout->dopt->binary_upgrade)
1630  dobj->dump = ext->dobj.dump;
1631  else
1632  {
1633  if (fout->remoteVersion < 90600)
1634  dobj->dump = DUMP_COMPONENT_NONE;
1635  else
1636  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL |
1639  }
1640 
1641  return true;
1642 }
1643 
1644 /*
1645  * selectDumpableNamespace: policy-setting subroutine
1646  * Mark a namespace as to be dumped or not
1647  *
 * Decides both nsinfo->dobj.dump (the schema object itself) and
 * nsinfo->dobj.dump_contains (default policy for objects inside it),
 * applying include/exclude switches, system-schema rules, and the
 * special-case handling of pg_catalog and public.
1647  */
1648 static void
1650 {
1651  /*
1652  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1653  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1654  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1655  */
1656  nsinfo->create = true;
1657 
1658  /*
1659  * If specific tables are being dumped, do not dump any complete
1660  * namespaces. If specific namespaces are being dumped, dump just those
1661  * namespaces. Otherwise, dump all non-system namespaces.
1662  */
1663  if (table_include_oids.head != NULL)
1664  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1665  else if (schema_include_oids.head != NULL)
1666  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1668  nsinfo->dobj.catId.oid) ?
1670  else if (fout->remoteVersion >= 90600 &&
1671  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1672  {
1673  /*
1674  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1675  * they are interesting (and not the original ACLs which were set at
1676  * initdb time, see pg_init_privs).
1677  */
1678  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1679  }
1680  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1681  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1682  {
1683  /* Other system schemas don't get dumped */
1684  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1685  }
1686  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1687  {
1688  /*
1689  * The public schema is a strange beast that sits in a sort of
1690  * no-mans-land between being a system object and a user object.
1691  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1692  * a comment and an indication of ownership. If the owner is the
1693  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1694  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1695  */
1696  nsinfo->create = false;
1697  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1698  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1699  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1701 
1702  /*
1703  * Also, make like it has a comment even if it doesn't; this is so
1704  * that we'll emit a command to drop the comment, if appropriate.
1705  * (Without this, we'd not call dumpCommentExtended for it.)
1706  */
1708  }
1709  else
1710  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1711 
1712  /*
1713  * In any case, a namespace can be excluded by an exclusion switch
1714  */
1715  if (nsinfo->dobj.dump_contains &&
1717  nsinfo->dobj.catId.oid))
1718  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1719 
1720  /*
1721  * If the schema belongs to an extension, allow extension membership to
1722  * override the dump decision for the schema itself. However, this does
1723  * not change dump_contains, so this won't change what we do with objects
1724  * within the schema. (If they belong to the extension, they'll get
1725  * suppressed by it, otherwise not.)
1726  */
1727  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1728 }
1729 
1730 /*
1731  * selectDumpableTable: policy-setting subroutine
1732  * Mark a table as to be dumped or not
1733  *
 * Extension membership wins outright; otherwise the decision follows the
 * -t/--table include list if one was given, else the parent namespace's
 * dump_contains policy, and finally any -T/--exclude-table switch.
1733  */
1734 static void
1736 {
1737  if (checkExtensionMembership(&tbinfo->dobj, fout))
1738  return; /* extension membership overrides all else */
1739 
1740  /*
1741  * If specific tables are being dumped, dump just those tables; else, dump
1742  * according to the parent namespace's dump flag.
1743  */
1744  if (table_include_oids.head != NULL)
1746  tbinfo->dobj.catId.oid) ?
1748  else
1749  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1750 
1751  /*
1752  * In any case, a table can be excluded by an exclusion switch
1753  */
1754  if (tbinfo->dobj.dump &&
1756  tbinfo->dobj.catId.oid))
1757  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1758 }
1759 
1760 /*
1761  * selectDumpableType: policy-setting subroutine
1762  * Mark a type as to be dumped or not
1763  *
1764  * If it's a table's rowtype or an autogenerated array type, we also apply a
1765  * special type code to facilitate sorting into the desired order. (We don't
1766  * want to consider those to be ordinary types because that would bring tables
1767  * up into the datatype part of the dump order.) We still set the object's
1768  * dump flag; that's not going to cause the dummy type to be dumped, but we
1769  * need it so that casts involving such types will be dumped correctly -- see
1770  * dumpCast. This means the flag should be set the same as for the underlying
1771  * object (the table or base type).
1772  */
1773 static void
1775 {
1776  /* skip complex types, except for standalone composite types */
1777  if (OidIsValid(tyinfo->typrelid) &&
1778  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1779  {
1780  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1781 
 /* demote to a dummy entry whose dump flag mirrors the owning table */
1782  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1783  if (tytable != NULL)
1784  tyinfo->dobj.dump = tytable->dobj.dump;
1785  else
1786  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1787  return;
1788  }
1789 
1790  /* skip auto-generated array types */
1791  if (tyinfo->isArray || tyinfo->isMultirange)
1792  {
1793  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1794 
1795  /*
1796  * Fall through to set the dump flag; we assume that the subsequent
1797  * rules will do the same thing as they would for the array's base
1798  * type. (We cannot reliably look up the base type here, since
1799  * getTypes may not have processed it yet.)
1800  */
1801  }
1802 
1803  if (checkExtensionMembership(&tyinfo->dobj, fout))
1804  return; /* extension membership overrides all else */
1805 
1806  /* Dump based on if the contents of the namespace are being dumped */
1807  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1808 }
1809 
1810 /*
1811  * selectDumpableDefaultACL: policy-setting subroutine
1812  * Mark a default ACL as to be dumped or not
1813  *
1814  * For per-schema default ACLs, dump if the schema is to be dumped.
1815  * Otherwise dump if we are dumping "everything". Note that dataOnly
1816  * and aclsSkip are checked separately.
1817  */
1818 static void
1820 {
1821  /* Default ACLs can't be extension members */
1822 
1823  if (dinfo->dobj.namespace)
1824  /* default ACLs are considered part of the namespace */
1825  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1826  else
 /* global default ACL: dump only when no --schema/--table filtering */
1827  dinfo->dobj.dump = dopt->include_everything ?
1829 }
1830 
1831 /*
1832  * selectDumpableCast: policy-setting subroutine
1833  * Mark a cast as to be dumped or not
1834  *
1835  * Casts do not belong to any particular namespace (since they haven't got
1836  * names), nor do they have identifiable owners. To distinguish user-defined
1837  * casts from built-in ones, we must resort to checking whether the cast's
1838  * OID is in the range reserved for initdb.
1839  */
1840 static void
1842 {
1843  if (checkExtensionMembership(&cast->dobj, fout))
1844  return; /* extension membership overrides all else */
1845 
1846  /*
1847  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
1848  * support ACLs currently.
1849  */
1850  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1851  cast->dobj.dump = DUMP_COMPONENT_NONE;
1852  else
1853  cast->dobj.dump = fout->dopt->include_everything ?
1855 }
1856 
1857 /*
1858  * selectDumpableProcLang: policy-setting subroutine
1859  * Mark a procedural language as to be dumped or not
1860  *
1861  * Procedural languages do not belong to any particular namespace. To
1862  * identify built-in languages, we must resort to checking whether the
1863  * language's OID is in the range reserved for initdb.
1864  */
1865 static void
1867 {
1868  if (checkExtensionMembership(&plang->dobj, fout))
1869  return; /* extension membership overrides all else */
1870 
1871  /*
1872  * Only include procedural languages when we are dumping everything.
1873  *
1874  * For from-initdb procedural languages, only include ACLs, as we do for
1875  * the pg_catalog namespace. We need this because procedural languages do
1876  * not live in any namespace.
1877  */
1878  if (!fout->dopt->include_everything)
1879  plang->dobj.dump = DUMP_COMPONENT_NONE;
1880  else
1881  {
 /* built-in language: pre-9.6 servers can't track ACL changes */
1882  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1883  plang->dobj.dump = fout->remoteVersion < 90600 ?
1885  else
1886  plang->dobj.dump = DUMP_COMPONENT_ALL;
1887  }
1888 }
1889 
1890 /*
1891  * selectDumpableAccessMethod: policy-setting subroutine
1892  * Mark an access method as to be dumped or not
1893  *
1894  * Access methods do not belong to any particular namespace. To identify
1895  * built-in access methods, we must resort to checking whether the
1896  * method's OID is in the range reserved for initdb.
1897  */
1898 static void
/* NOTE(review): extraction dropped original line 1899 (function name and
 * parameter list for the method/fout arguments used below). */
1900 {
1901  if (checkExtensionMembership(&method->dobj, fout))
1902  return; /* extension membership overrides all else */
1903 
1904  /*
1905  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
1906  * they do not support ACLs currently.
1907  */
1908  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1909  method->dobj.dump = DUMP_COMPONENT_NONE;
1910  else
1911  method->dobj.dump = fout->dopt->include_everything ?
/* NOTE(review): extraction dropped original line 1912 (the ternary's two
 * result values). */
1913 }
1914 
1915 /*
1916  * selectDumpableExtension: policy-setting subroutine
1917  * Mark an extension as to be dumped or not
1918  *
1919  * Built-in extensions should be skipped except for checking ACLs, since we
1920  * assume those will already be installed in the target database. We identify
1921  * such extensions by their having OIDs in the range reserved for initdb.
1922  * We dump all user-added extensions by default. No extensions are dumped
1923  * if include_everything is false (i.e., a --schema or --table switch was
1924  * given), except if --extension specifies a list of extensions to dump.
1925  */
1926 static void
/* NOTE(review): extraction dropped original line 1927 (function name and
 * parameter list for the extinfo/dopt arguments used below). */
1928 {
1929  /*
1930  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
1931  * change permissions on their member objects, if they wish to, and have
1932  * those changes preserved.
1933  */
1934  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
1935  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
1936  else
1937  {
1938  /* check if there is a list of extensions to dump */
1939  if (extension_include_oids.head != NULL)
1940  extinfo->dobj.dump = extinfo->dobj.dump_contains =
/* NOTE(review): extraction dropped original line 1941 (the start of the
 * membership test against extension_include_oids, continued below). */
1942  extinfo->dobj.catId.oid) ?
/* NOTE(review): extraction dropped original line 1943 (the ternary's two
 * result values). */
1944  else
1945  extinfo->dobj.dump = extinfo->dobj.dump_contains =
1946  dopt->include_everything ?
/* NOTE(review): extraction dropped original line 1947 (the ternary's two
 * result values). */
1948  }
1949 }
1950 
1951 /*
1952  * selectDumpablePublicationObject: policy-setting subroutine
1953  * Mark a publication object as to be dumped or not
1954  *
1955  * A publication can have schemas and tables which have schemas, but those are
1956  * ignored in decision making, because publications are only dumped when we are
1957  * dumping everything.
1958  */
1959 static void
/* NOTE(review): extraction dropped original line 1960 (function name and
 * parameter list for the dobj/fout arguments used below). */
1961 {
1962  if (checkExtensionMembership(dobj, fout))
1963  return; /* extension membership overrides all else */
1964 
1965  dobj->dump = fout->dopt->include_everything ?
/* NOTE(review): extraction dropped original line 1966 (the ternary's two
 * result values). */
1967 }
1968 
1969 /*
1970  * selectDumpableObject: policy-setting subroutine
1971  * Mark a generic dumpable object as to be dumped or not
1972  *
1973  * Use this only for object types without a special-case routine above.
1974  */
1975 static void
/* NOTE(review): extraction dropped original line 1976 (function name and
 * parameter list for the dobj/fout arguments used below). */
1977 {
1978  if (checkExtensionMembership(dobj, fout))
1979  return; /* extension membership overrides all else */
1980 
1981  /*
1982  * Default policy is to dump if parent namespace is dumpable, or for
1983  * non-namespace-associated items, dump if we're dumping "everything".
1984  */
1985  if (dobj->namespace)
1986  dobj->dump = dobj->namespace->dobj.dump_contains;
1987  else
1988  dobj->dump = fout->dopt->include_everything ?
/* NOTE(review): extraction dropped original line 1989 (the ternary's two
 * result values). */
1990 }
1991 
1992 /*
1993  * Dump a table's contents for loading using the COPY command
1994  * - this routine is called by the Archiver when it wants the table
1995  * to be dumped.
1996  */
1997 static int
1998 dumpTableData_copy(Archive *fout, const void *dcontext)
1999 {
2000  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2001  TableInfo *tbinfo = tdinfo->tdtable;
2002  const char *classname = tbinfo->dobj.name;
/* NOTE(review): extraction dropped original line 2003 (the declaration and
 * creation of the "q" query buffer that is used throughout and destroyed at
 * the bottom of this function). */
2004 
2005  /*
2006  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2007  * which uses it already.
2008  */
2009  PQExpBuffer clistBuf = createPQExpBuffer();
2010  PGconn *conn = GetConnection(fout);
2011  PGresult *res;
2012  int ret;
2013  char *copybuf;
2014  const char *column_list;
2015 
2016  pg_log_info("dumping contents of table \"%s.%s\"",
2017  tbinfo->dobj.namespace->dobj.name, classname);
2018 
2019  /*
2020  * Specify the column list explicitly so that we have no possibility of
2021  * retrieving data in the wrong column order. (The default column
2022  * ordering of COPY will not be what we want in certain corner cases
2023  * involving ADD COLUMN and inheritance.)
2024  */
2025  column_list = fmtCopyColumnList(tbinfo, clistBuf);
2026 
2027  /*
2028  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2029  * a filter condition was specified. For other cases a simple COPY
2030  * suffices.
2031  */
2032  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2033  {
2034  appendPQExpBufferStr(q, "COPY (SELECT ");
2035  /* klugery to get rid of parens in column list */
2036  if (strlen(column_list) > 2)
2037  {
2038  appendPQExpBufferStr(q, column_list + 1);
2039  q->data[q->len - 1] = ' ';
2040  }
2041  else
2042  appendPQExpBufferStr(q, "* ");
2043 
2044  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2045  fmtQualifiedDumpable(tbinfo),
2046  tdinfo->filtercond ? tdinfo->filtercond : "");
2047  }
2048  else
2049  {
2050  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2051  fmtQualifiedDumpable(tbinfo),
2052  column_list);
2053  }
2054  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2055  PQclear(res);
2056  destroyPQExpBuffer(clistBuf);
2057 
2058  for (;;)
2059  {
2060  ret = PQgetCopyData(conn, &copybuf, 0);
2061 
2062  if (ret < 0)
2063  break; /* done or error */
2064 
2065  if (copybuf)
2066  {
2067  WriteData(fout, copybuf, ret);
2068  PQfreemem(copybuf);
2069  }
2070 
2071  /* ----------
2072  * THROTTLE:
2073  *
2074  * There was considerable discussion in late July, 2000 regarding
2075  * slowing down pg_dump when backing up large tables. Users with both
2076  * slow & fast (multi-processor) machines experienced performance
2077  * degradation when doing a backup.
2078  *
2079  * Initial attempts based on sleeping for a number of ms for each ms
2080  * of work were deemed too complex, then a simple 'sleep in each loop'
2081  * implementation was suggested. The latter failed because the loop
2082  * was too tight. Finally, the following was implemented:
2083  *
2084  * If throttle is non-zero, then
2085  * See how long since the last sleep.
2086  * Work out how long to sleep (based on ratio).
2087  * If sleep is more than 100ms, then
2088  * sleep
2089  * reset timer
2090  * EndIf
2091  * EndIf
2092  *
2093  * where the throttle value was the number of ms to sleep per ms of
2094  * work. The calculation was done in each loop.
2095  *
2096  * Most of the hard work is done in the backend, and this solution
2097  * still did not work particularly well: on slow machines, the ratio
2098  * was 50:1, and on medium paced machines, 1:1, and on fast
2099  * multi-processor machines, it had little or no effect, for reasons
2100  * that were unclear.
2101  *
2102  * Further discussion ensued, and the proposal was dropped.
2103  *
2104  * For those people who want this feature, it can be implemented using
2105  * gettimeofday in each loop, calculating the time since last sleep,
2106  * multiplying that by the sleep ratio, then if the result is more
2107  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2108  * function to sleep for a subsecond period ie.
2109  *
2110  * select(0, NULL, NULL, NULL, &tvi);
2111  *
2112  * This will return after the interval specified in the structure tvi.
2113  * Finally, call gettimeofday again to save the 'last sleep time'.
2114  * ----------
2115  */
2116  }
2117  archprintf(fout, "\\.\n\n\n");
2118 
2119  if (ret == -2)
2120  {
2121  /* copy data transfer failed */
2122  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2123  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2124  pg_log_error_detail("Command was: %s", q->data);
2125  exit_nicely(1);
2126  }
2127 
2128  /* Check command status and return to normal libpq state */
2129  res = PQgetResult(conn);
/* NOTE(review): extraction dropped original line 2130 (the if-condition
 * testing the PGresult status; the braces below are its error branch). */
2131  {
2132  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2133  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2134  pg_log_error_detail("Command was: %s", q->data);
2135  exit_nicely(1);
2136  }
2137  PQclear(res);
2138 
2139  /* Do this to ensure we've pumped libpq back to idle state */
2140  if (PQgetResult(conn) != NULL)
2141  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2142  classname);
2143 
2144  destroyPQExpBuffer(q);
2145  return 1;
2146 }
2147 
2148 /*
2149  * Dump table data using INSERT commands.
2150  *
2151  * Caution: when we restore from an archive file direct to database, the
2152  * INSERT commands emitted by this function have to be parsed by
2153  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2154  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2155  */
2156 static int
2157 dumpTableData_insert(Archive *fout, const void *dcontext)
2158 {
2159  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2160  TableInfo *tbinfo = tdinfo->tdtable;
2161  DumpOptions *dopt = fout->dopt;
/* NOTE(review): extraction dropped original line 2162 (the declaration and
 * creation of the "q" query buffer that is used below and destroyed at the
 * bottom of this function). */
2163  PQExpBuffer insertStmt = NULL;
2164  char *attgenerated;
2165  PGresult *res;
2166  int nfields,
2167  i;
2168  int rows_per_statement = dopt->dump_inserts;
2169  int rows_this_statement = 0;
2170 
2171  /*
2172  * If we're going to emit INSERTs with column names, the most efficient
2173  * way to deal with generated columns is to exclude them entirely. For
2174  * INSERTs without column names, we have to emit DEFAULT rather than the
2175  * actual column value --- but we can save a few cycles by fetching nulls
2176  * rather than the uninteresting-to-us value.
2177  */
2178  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2179  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2180  nfields = 0;
2181  for (i = 0; i < tbinfo->numatts; i++)
2182  {
2183  if (tbinfo->attisdropped[i])
2184  continue;
2185  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2186  continue;
2187  if (nfields > 0)
2188  appendPQExpBufferStr(q, ", ");
2189  if (tbinfo->attgenerated[i])
2190  appendPQExpBufferStr(q, "NULL");
2191  else
2192  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2193  attgenerated[nfields] = tbinfo->attgenerated[i];
2194  nfields++;
2195  }
2196  /* Servers before 9.4 will complain about zero-column SELECT */
2197  if (nfields == 0)
2198  appendPQExpBufferStr(q, "NULL");
2199  appendPQExpBuffer(q, " FROM ONLY %s",
2200  fmtQualifiedDumpable(tbinfo));
2201  if (tdinfo->filtercond)
2202  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2203 
2204  ExecuteSqlStatement(fout, q->data);
2205 
2206  while (1)
2207  {
2208  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2209  PGRES_TUPLES_OK);
2210 
2211  /* cross-check field count, allowing for dummy NULL if any */
2212  if (nfields != PQnfields(res) &&
2213  !(nfields == 0 && PQnfields(res) == 1))
2214  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2215  tbinfo->dobj.name);
2216 
2217  /*
2218  * First time through, we build as much of the INSERT statement as
2219  * possible in "insertStmt", which we can then just print for each
2220  * statement. If the table happens to have zero dumpable columns then
2221  * this will be a complete statement, otherwise it will end in
2222  * "VALUES" and be ready to have the row's column values printed.
2223  */
2224  if (insertStmt == NULL)
2225  {
2226  TableInfo *targettab;
2227 
2228  insertStmt = createPQExpBuffer();
2229 
2230  /*
2231  * When load-via-partition-root is set, get the root table name
2232  * for the partition table, so that we can reload data through the
2233  * root table.
2234  */
2235  if (dopt->load_via_partition_root && tbinfo->ispartition)
2236  targettab = getRootTableInfo(tbinfo);
2237  else
2238  targettab = tbinfo;
2239 
2240  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2241  fmtQualifiedDumpable(targettab));
2242 
2243  /* corner case for zero-column table */
2244  if (nfields == 0)
2245  {
2246  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2247  }
2248  else
2249  {
2250  /* append the list of column names if required */
2251  if (dopt->column_inserts)
2252  {
2253  appendPQExpBufferChar(insertStmt, '(');
2254  for (int field = 0; field < nfields; field++)
2255  {
2256  if (field > 0)
2257  appendPQExpBufferStr(insertStmt, ", ");
2258  appendPQExpBufferStr(insertStmt,
2259  fmtId(PQfname(res, field)));
2260  }
2261  appendPQExpBufferStr(insertStmt, ") ");
2262  }
2263 
2264  if (tbinfo->needs_override)
2265  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2266 
2267  appendPQExpBufferStr(insertStmt, "VALUES");
2268  }
2269  }
2270 
2271  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2272  {
2273  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2274  if (rows_this_statement == 0)
2275  archputs(insertStmt->data, fout);
2276 
2277  /*
2278  * If it is zero-column table then we've already written the
2279  * complete statement, which will mean we've disobeyed
2280  * --rows-per-insert when it's set greater than 1. We do support
2281  * a way to make this multi-row with: SELECT UNION ALL SELECT
2282  * UNION ALL ... but that's non-standard so we should avoid it
2283  * given that using INSERTs is mostly only ever needed for
2284  * cross-database exports.
2285  */
2286  if (nfields == 0)
2287  continue;
2288 
2289  /* Emit a row heading */
2290  if (rows_per_statement == 1)
2291  archputs(" (", fout);
2292  else if (rows_this_statement > 0)
2293  archputs(",\n\t(", fout);
2294  else
2295  archputs("\n\t(", fout);
2296 
2297  for (int field = 0; field < nfields; field++)
2298  {
2299  if (field > 0)
2300  archputs(", ", fout);
2301  if (attgenerated[field])
2302  {
2303  archputs("DEFAULT", fout);
2304  continue;
2305  }
2306  if (PQgetisnull(res, tuple, field))
2307  {
2308  archputs("NULL", fout);
2309  continue;
2310  }
2311 
2312  /* XXX This code is partially duplicated in ruleutils.c */
2313  switch (PQftype(res, field))
2314  {
2315  case INT2OID:
2316  case INT4OID:
2317  case INT8OID:
2318  case OIDOID:
2319  case FLOAT4OID:
2320  case FLOAT8OID:
2321  case NUMERICOID:
2322  {
2323  /*
2324  * These types are printed without quotes unless
2325  * they contain values that aren't accepted by the
2326  * scanner unquoted (e.g., 'NaN'). Note that
2327  * strtod() and friends might accept NaN, so we
2328  * can't use that to test.
2329  *
2330  * In reality we only need to defend against
2331  * infinity and NaN, so we need not get too crazy
2332  * about pattern matching here.
2333  */
2334  const char *s = PQgetvalue(res, tuple, field);
2335 
2336  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2337  archputs(s, fout);
2338  else
2339  archprintf(fout, "'%s'", s);
2340  }
2341  break;
2342 
2343  case BITOID:
2344  case VARBITOID:
2345  archprintf(fout, "B'%s'",
2346  PQgetvalue(res, tuple, field));
2347  break;
2348 
2349  case BOOLOID:
2350  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2351  archputs("true", fout);
2352  else
2353  archputs("false", fout);
2354  break;
2355 
2356  default:
2357  /* All other types are printed as string literals. */
2358  resetPQExpBuffer(q);
/* NOTE(review): extraction dropped original line 2359 (the string-literal
 * append call whose remaining arguments continue on the next two lines). */
2360  PQgetvalue(res, tuple, field),
2361  fout);
2362  archputs(q->data, fout);
2363  break;
2364  }
2365  }
2366 
2367  /* Terminate the row ... */
2368  archputs(")", fout);
2369 
2370  /* ... and the statement, if the target no. of rows is reached */
2371  if (++rows_this_statement >= rows_per_statement)
2372  {
2373  if (dopt->do_nothing)
2374  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2375  else
2376  archputs(";\n", fout);
2377  /* Reset the row counter */
2378  rows_this_statement = 0;
2379  }
2380  }
2381 
2382  if (PQntuples(res) <= 0)
2383  {
2384  PQclear(res);
2385  break;
2386  }
2387  PQclear(res);
2388  }
2389 
2390  /* Terminate any statements that didn't make the row count. */
2391  if (rows_this_statement > 0)
2392  {
2393  if (dopt->do_nothing)
2394  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2395  else
2396  archputs(";\n", fout);
2397  }
2398 
2399  archputs("\n\n", fout);
2400 
2401  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2402 
2403  destroyPQExpBuffer(q);
2404  if (insertStmt != NULL)
2405  destroyPQExpBuffer(insertStmt);
2406  free(attgenerated);
2407 
2408  return 1;
2409 }
2410 
2411 /*
2412  * getRootTableInfo:
2413  * get the root TableInfo for the given partition table.
2414  */
2415 static TableInfo *
/* NOTE(review): extraction dropped original line 2416 (function name and
 * parameter list, presumably "getRootTableInfo(const TableInfo *tbinfo)"
 * given the uses below) -- confirm against the original pg_dump.c. */
2417 {
2418  TableInfo *parentTbinfo;
2419 
2420  Assert(tbinfo->ispartition);
2421  Assert(tbinfo->numParents == 1);
2422 
/* Climb the single-parent chain until we reach a table that is not itself
 * a partition; that is the root of the partition tree. */
2423  parentTbinfo = tbinfo->parents[0];
2424  while (parentTbinfo->ispartition)
2425  {
2426  Assert(parentTbinfo->numParents == 1);
2427  parentTbinfo = parentTbinfo->parents[0];
2428  }
2429 
2430  return parentTbinfo;
2431 }
2432 
2433 /*
2434  * dumpTableData -
2435  * dump the contents of a single table
2436  *
2437  * Actually, this just makes an ArchiveEntry for the table contents.
2438  */
2439 static void
2440 dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
2441 {
2442  DumpOptions *dopt = fout->dopt;
2443  TableInfo *tbinfo = tdinfo->tdtable;
2444  PQExpBuffer copyBuf = createPQExpBuffer();
2445  PQExpBuffer clistBuf = createPQExpBuffer();
2446  DataDumperPtr dumpFn;
2447  char *copyStmt;
2448  const char *copyFrom;
2449 
2450  /* We had better have loaded per-column details about this table */
2451  Assert(tbinfo->interesting);
2452 
2453  if (dopt->dump_inserts == 0)
2454  {
2455  /* Dump/restore using COPY */
2456  dumpFn = dumpTableData_copy;
2457 
2458  /*
2459  * When load-via-partition-root is set, get the root table name for
2460  * the partition table, so that we can reload data through the root
2461  * table.
2462  */
2463  if (dopt->load_via_partition_root && tbinfo->ispartition)
2464  {
2465  TableInfo *parentTbinfo;
2466 
2467  parentTbinfo = getRootTableInfo(tbinfo);
2468  copyFrom = fmtQualifiedDumpable(parentTbinfo);
2469  }
2470  else
2471  copyFrom = fmtQualifiedDumpable(tbinfo);
2472 
2473  /* must use 2 steps here 'cause fmtId is nonreentrant */
2474  appendPQExpBuffer(copyBuf, "COPY %s ",
2475  copyFrom);
2476  appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2477  fmtCopyColumnList(tbinfo, clistBuf));
2478  copyStmt = copyBuf->data;
2479  }
2480  else
2481  {
2482  /* Restore using INSERT */
2483  dumpFn = dumpTableData_insert;
2484  copyStmt = NULL;
2485  }
2486 
2487  /*
2488  * Note: although the TableDataInfo is a full DumpableObject, we treat its
2489  * dependency on its table as "special" and pass it to ArchiveEntry now.
2490  * See comments for BuildArchiveDependencies.
2491  */
2492  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2493  {
2494  TocEntry *te;
2495 
2496  te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2497  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2498  .namespace = tbinfo->dobj.namespace->dobj.name,
2499  .owner = tbinfo->rolname,
2500  .description = "TABLE DATA",
2501  .section = SECTION_DATA,
2502  .copyStmt = copyStmt,
2503  .deps = &(tbinfo->dobj.dumpId),
2504  .nDeps = 1,
2505  .dumpFn = dumpFn,
2506  .dumpArg = tdinfo));
2507 
2508  /*
2509  * Set the TocEntry's dataLength in case we are doing a parallel dump
2510  * and want to order dump jobs by table size. We choose to measure
2511  * dataLength in table pages (including TOAST pages) during dump, so
2512  * no scaling is needed.
2513  *
2514  * However, relpages is declared as "integer" in pg_class, and hence
2515  * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2516  * Cast so that we get the right interpretation of table sizes
2517  * exceeding INT_MAX pages.
2518  */
2519  te->dataLength = (BlockNumber) tbinfo->relpages;
2520  te->dataLength += (BlockNumber) tbinfo->toastpages;
2521 
2522  /*
2523  * If pgoff_t is only 32 bits wide, the above refinement is useless,
2524  * and instead we'd better worry about integer overflow. Clamp to
2525  * INT_MAX if the correct result exceeds that.
2526  */
2527  if (sizeof(te->dataLength) == 4 &&
2528  (tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2529  te->dataLength < 0))
2530  te->dataLength = INT_MAX;
2531  }
2532 
2533  destroyPQExpBuffer(copyBuf);
2534  destroyPQExpBuffer(clistBuf);
2535 }
2536 
2537 /*
2538  * refreshMatViewData -
2539  * load or refresh the contents of a single materialized view
2540  *
2541  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2542  * statement.
2543  */
2544 static void
2546 {
2547  TableInfo *tbinfo = tdinfo->tdtable;
2548  PQExpBuffer q;
2549 
2550  /* If the materialized view is not flagged as populated, skip this. */
2551  if (!tbinfo->relispopulated)
2552  return;
2553 
2554  q = createPQExpBuffer();
2555 
2556  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2557  fmtQualifiedDumpable(tbinfo));
2558 
2559  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2560  ArchiveEntry(fout,
2561  tdinfo->dobj.catId, /* catalog ID */
2562  tdinfo->dobj.dumpId, /* dump ID */
2563  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2564  .namespace = tbinfo->dobj.namespace->dobj.name,
2565  .owner = tbinfo->rolname,
2566  .description = "MATERIALIZED VIEW DATA",
2567  .section = SECTION_POST_DATA,
2568  .createStmt = q->data,
2569  .deps = tdinfo->dobj.dependencies,
2570  .nDeps = tdinfo->dobj.nDeps));
2571 
2572  destroyPQExpBuffer(q);
2573 }
2574 
2575 /*
2576  * getTableData -
2577  * set up dumpable objects representing the contents of tables
2578  */
2579 static void
2580 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2581 {
2582  int i;
2583 
2584  for (i = 0; i < numTables; i++)
2585  {
2586  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2587  (!relkind || tblinfo[i].relkind == relkind))
2588  makeTableDataInfo(dopt, &(tblinfo[i]));
2589  }
2590 }
2591 
2592 /*
2593  * Make a dumpable object for the data of this specific table
2594  *
2595  * Note: we make a TableDataInfo if and only if we are going to dump the
2596  * table data; the "dump" field in such objects isn't very interesting.
2597  */
2598 static void
/* NOTE(review): extraction dropped original line 2599 (function name and
 * parameter list for the dopt/tbinfo arguments used below). */
2600 {
2601  TableDataInfo *tdinfo;
2602 
2603  /*
2604  * Nothing to do if we already decided to dump the table. This will
2605  * happen for "config" tables.
2606  */
2607  if (tbinfo->dataObj != NULL)
2608  return;
2609 
2610  /* Skip VIEWs (no data to dump) */
2611  if (tbinfo->relkind == RELKIND_VIEW)
2612  return;
2613  /* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2614  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
/* NOTE(review): extraction dropped original lines 2615-2616 (the rest of
 * this condition, which involves dump options and the table's foreign
 * server, continued below). */
2617  tbinfo->foreign_server)))
2618  return;
2619  /* Skip partitioned tables (data in partitions) */
2620  if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2621  return;
2622 
2623  /* Don't dump data in unlogged tables, if so requested */
2624  if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
2625  dopt->no_unlogged_table_data)
2626  return;
2627 
2628  /* Check that the data is not explicitly excluded */
/* NOTE(review): extraction dropped original line 2629 (the start of the
 * exclusion-list membership test whose argument list continues below). */
2630  tbinfo->dobj.catId.oid))
2631  return;
2632 
2633  /* OK, let's dump it */
2634  tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2635 
2636  if (tbinfo->relkind == RELKIND_MATVIEW)
2637  tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
2638  else if (tbinfo->relkind == RELKIND_SEQUENCE)
2639  tdinfo->dobj.objType = DO_SEQUENCE_SET;
2640  else
2641  tdinfo->dobj.objType = DO_TABLE_DATA;
2642 
2643  /*
2644  * Note: use tableoid 0 so that this object won't be mistaken for
2645  * something that pg_depend entries apply to.
2646  */
2647  tdinfo->dobj.catId.tableoid = 0;
2648  tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2649  AssignDumpId(&tdinfo->dobj);
2650  tdinfo->dobj.name = tbinfo->dobj.name;
2651  tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2652  tdinfo->tdtable = tbinfo;
2653  tdinfo->filtercond = NULL; /* might get set later */
2654  addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2655 
2656  /* A TableDataInfo contains data, of course */
2657  tdinfo->dobj.components |= DUMP_COMPONENT_DATA;
2658 
2659  tbinfo->dataObj = tdinfo;
2660 
2661  /* Make sure that we'll collect per-column info for this table. */
2662  tbinfo->interesting = true;
2663 }
2664 
2665 /*
2666  * The refresh for a materialized view must be dependent on the refresh for
2667  * any materialized view that this one is dependent on.
2668  *
2669  * This must be called after all the objects are created, but before they are
2670  * sorted.
2671  */
2672 static void
/* NOTE(review): extraction dropped original line 2673 (function name and
 * parameter list, presumably taking the Archive *fout used below). */
2674 {
2675  PQExpBuffer query;
2676  PGresult *res;
2677  int ntups,
2678  i;
2679  int i_classid,
2680  i_objid,
2681  i_refobjid;
2682 
2683  /* No Mat Views before 9.3. */
2684  if (fout->remoteVersion < 90300)
2685  return;
2686 
2687  query = createPQExpBuffer();
2688 
/* Recursively walk pg_depend/pg_rewrite to find, for each matview, every
 * matview it (transitively, possibly through plain views) depends on. */
2689  appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2690  "( "
2691  "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2692  "FROM pg_depend d1 "
2693  "JOIN pg_class c1 ON c1.oid = d1.objid "
2694  "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2695  " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2696  "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2697  "AND d2.objid = r1.oid "
2698  "AND d2.refobjid <> d1.objid "
2699  "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2700  "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2701  CppAsString2(RELKIND_VIEW) ") "
2702  "WHERE d1.classid = 'pg_class'::regclass "
2703  "UNION "
2704  "SELECT w.objid, d3.refobjid, c3.relkind "
2705  "FROM w "
2706  "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2707  "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2708  "AND d3.objid = r3.oid "
2709  "AND d3.refobjid <> w.refobjid "
2710  "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2711  "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2712  CppAsString2(RELKIND_VIEW) ") "
2713  ") "
2714  "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2715  "FROM w "
2716  "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2717 
2718  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2719 
2720  ntups = PQntuples(res);
2721 
2722  i_classid = PQfnumber(res, "classid");
2723  i_objid = PQfnumber(res, "objid");
2724  i_refobjid = PQfnumber(res, "refobjid");
2725 
/* For each (matview, referenced matview) pair, make the refresh of the
 * former depend on the refresh of the latter. */
2726  for (i = 0; i < ntups; i++)
2727  {
2728  CatalogId objId;
2729  CatalogId refobjId;
2730  DumpableObject *dobj;
2731  DumpableObject *refdobj;
2732  TableInfo *tbinfo;
2733  TableInfo *reftbinfo;
2734 
2735  objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
2736  objId.oid = atooid(PQgetvalue(res, i, i_objid));
2737  refobjId.tableoid = objId.tableoid;
2738  refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
2739 
2740  dobj = findObjectByCatalogId(objId);
2741  if (dobj == NULL)
2742  continue;
2743 
2744  Assert(dobj->objType == DO_TABLE);
2745  tbinfo = (TableInfo *) dobj;
2746  Assert(tbinfo->relkind == RELKIND_MATVIEW);
2747  dobj = (DumpableObject *) tbinfo->dataObj;
2748  if (dobj == NULL)
2749  continue;
2750  Assert(dobj->objType == DO_REFRESH_MATVIEW);
2751 
2752  refdobj = findObjectByCatalogId(refobjId);
2753  if (refdobj == NULL)
2754  continue;
2755 
2756  Assert(refdobj->objType == DO_TABLE);
2757  reftbinfo = (TableInfo *) refdobj;
2758  Assert(reftbinfo->relkind == RELKIND_MATVIEW);
2759  refdobj = (DumpableObject *) reftbinfo->dataObj;
2760  if (refdobj == NULL)
2761  continue;
2762  Assert(refdobj->objType == DO_REFRESH_MATVIEW);
2763 
2764  addObjectDependency(dobj, refdobj->dumpId);
2765 
/* An unpopulated referenced matview makes the referencing one effectively
 * unpopulated too, so propagate the flag. */
2766  if (!reftbinfo->relispopulated)
2767  tbinfo->relispopulated = false;
2768  }
2769 
2770  PQclear(res);
2771 
2772  destroyPQExpBuffer(query);
2773 }
2774 
2775 /*
2776  * getTableDataFKConstraints -
2777  * add dump-order dependencies reflecting foreign key constraints
2778  *
2779  * This code is executed only in a data-only dump --- in schema+data dumps
2780  * we handle foreign key issues by not creating the FK constraints until
2781  * after the data is loaded. In a data-only dump, however, we want to
2782  * order the table data objects in such a way that a table's referenced
2783  * tables are restored first. (In the presence of circular references or
2784  * self-references this may be impossible; we'll detect and complain about
2785  * that during the dependency sorting step.)
2786  */
2787 static void
/* NOTE(review): extraction dropped original line 2788 (function name and
 * parameter list; no parameters are referenced in the body below). */
2789 {
2790  DumpableObject **dobjs;
2791  int numObjs;
2792  int i;
2793 
2794  /* Search through all the dumpable objects for FK constraints */
2795  getDumpableObjects(&dobjs, &numObjs);
2796  for (i = 0; i < numObjs; i++)
2797  {
2798  if (dobjs[i]->objType == DO_FK_CONSTRAINT)
2799  {
2800  ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
2801  TableInfo *ftable;
2802 
2803  /* Not interesting unless both tables are to be dumped */
2804  if (cinfo->contable == NULL ||
2805  cinfo->contable->dataObj == NULL)
2806  continue;
2807  ftable = findTableByOid(cinfo->confrelid);
2808  if (ftable == NULL ||
2809  ftable->dataObj == NULL)
2810  continue;
2811 
2812  /*
2813  * Okay, make referencing table's TABLE_DATA object depend on the
2814  * referenced table's TABLE_DATA object.
2815  */
/* NOTE(review): extraction dropped original line 2816 (the
 * addObjectDependency(...) call whose second argument continues below). */
2817  ftable->dataObj->dobj.dumpId);
2818  }
2819  }
2820  free(dobjs);
2821 }
2822 
2823 
2824 /*
2825  * dumpDatabase:
2826  * dump the database definition
2827  */
2828 static void
2830 {
2831  DumpOptions *dopt = fout->dopt;
2832  PQExpBuffer dbQry = createPQExpBuffer();
2833  PQExpBuffer delQry = createPQExpBuffer();
2834  PQExpBuffer creaQry = createPQExpBuffer();
2835  PQExpBuffer labelq = createPQExpBuffer();
2836  PGconn *conn = GetConnection(fout);
2837  PGresult *res;
2838  int i_tableoid,
2839  i_oid,
2840  i_datname,
2841  i_datdba,
2842  i_encoding,
2843  i_datlocprovider,
2844  i_collate,
2845  i_ctype,
2846  i_daticulocale,
2847  i_frozenxid,
2848  i_minmxid,
2849  i_datacl,
2850  i_acldefault,
2851  i_datistemplate,
2852  i_datconnlimit,
2853  i_datcollversion,
2854  i_tablespace;
2855  CatalogId dbCatId;
2856  DumpId dbDumpId;
2857  DumpableAcl dbdacl;
2858  const char *datname,
2859  *dba,
2860  *encoding,
2861  *datlocprovider,
2862  *collate,
2863  *ctype,
2864  *iculocale,
2865  *datistemplate,
2866  *datconnlimit,
2867  *tablespace;
2868  uint32 frozenxid,
2869  minmxid;
2870  char *qdatname;
2871 
2872  pg_log_info("saving database definition");
2873 
2874  /*
2875  * Fetch the database-level properties for this database.
2876  */
2877  appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
2878  "datdba, "
2879  "pg_encoding_to_char(encoding) AS encoding, "
2880  "datcollate, datctype, datfrozenxid, "
2881  "datacl, acldefault('d', datdba) AS acldefault, "
2882  "datistemplate, datconnlimit, ");
2883  if (fout->remoteVersion >= 90300)
2884  appendPQExpBufferStr(dbQry, "datminmxid, ");
2885  else
2886  appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
2887  if (fout->remoteVersion >= 150000)
2888  appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale, datcollversion, ");
2889  else
2890  appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS daticulocale, NULL AS datcollversion, ");
2891  appendPQExpBufferStr(dbQry,
2892  "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
2893  "shobj_description(oid, 'pg_database') AS description "
2894  "FROM pg_database "
2895  "WHERE datname = current_database()");
2896 
2897  res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
2898 
2899  i_tableoid = PQfnumber(res, "tableoid");
2900  i_oid = PQfnumber(res, "oid");
2901  i_datname = PQfnumber(res, "datname");
2902  i_datdba = PQfnumber(res, "datdba");
2903  i_encoding = PQfnumber(res, "encoding");
2904  i_datlocprovider = PQfnumber(res, "datlocprovider");
2905  i_collate = PQfnumber(res, "datcollate");
2906  i_ctype = PQfnumber(res, "datctype");
2907  i_daticulocale = PQfnumber(res, "daticulocale");
2908  i_frozenxid = PQfnumber(res, "datfrozenxid");
2909  i_minmxid = PQfnumber(res, "datminmxid");
2910  i_datacl = PQfnumber(res, "datacl");
2911  i_acldefault = PQfnumber(res, "acldefault");
2912  i_datistemplate = PQfnumber(res, "datistemplate");
2913  i_datconnlimit = PQfnumber(res, "datconnlimit");
2914  i_datcollversion = PQfnumber(res, "datcollversion");
2915  i_tablespace = PQfnumber(res, "tablespace");
2916 
2917  dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
2918  dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
2919  datname = PQgetvalue(res, 0, i_datname);
2920  dba = getRoleName(PQgetvalue(res, 0, i_datdba));
2921  encoding = PQgetvalue(res, 0, i_encoding);
2922  datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
2923  collate = PQgetvalue(res, 0, i_collate);
2924  ctype = PQgetvalue(res, 0, i_ctype);
2925  if (!PQgetisnull(res, 0, i_daticulocale))
2926  iculocale = PQgetvalue(res, 0, i_daticulocale);
2927  else
2928  iculocale = NULL;
2929  frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
2930  minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
2931  dbdacl.acl = PQgetvalue(res, 0, i_datacl);
2932  dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
2933  datistemplate = PQgetvalue(res, 0, i_datistemplate);
2934  datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
2935  tablespace = PQgetvalue(res, 0, i_tablespace);
2936 
2937  qdatname = pg_strdup(fmtId(datname));
2938 
2939  /*
2940  * Prepare the CREATE DATABASE command. We must specify OID (if we want
2941  * to preserve that), as well as the encoding, locale, and tablespace
2942  * since those can't be altered later. Other DB properties are left to
2943  * the DATABASE PROPERTIES entry, so that they can be applied after
2944  * reconnecting to the target DB.
2945  */
2946  if (dopt->binary_upgrade)
2947  {
2948  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0 OID = %u",
2949  qdatname, dbCatId.oid);
2950  }
2951  else
2952  {
2953  appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
2954  qdatname);
2955  }
2956  if (strlen(encoding) > 0)
2957  {
2958  appendPQExpBufferStr(creaQry, " ENCODING = ");
2959  appendStringLiteralAH(creaQry, encoding, fout);
2960  }
2961 
2962  appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
2963  if (datlocprovider[0] == 'c')
2964  appendPQExpBufferStr(creaQry, "libc");
2965  else if (datlocprovider[0] == 'i')
2966  appendPQExpBufferStr(creaQry, "icu");
2967  else
2968  pg_fatal("unrecognized locale provider: %s",
2969  datlocprovider);
2970 
2971  if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
2972  {
2973  appendPQExpBufferStr(creaQry, " LOCALE = ");
2974  appendStringLiteralAH(creaQry, collate, fout);
2975  }
2976  else
2977  {
2978  if (strlen(collate) > 0)
2979  {
2980  appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
2981  appendStringLiteralAH(creaQry, collate, fout);
2982  }
2983  if (strlen(ctype) > 0)
2984  {
2985  appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
2986  appendStringLiteralAH(creaQry, ctype, fout);
2987  }
2988  }
2989  if (iculocale)
2990  {
2991  appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
2992  appendStringLiteralAH(creaQry, iculocale, fout);
2993  }
2994 
2995  /*
2996  * For binary upgrade, carry over the collation version. For normal
2997  * dump/restore, omit the version, so that it is computed upon restore.
2998  */
2999  if (dopt->binary_upgrade)
3000  {
3001  if (!PQgetisnull(res, 0, i_datcollversion))
3002  {
3003  appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
3004  appendStringLiteralAH(creaQry,
3005  PQgetvalue(res, 0, i_datcollversion),
3006  fout);
3007  }
3008  }
3009 
3010  /*
3011  * Note: looking at dopt->outputNoTablespaces here is completely the wrong
3012  * thing; the decision whether to specify a tablespace should be left till
3013  * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
3014  * label the DATABASE entry with the tablespace and let the normal
3015  * tablespace selection logic work ... but CREATE DATABASE doesn't pay
3016  * attention to default_tablespace, so that won't work.
3017  */
3018  if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
3019  !dopt->outputNoTablespaces)
3020  appendPQExpBuffer(creaQry, " TABLESPACE = %s",
3021  fmtId(tablespace));
3022  appendPQExpBufferStr(creaQry, ";\n");
3023 
3024  appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3025  qdatname);
3026 
3027  dbDumpId = createDumpId();
3028 
3029  ArchiveEntry(fout,
3030  dbCatId, /* catalog ID */
3031  dbDumpId, /* dump ID */
3032  ARCHIVE_OPTS(.tag = datname,
3033  .owner = dba,
3034  .description = "DATABASE",
3035  .section = SECTION_PRE_DATA,
3036  .createStmt = creaQry->data,
3037  .dropStmt = delQry->data));
3038 
3039  /* Compute correct tag for archive entry */
3040  appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3041 
3042  /* Dump DB comment if any */
3043  {
3044  /*
3045  * 8.2 and up keep comments on shared objects in a shared table, so we
3046  * cannot use the dumpComment() code used for other database objects.
3047  * Be careful that the ArchiveEntry parameters match that function.
3048  */
3049  char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3050 
3051  if (comment && *comment && !dopt->no_comments)
3052  {
3053  resetPQExpBuffer(dbQry);
3054 
3055  /*
3056  * Generates warning when loaded into a differently-named
3057  * database.
3058  */
3059  appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3060  appendStringLiteralAH(dbQry, comment, fout);
3061  appendPQExpBufferStr(dbQry, ";\n");
3062 
3064  ARCHIVE_OPTS(.tag = labelq->data,
3065  .owner = dba,
3066  .description = "COMMENT",
3067  .section = SECTION_NONE,
3068  .createStmt = dbQry->data,
3069  .deps = &dbDumpId,
3070  .nDeps = 1));
3071  }
3072  }
3073 
3074  /* Dump DB security label, if enabled */
3075  if (!dopt->no_security_labels)
3076  {
3077  PGresult *shres;
3078  PQExpBuffer seclabelQry;
3079 
3080  seclabelQry = createPQExpBuffer();
3081 
3082  buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3083  shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3084  resetPQExpBuffer(seclabelQry);
3085  emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3086  if (seclabelQry->len > 0)
3088  ARCHIVE_OPTS(.tag = labelq->data,
3089  .owner = dba,
3090  .description = "SECURITY LABEL",
3091  .section = SECTION_NONE,
3092  .createStmt = seclabelQry->data,
3093  .deps = &dbDumpId,
3094  .nDeps = 1));
3095  destroyPQExpBuffer(seclabelQry);
3096  PQclear(shres);
3097  }
3098 
3099  /*
3100  * Dump ACL if any. Note that we do not support initial privileges
3101  * (pg_init_privs) on databases.
3102  */
3103  dbdacl.privtype = 0;
3104  dbdacl.initprivs = NULL;
3105 
3106  dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3107  qdatname, NULL, NULL,
3108  dba, &dbdacl);
3109 
3110  /*
3111  * Now construct a DATABASE PROPERTIES archive entry to restore any
3112  * non-default database-level properties. (The reason this must be
3113  * separate is that we cannot put any additional commands into the TOC
3114  * entry that has CREATE DATABASE. pg_restore would execute such a group
3115  * in an implicit transaction block, and the backend won't allow CREATE
3116  * DATABASE in that context.)
3117  */
3118  resetPQExpBuffer(creaQry);
3119  resetPQExpBuffer(delQry);
3120 
3121  if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3122  appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3123  qdatname, datconnlimit);
3124 
3125  if (strcmp(datistemplate, "t") == 0)
3126  {
3127  appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3128  qdatname);
3129 
3130  /*
3131  * The backend won't accept DROP DATABASE on a template database. We
3132  * can deal with that by removing the template marking before the DROP
3133  * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3134  * since no such command is currently supported, fake it with a direct
3135  * UPDATE on pg_database.
3136  */
3137  appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3138  "SET datistemplate = false WHERE datname = ");
3139  appendStringLiteralAH(delQry, datname, fout);
3140  appendPQExpBufferStr(delQry, ";\n");
3141  }
3142 
3143  /* Add database-specific SET options */
3144  dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3145 
3146  /*
3147  * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3148  * entry, too, for lack of a better place.
3149  */
3150  if (dopt->binary_upgrade)
3151  {
3152  appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3153  appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3154  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3155  "WHERE datname = ",
3156  frozenxid, minmxid);
3157  appendStringLiteralAH(creaQry, datname, fout);
3158  appendPQExpBufferStr(creaQry, ";\n");
3159  }
3160 
3161  if (creaQry->len > 0)
3163  ARCHIVE_OPTS(.tag = datname,
3164  .owner = dba,
3165  .description = "DATABASE PROPERTIES",
3166  .section = SECTION_PRE_DATA,
3167  .createStmt = creaQry->data,
3168  .dropStmt = delQry->data,
3169  .deps = &dbDumpId));
3170 
3171  /*
3172  * pg_largeobject comes from the old system intact, so set its
3173  * relfrozenxids, relminmxids and relfilenode.
3174  */
3175  if (dopt->binary_upgrade)
3176  {
3177  PGresult *lo_res;
3178  PQExpBuffer loFrozenQry = createPQExpBuffer();
3179  PQExpBuffer loOutQry = createPQExpBuffer();
3180  PQExpBuffer loHorizonQry = createPQExpBuffer();
3181  int ii_relfrozenxid,
3182  ii_relfilenode,
3183  ii_oid,
3184  ii_relminmxid;
3185 
3186  /*
3187  * pg_largeobject
3188  */
3189  if (fout->remoteVersion >= 90300)
3190  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
3191  "FROM pg_catalog.pg_class\n"
3192  "WHERE oid IN (%u, %u);\n",
3193  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3194  else
3195  appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
3196  "FROM pg_catalog.pg_class\n"
3197  "WHERE oid IN (%u, %u);\n",
3198  LargeObjectRelationId, LargeObjectLOidPNIndexId);
3199 
3200  lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);
3201 
3202  ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3203  ii_relminmxid = PQfnumber(lo_res, "relminmxid");
3204  ii_relfilenode = PQfnumber(lo_res, "relfilenode");
3205  ii_oid = PQfnumber(lo_res, "oid");
3206 
3207  appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3208  appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
3209  for (int i = 0; i < PQntuples(lo_res); ++i)
3210  {
3211  Oid oid;
3212  RelFileNumber relfilenumber;
3213 
3214  appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
3215  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3216  "WHERE oid = %u;\n",
3217  atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
3218  atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
3219  atooid(PQgetvalue(lo_res, i, ii_oid)));
3220 
3221  oid = atooid(PQgetvalue(lo_res, i, ii_oid));
3222  relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));
3223 
3224  if (oid == LargeObjectRelationId)
3225  appendPQExpBuffer(loOutQry,
3226  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
3227  relfilenumber);
3228  else if (oid == LargeObjectLOidPNIndexId)
3229  appendPQExpBuffer(loOutQry,
3230  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
3231  relfilenumber);
3232  }
3233 
3234  appendPQExpBufferStr(loOutQry,
3235  "TRUNCATE pg_catalog.pg_largeobject;\n");
3236  appendPQExpBufferStr(loOutQry, loHorizonQry->data);
3237 
3239  ARCHIVE_OPTS(.tag = "pg_largeobject",
3240  .description = "pg_largeobject",
3241  .section = SECTION_PRE_DATA,
3242  .createStmt = loOutQry->data));
3243 
3244  PQclear(lo_res);
3245 
3246  destroyPQExpBuffer(loFrozenQry);
3247  destroyPQExpBuffer(loHorizonQry);
3248  destroyPQExpBuffer(loOutQry);
3249  }
3250 
3251  PQclear(res);
3252 
3253  free(qdatname);
3254  destroyPQExpBuffer(dbQry);
3255  destroyPQExpBuffer(delQry);
3256  destroyPQExpBuffer(creaQry);
3257  destroyPQExpBuffer(labelq);
3258 }
3259 
3260 /*
3261  * Collect any database-specific or role-and-database-specific SET options
3262  * for this database, and append them to outbuf.
3263  */
3264 static void
3266  const char *dbname, Oid dboid)
3267 {
3268  PGconn *conn = GetConnection(AH);
3270  PGresult *res;
3271 
3272  /* First collect database-specific options */
3273  printfPQExpBuffer(buf, "SELECT unnest(setconfig)");
3274  if (AH->remoteVersion >= 160000)
3275  appendPQExpBufferStr(buf, ", unnest(setuser)");
3276  appendPQExpBuffer(buf, " FROM pg_db_role_setting "
3277  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3278  dboid);
3279 
3280  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3281 
3282  for (int i = 0; i < PQntuples(res); i++)
3283  {
3284  char *userset = NULL;
3285 
3286  if (AH->remoteVersion >= 160000)
3287  userset = PQgetvalue(res, i, 1);
3288  makeAlterConfigCommand(conn, PQgetvalue(res, i, 0), userset,
3289  "DATABASE", dbname, NULL, NULL,
3290  outbuf);
3291  }
3292 
3293  PQclear(res);
3294 
3295  /* Now look for role-and-database-specific options */
3296  printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig)");
3297  if (AH->remoteVersion >= 160000)
3298  appendPQExpBufferStr(buf, ", unnest(setuser)");
3299  appendPQExpBuffer(buf, " FROM pg_db_role_setting s, pg_roles r "
3300  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3301  dboid);
3302 
3303  res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3304 
3305  for (int i = 0; i < PQntuples(res); i++)
3306  {
3307  char *userset = NULL;
3308 
3309  if (AH->remoteVersion >= 160000)
3310  userset = PQgetvalue(res, i, 2);
3311  makeAlterConfigCommand(conn, PQgetvalue(res, i, 1), userset,
3312  "ROLE", PQgetvalue(res, i, 0),
3313  "DATABASE", dbname,
3314  outbuf);
3315  }
3316 
3317  PQclear(res);
3318 
3320 }
3321 
3322 /*
3323  * dumpEncoding: put the correct encoding into the archive
3324  */
3325 static void
3327 {
3328  const char *encname = pg_encoding_to_char(AH->encoding);
3330 
3331  pg_log_info("saving encoding = %s", encname);
3332 
3333  appendPQExpBufferStr(qry, "SET client_encoding = ");
3334  appendStringLiteralAH(qry, encname, AH);
3335  appendPQExpBufferStr(qry, ";\n");
3336 
3338  ARCHIVE_OPTS(.tag = "ENCODING",
3339  .description = "ENCODING",
3340  .section = SECTION_PRE_DATA,
3341  .createStmt = qry->data));
3342 
3343  destroyPQExpBuffer(qry);
3344 }
3345 
3346 
3347 /*
3348  * dumpStdStrings: put the correct escape string behavior into the archive
3349  */
3350 static void
3352 {
3353  const char *stdstrings = AH->std_strings ? "on" : "off";
3355 
3356  pg_log_info("saving standard_conforming_strings = %s",
3357  stdstrings);
3358 
3359  appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3360  stdstrings);
3361 
3363  ARCHIVE_OPTS(.tag = "STDSTRINGS",
3364  .description = "STDSTRINGS",
3365  .section = SECTION_PRE_DATA,
3366  .createStmt = qry->data));
3367 
3368  destroyPQExpBuffer(qry);
3369 }
3370 
3371 /*
3372  * dumpSearchPath: record the active search_path in the archive
3373  */
3374 static void
3376 {
3378  PQExpBuffer path = createPQExpBuffer();
3379  PGresult *res;
3380  char **schemanames = NULL;
3381  int nschemanames = 0;
3382  int i;
3383 
3384  /*
3385  * We use the result of current_schemas(), not the search_path GUC,
3386  * because that might contain wildcards such as "$user", which won't
3387  * necessarily have the same value during restore. Also, this way avoids
3388  * listing schemas that may appear in search_path but not actually exist,
3389  * which seems like a prudent exclusion.
3390  */
3392  "SELECT pg_catalog.current_schemas(false)");
3393 
3394  if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3395  pg_fatal("could not parse result of current_schemas()");
3396 
3397  /*
3398  * We use set_config(), not a simple "SET search_path" command, because
3399  * the latter has less-clean behavior if the search path is empty. While
3400  * that's likely to get fixed at some point, it seems like a good idea to
3401  * be as backwards-compatible as possible in what we put into archives.
3402  */
3403  for (i = 0; i < nschemanames; i++)
3404  {
3405  if (i > 0)
3406  appendPQExpBufferStr(path, ", ");
3407  appendPQExpBufferStr(path, fmtId(schemanames[i]));
3408  }
3409 
3410  appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3411  appendStringLiteralAH(qry, path->data, AH);
3412  appendPQExpBufferStr(qry, ", false);\n");
3413 
3414  pg_log_info("saving search_path = %s", path->data);
3415 
3417  ARCHIVE_OPTS(.tag = "SEARCHPATH",
3418  .description = "SEARCHPATH",
3419  .section = SECTION_PRE_DATA,
3420  .createStmt = qry->data));
3421 
3422  /* Also save it in AH->searchpath, in case we're doing plain text dump */
3423  AH->searchpath = pg_strdup(qry->data);
3424 
3425  free(schemanames);
3426  PQclear(res);
3427  destroyPQExpBuffer(qry);
3428  destroyPQExpBuffer(path);
3429 }
3430 
3431 
3432 /*
3433  * getLOs:
3434  * Collect schema-level data about large objects
3435  */
3436 static void
3438 {
3439  DumpOptions *dopt = fout->dopt;
3440  PQExpBuffer loQry = createPQExpBuffer();
3441  LoInfo *loinfo;
3442  DumpableObject *lodata;
3443  PGresult *res;
3444  int ntups;
3445  int i;
3446  int i_oid;
3447  int i_lomowner;
3448  int i_lomacl;
3449  int i_acldefault;
3450 
3451  pg_log_info("reading large objects");
3452 
3453  /* Fetch LO OIDs, and owner/ACL data */
3454  appendPQExpBufferStr(loQry,
3455  "SELECT oid, lomowner, lomacl, "
3456  "acldefault('L', lomowner) AS acldefault "
3457  "FROM pg_largeobject_metadata");
3458 
3459  res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);
3460 
3461  i_oid = PQfnumber(res, "oid");
3462  i_lomowner = PQfnumber(res, "lomowner");
3463  i_lomacl = PQfnumber(res, "lomacl");
3464  i_acldefault = PQfnumber(res, "acldefault");
3465 
3466  ntups = PQntuples(res);
3467 
3468  /*
3469  * Each large object has its own "BLOB" archive entry.
3470  */
3471  loinfo = (LoInfo *) pg_malloc(ntups * sizeof(LoInfo));
3472 
3473  for (i = 0; i < ntups; i++)
3474  {
3475  loinfo[i].dobj.objType = DO_LARGE_OBJECT;
3476  loinfo[i].dobj.catId.tableoid = LargeObjectRelationId;
3477  loinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
3478  AssignDumpId(&loinfo[i].dobj);
3479 
3480  loinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_oid));
3481  loinfo[i].dacl.acl = pg_strdup(PQgetvalue(res, i, i_lomacl));
3482  loinfo[i].dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3483  loinfo[i].dacl.privtype = 0;
3484  loinfo[i].dacl.initprivs = NULL;
3485  loinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_lomowner));
3486 
3487  /* LOs have data */
3488  loinfo[i].dobj.components |= DUMP_COMPONENT_DATA;
3489 
3490  /* Mark whether LO has an ACL */
3491  if (!PQgetisnull(res, i, i_lomacl))
3492  loinfo[i].dobj.components |= DUMP_COMPONENT_ACL;
3493 
3494  /*
3495  * In binary-upgrade mode for LOs, we do *not* dump out the LO
3496  * data, as it will be copied by pg_upgrade, which simply copies the
3497  * pg_largeobject table. We *do* however dump out anything but the
3498  * data, as pg_upgrade copies just pg_largeobject, but not
3499  * pg_largeobject_metadata, after the dump is restored.
3500  */
3501  if (dopt->binary_upgrade)
3502  loinfo[i].dobj.dump &= ~DUMP_COMPONENT_DATA;
3503  }
3504 
3505  /*
3506  * If we have any large objects, a "BLOBS" archive entry is needed. This
3507  * is just a placeholder for sorting; it carries no data now.
3508  */
3509  if (ntups > 0)
3510  {
3511  lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3512  lodata->objType = DO_LARGE_OBJECT_DATA;
3513  lodata->catId = nilCatalogId;
3514  AssignDumpId(lodata);
3515  lodata->name = pg_strdup("BLOBS");
3516  lodata->components |= DUMP_COMPONENT_DATA;
3517  }
3518 
3519  PQclear(res);
3520  destroyPQExpBuffer(loQry);
3521 }
3522 
3523 /*
3524  * dumpLO
3525  *
3526  * dump the definition (metadata) of the given large object
3527  */
3528 static void
3529 dumpLO(Archive *fout, const LoInfo *loinfo)
3530 {
3531  PQExpBuffer cquery = createPQExpBuffer();
3532  PQExpBuffer dquery = createPQExpBuffer();
3533 
3534  appendPQExpBuffer(cquery,
3535  "SELECT pg_catalog.lo_create('%s');\n",
3536  loinfo->dobj.name);
3537 
3538  appendPQExpBuffer(dquery,
3539  "SELECT pg_catalog.lo_unlink('%s');\n",
3540  loinfo->dobj.name);
3541 
3542  if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3543  ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3544  ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3545  .owner = loinfo->rolname,
3546  .description = "BLOB",
3547  .section = SECTION_PRE_DATA,
3548  .createStmt = cquery->data,
3549  .dropStmt = dquery->data));
3550 
3551  /* Dump comment if any */
3552  if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3553  dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
3554  NULL, loinfo->rolname,
3555  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3556 
3557  /* Dump security label if any */
3558  if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3559  dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
3560  NULL, loinfo->rolname,
3561  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
3562 
3563  /* Dump ACL if any */
3564  if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3565  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
3566  loinfo->dobj.name, NULL,
3567  NULL, loinfo->rolname, &loinfo->dacl);
3568 
3569  destroyPQExpBuffer(cquery);
3570  destroyPQExpBuffer(dquery);
3571 }
3572 
3573 /*
3574  * dumpLOs:
3575  * dump the data contents of all large objects
3576  */
3577 static int
3578 dumpLOs(Archive *fout, const void *arg)
3579 {
3580  const char *loQry;
3581  const char *loFetchQry;
3582  PGconn *conn = GetConnection(fout);
3583  PGresult *res;
3584  char buf[LOBBUFSIZE];
3585  int ntups;
3586  int i;
3587  int cnt;
3588 
3589  pg_log_info("saving large objects");
3590 
3591  /*
3592  * Currently, we re-fetch all LO OIDs using a cursor. Consider scanning
3593  * the already-in-memory dumpable objects instead...
3594  */
3595  loQry =
3596  "DECLARE looid CURSOR FOR "
3597  "SELECT oid FROM pg_largeobject_metadata ORDER BY 1";
3598 
3599  ExecuteSqlStatement(fout, loQry);
3600 
3601  /* Command to fetch from cursor */
3602  loFetchQry = "FETCH 1000 IN looid";
3603 
3604  do
3605  {
3606  /* Do a fetch */
3607  res = ExecuteSqlQuery(fout, loFetchQry, PGRES_TUPLES_OK);
3608 
3609  /* Process the tuples, if any */
3610  ntups = PQntuples(res);
3611  for (i = 0; i < ntups; i++)
3612  {
3613  Oid loOid;
3614  int loFd;
3615 
3616  loOid = atooid(PQgetvalue(res, i, 0));
3617  /* Open the LO */
3618  loFd = lo_open(conn, loOid, INV_READ);
3619  if (loFd == -1)
3620  pg_fatal("could not open large object %u: %s",
3621  loOid, PQerrorMessage(conn));
3622 
3623  StartLO(fout, loOid);
3624 
3625  /* Now read it in chunks, sending data to archive */
3626  do
3627  {
3628  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3629  if (cnt < 0)
3630  pg_fatal("error reading large object %u: %s",
3631  loOid, PQerrorMessage(conn));
3632 
3633  WriteData(fout, buf, cnt);
3634  } while (cnt > 0);
3635 
3636  lo_close(conn, loFd);
3637 
3638  EndLO(fout, loOid);
3639  }
3640 
3641  PQclear(res);
3642  } while (ntups > 0);
3643 
3644  return 1;
3645 }
3646 
3647 /*
3648  * getPolicies
3649  * get information about all RLS policies on dumpable tables.
3650  */
3651 void
3652 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3653 {
3654  PQExpBuffer query;
3655  PQExpBuffer tbloids;
3656  PGresult *res;
3657  PolicyInfo *polinfo;
3658  int i_oid;
3659  int i_tableoid;
3660  int i_polrelid;
3661  int i_polname;
3662  int i_polcmd;
3663  int i_polpermissive;
3664  int i_polroles;
3665  int i_polqual;
3666  int i_polwithcheck;
3667  int i,
3668  j,
3669  ntups;
3670 
3671  /* No policies before 9.5 */
3672  if (fout->remoteVersion < 90500)
3673  return;
3674 
3675  query = createPQExpBuffer();
3676  tbloids = createPQExpBuffer();
3677 
3678  /*
3679  * Identify tables of interest, and check which ones have RLS enabled.
3680  */
3681  appendPQExpBufferChar(tbloids, '{');
3682  for (i = 0; i < numTables; i++)
3683  {
3684  TableInfo *tbinfo = &tblinfo[i];
3685 
3686  /* Ignore row security on tables not to be dumped */
3687  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
3688  continue;
3689 
3690  /* It can't have RLS or policies if it's not a table */
3691  if (tbinfo->relkind != RELKIND_RELATION &&
3692  tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
3693  continue;
3694 
3695  /* Add it to the list of table OIDs to be probed below */
3696  if (tbloids->len > 1) /* do we have more than the '{'? */
3697  appendPQExpBufferChar(tbloids, ',');
3698  appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
3699 
3700  /* Is RLS enabled? (That's separate from whether it has policies) */
3701  if (tbinfo->rowsec)
3702  {
3704 
3705  /*
3706  * We represent RLS being enabled on a table by creating a
3707  * PolicyInfo object with null polname.
3708  *
3709  * Note: use tableoid 0 so that this object won't be mistaken for
3710  * something that pg_depend entries apply to.
3711  */
3712  polinfo = pg_malloc(sizeof(PolicyInfo));
3713  polinfo->dobj.objType = DO_POLICY;
3714  polinfo->dobj.catId.tableoid = 0;
3715  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
3716  AssignDumpId(&polinfo->dobj);
3717  polinfo->dobj.namespace = tbinfo->dobj.namespace;
3718  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
3719  polinfo->poltable = tbinfo;
3720  polinfo->polname = NULL;
3721  polinfo->polcmd = '\0';
3722  polinfo->polpermissive = 0;
3723  polinfo->polroles = NULL;
3724  polinfo->polqual = NULL;
3725  polinfo->polwithcheck = NULL;
3726  }
3727  }
3728  appendPQExpBufferChar(tbloids, '}');
3729 
3730  /*
3731  * Now, read all RLS policies belonging to the tables of interest, and
3732  * create PolicyInfo objects for them. (Note that we must filter the
3733  * results server-side not locally, because we dare not apply pg_get_expr
3734  * to tables we don't have lock on.)
3735  */
3736  pg_log_info("reading row-level security policies");
3737 
3738  printfPQExpBuffer(query,
3739  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
3740  if (fout->remoteVersion >= 100000)
3741  appendPQExpBufferStr(query, "pol.polpermissive, ");
3742  else
3743  appendPQExpBufferStr(query, "'t' as polpermissive, ");
3744  appendPQExpBuffer(query,
3745  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
3746  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
3747  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
3748  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
3749  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
3750  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
3751  tbloids->data);
3752 
3753  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3754 
3755  ntups = PQntuples(res);
3756  if (ntups > 0)
3757  {
3758  i_oid = PQfnumber(res, "oid");
3759  i_tableoid = PQfnumber(res, "tableoid");
3760  i_polrelid = PQfnumber(res, "polrelid");
3761  i_polname = PQfnumber(res, "polname");
3762  i_polcmd = PQfnumber(res, "polcmd");
3763  i_polpermissive = PQfnumber(res, "polpermissive");
3764  i_polroles = PQfnumber(res, "polroles");
3765  i_polqual = PQfnumber(res, "polqual");
3766  i_polwithcheck = PQfnumber(res, "polwithcheck");
3767 
3768  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
3769 
3770  for (j = 0; j < ntups; j++)
3771  {
3772  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
3773  TableInfo *tbinfo = findTableByOid(polrelid);
3774 
3776 
3777  polinfo[j].dobj.objType = DO_POLICY;
3778  polinfo[j].dobj.catId.tableoid =
3779  atooid(PQgetvalue(res, j, i_tableoid));
3780  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
3781  AssignDumpId(&polinfo[j].dobj);
3782  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
3783  polinfo[j].poltable = tbinfo;
3784  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
3785  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
3786 
3787  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
3788  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
3789 
3790  if (PQgetisnull(res, j, i_polroles))
3791  polinfo[j].polroles = NULL;
3792  else
3793  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
3794 
3795  if (PQgetisnull(res, j, i_polqual))
3796  polinfo[j].polqual = NULL;
3797  else
3798  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
3799 
3800  if (PQgetisnull(res, j, i_polwithcheck))
3801  polinfo[j].polwithcheck = NULL;
3802  else
3803  polinfo[j].polwithcheck
3804  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
3805  }
3806  }
3807 
3808  PQclear(res);
3809 
3810  destroyPQExpBuffer(query);
3811  destroyPQExpBuffer(tbloids);
3812 }
3813 
3814 /*
3815  * dumpPolicy
3816  * dump the definition of the given policy
3817  */
3818 static void
3819 dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
3820 {
3821  DumpOptions *dopt = fout->dopt;
3822  TableInfo *tbinfo = polinfo->poltable;
3823  PQExpBuffer query;
3824  PQExpBuffer delqry;
3825  PQExpBuffer polprefix;
3826  char *qtabname;
3827  const char *cmd;
3828  char *tag;
3829 
3830  /* Do nothing in data-only dump */
3831  if (dopt->dataOnly)
3832  return;
3833 
3834  /*
3835  * If polname is NULL, then this record is just indicating that ROW LEVEL
3836  * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
3837  * ROW LEVEL SECURITY.
3838  */
3839  if (polinfo->polname == NULL)
3840  {
3841  query = createPQExpBuffer();
3842 
3843  appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
3844  fmtQualifiedDumpable(tbinfo));
3845 
3846  /*
3847  * We must emit the ROW SECURITY object's dependency on its table
3848  * explicitly, because it will not match anything in pg_depend (unlike
3849  * the case for other PolicyInfo objects).
3850  */
3851  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3852  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3853  ARCHIVE_OPTS(.tag = polinfo->dobj.name,
3854  .namespace = polinfo->dobj.namespace->dobj.name,
3855  .owner = tbinfo->rolname,
3856  .description = "ROW SECURITY",
3857  .section = SECTION_POST_DATA,
3858  .createStmt = query->data,
3859  .deps = &(tbinfo->dobj.dumpId),
3860  .nDeps = 1));
3861 
3862  destroyPQExpBuffer(query);
3863  return;
3864  }
3865 
3866  if (polinfo->polcmd == '*')
3867  cmd = "";
3868  else if (polinfo->polcmd == 'r')
3869  cmd = " FOR SELECT";
3870  else if (polinfo->polcmd == 'a')
3871  cmd = " FOR INSERT";
3872  else if (polinfo->polcmd == 'w')
3873  cmd = " FOR UPDATE";
3874  else if (polinfo->polcmd == 'd')
3875  cmd = " FOR DELETE";
3876  else
3877  pg_fatal("unexpected policy command type: %c",
3878  polinfo->polcmd);
3879 
3880  query = createPQExpBuffer();
3881  delqry = createPQExpBuffer();
3882  polprefix = createPQExpBuffer();
3883 
3884  qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
3885 
3886  appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
3887 
3888  appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
3889  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
3890 
3891  if (polinfo->polroles != NULL)
3892  appendPQExpBuffer(query, " TO %s", polinfo->polroles);
3893 
3894  if (polinfo->polqual != NULL)
3895  appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
3896 
3897  if (polinfo->polwithcheck != NULL)
3898  appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
3899 
3900  appendPQExpBufferStr(query, ";\n");
3901 
3902  appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
3903  appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
3904 
3905  appendPQExpBuffer(polprefix, "POLICY %s ON",
3906  fmtId(polinfo->polname));
3907 
3908  tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
3909 
3910  if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3911  ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
3912  ARCHIVE_OPTS(.tag = tag,
3913  .namespace = polinfo->dobj.namespace->dobj.name,
3914  .owner = tbinfo->rolname,
3915  .description = "POLICY",
3916  .section = SECTION_POST_DATA,
3917  .createStmt = query->data,
3918  .dropStmt = delqry->data));
3919 
3920  if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3921  dumpComment(fout, polprefix->data, qtabname,
3922  tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
3923  polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
3924 
3925  free(tag);
3926  destroyPQExpBuffer(query);
3927  destroyPQExpBuffer(delqry);
3928  destroyPQExpBuffer(polprefix);
3929  free(qtabname);
3930 }
3931 
3932 /*
3933  * getPublications
3934  * get information about publications
3935  */
3937 getPublications(Archive *fout, int *numPublications)
3938 {
3939  DumpOptions *dopt = fout->dopt;
3940  PQExpBuffer query;
3941  PGresult *res;
3942  PublicationInfo *pubinfo;
3943  int i_tableoid;
3944  int i_oid;
3945  int i_pubname;
3946  int i_pubowner;
3947  int i_puballtables;
3948  int i_pubinsert;
3949  int i_pubupdate;
3950  int i_pubdelete;
3951  int i_pubtruncate;
3952  int i_pubviaroot;
3953  int i,
3954  ntups;
3955 
3956  if (dopt->no_publications || fout->remoteVersion < 100000)
3957  {
3958  *numPublications = 0;
3959  return NULL;
3960  }
3961 
3962  query = createPQExpBuffer();
3963 
3964  resetPQExpBuffer(query);
3965 
3966  /* Get the publications. */
3967  if (fout->remoteVersion >= 130000)
3968  appendPQExpBufferStr(query,
3969  "SELECT p.tableoid, p.oid, p.pubname, "
3970  "p.pubowner, "
3971  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
3972  "FROM pg_publication p");
3973  else if (fout->remoteVersion >= 110000)
3974  appendPQExpBufferStr(query,
3975  "SELECT p.tableoid, p.oid, p.pubname, "
3976  "p.pubowner, "
3977  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
3978  "FROM pg_publication p");
3979  else
3980  appendPQExpBufferStr(query,
3981  "SELECT p.tableoid, p.oid, p.pubname, "
3982  "p.pubowner, "
3983  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
3984  "FROM pg_publication p");
3985 
3986  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
3987 
3988  ntups = PQntuples(res);
3989 
3990  i_tableoid = PQfnumber(res, "tableoid");
3991  i_oid = PQfnumber(res, "oid");
3992  i_pubname = PQfnumber(res, "pubname");
3993  i_pubowner = PQfnumber(res, "pubowner");
3994  i_puballtables = PQfnumber(res, "puballtables");
3995  i_pubinsert = PQfnumber(res, "pubinsert");
3996  i_pubupdate = PQfnumber(res, "pubupdate");
3997  i_pubdelete = PQfnumber(res, "pubdelete");
3998  i_pubtruncate = PQfnumber(res, "pubtruncate");
3999  i_pubviaroot = PQfnumber(res, "pubviaroot");
4000 
4001  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4002 
4003  for (i = 0; i < ntups; i++)
4004  {
4005  pubinfo[i].dobj.objType = DO_PUBLICATION;
4006  pubinfo[i].dobj.catId.tableoid =
4007  atooid(PQgetvalue(res, i, i_tableoid));
4008  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4009  AssignDumpId(&pubinfo[i].dobj);
4010  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4011  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4012  pubinfo[i].puballtables =
4013  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4014  pubinfo[i].pubinsert =
4015  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4016  pubinfo[i].pubupdate =
4017  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4018  pubinfo[i].pubdelete =
4019  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4020  pubinfo[i].pubtruncate =
4021  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4022  pubinfo[i].pubviaroot =
4023  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4024 
4025  /* Decide whether we want to dump it */
4026  selectDumpableObject(&(pubinfo[i].dobj), fout);
4027  }
4028  PQclear(res);
4029 
4030  destroyPQExpBuffer(query);
4031 
4032  *numPublications = ntups;
4033  return pubinfo;
4034 }
4035 
4036 /*
4037  * dumpPublication
4038  * dump the definition of the given publication
4039  */
static void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer delq;
	PQExpBuffer query;
	char	   *qpubname;
	bool		first = true;	/* no "publish" action emitted yet */

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	delq = createPQExpBuffer();
	query = createPQExpBuffer();

	qpubname = pg_strdup(fmtId(pubinfo->dobj.name));

	appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
					  qpubname);

	appendPQExpBuffer(query, "CREATE PUBLICATION %s",
					  qpubname);

	if (pubinfo->puballtables)
		appendPQExpBufferStr(query, " FOR ALL TABLES");

	/*
	 * Spell out the full "publish" option list explicitly rather than
	 * relying on server defaults; "first" tracks comma placement.
	 */
	appendPQExpBufferStr(query, " WITH (publish = '");
	if (pubinfo->pubinsert)
	{
		appendPQExpBufferStr(query, "insert");
		first = false;
	}

	if (pubinfo->pubupdate)
	{
		if (!first)
			appendPQExpBufferStr(query, ", ");

		appendPQExpBufferStr(query, "update");
		first = false;
	}

	if (pubinfo->pubdelete)
	{
		if (!first)
			appendPQExpBufferStr(query, ", ");

		appendPQExpBufferStr(query, "delete");
		first = false;
	}

	if (pubinfo->pubtruncate)
	{
		if (!first)
			appendPQExpBufferStr(query, ", ");

		appendPQExpBufferStr(query, "truncate");
		first = false;
	}

	appendPQExpBufferChar(query, '\'');

	/* publish_via_partition_root is only emitted when actually set */
	if (pubinfo->pubviaroot)
		appendPQExpBufferStr(query, ", publish_via_partition_root = true");

	appendPQExpBufferStr(query, ");\n");

	if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
								  .owner = pubinfo->rolname,
								  .description = "PUBLICATION",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data,
								  .dropStmt = delq->data));

	/* comments and security labels, if requested, are dumped separately */
	if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
		dumpComment(fout, "PUBLICATION", qpubname,
					NULL, pubinfo->rolname,
					pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);

	if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
		dumpSecLabel(fout, "PUBLICATION", qpubname,
					 NULL, pubinfo->rolname,
					 pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);

	destroyPQExpBuffer(delq);
	destroyPQExpBuffer(query);
	free(qpubname);
}
4131 
4132 /*
4133  * getPublicationNamespaces
4134  * get information about publication membership for dumpable schemas.
4135  */
void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	PQExpBuffer query;
	PGresult   *res;
	PublicationSchemaInfo *pubsinfo;
	DumpOptions *dopt = fout->dopt;
	int			i_tableoid;
	int			i_oid;
	int			i_pnpubid;
	int			i_pnnspid;
	int			i,
				j,
				ntups;

	/* pg_publication_namespace exists only since v15 */
	if (dopt->no_publications || fout->remoteVersion < 150000)
		return;

	query = createPQExpBuffer();

	/* Collect all publication membership info. */
	appendPQExpBufferStr(query,
						 "SELECT tableoid, oid, pnpubid, pnnspid "
						 "FROM pg_catalog.pg_publication_namespace");
	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_pnpubid = PQfnumber(res, "pnpubid");
	i_pnnspid = PQfnumber(res, "pnnspid");

	/* this allocation may be more than we need */
	pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
	j = 0;							/* count of entries actually kept */

	for (i = 0; i < ntups; i++)
	{
		Oid			pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
		Oid			pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
		PublicationInfo *pubinfo;
		NamespaceInfo *nspinfo;

		/*
		 * Ignore any entries for which we aren't interested in either the
		 * publication or the rel.
		 */
		pubinfo = findPublicationByOid(pnpubid);
		if (pubinfo == NULL)
			continue;
		nspinfo = findNamespaceByOid(pnnspid);
		if (nspinfo == NULL)
			continue;

		/*
		 * We always dump publication namespaces unless the corresponding
		 * namespace is excluded from the dump.
		 */
		if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
			continue;

		/* OK, make a DumpableObject for this relationship */
		/* NOTE(review): the dobj.objType assignment appears to be missing
		 * here in this copy; verify against upstream pg_dump.c. */
		pubsinfo[j].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&pubsinfo[j].dobj);
		pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
		pubsinfo[j].dobj.name = nspinfo->dobj.name;
		pubsinfo[j].publication = pubinfo;
		pubsinfo[j].pubschema = nspinfo;

		/* Decide whether we want to dump it */
		selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);

		j++;
	}

	PQclear(res);
	destroyPQExpBuffer(query);
}
4218 
4219 /*
4220  * getPublicationTables
4221  * get information about publication membership for dumpable tables.
4222  */
void
getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
{
	PQExpBuffer query;
	PGresult   *res;
	PublicationRelInfo *pubrinfo;
	DumpOptions *dopt = fout->dopt;
	int			i_tableoid;
	int			i_oid;
	int			i_prpubid;
	int			i_prrelid;
	int			i_prrelqual;
	int			i_prattrs;
	int			i,
				j,
				ntups;

	/*
	 * Publications exist only since v10; also honor --no-publications.
	 * (tblinfo/numTables are not referenced in this body; lookups go
	 * through findTableByOid instead.)
	 */
	if (dopt->no_publications || fout->remoteVersion < 100000)
		return;

	query = createPQExpBuffer();

	/* Collect all publication membership info. */
	/* Row filter (prqual) and column list (prattrs) exist only since v15. */
	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
							 "(CASE\n"
							 " WHEN pr.prattrs IS NOT NULL THEN\n"
							 " (SELECT array_agg(attname)\n"
							 " FROM\n"
							 " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
							 " pg_catalog.pg_attribute\n"
							 " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
							 " ELSE NULL END) prattrs "
							 "FROM pg_catalog.pg_publication_rel pr");
	else
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "NULL AS prrelqual, NULL AS prattrs "
							 "FROM pg_catalog.pg_publication_rel");
	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_prpubid = PQfnumber(res, "prpubid");
	i_prrelid = PQfnumber(res, "prrelid");
	i_prrelqual = PQfnumber(res, "prrelqual");
	i_prattrs = PQfnumber(res, "prattrs");

	/* this allocation may be more than we need */
	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
	j = 0;							/* count of entries actually kept */

	for (i = 0; i < ntups; i++)
	{
		Oid			prpubid = atooid(PQgetvalue(res, i, i_prpubid));
		Oid			prrelid = atooid(PQgetvalue(res, i, i_prrelid));
		PublicationInfo *pubinfo;
		TableInfo  *tbinfo;

		/*
		 * Ignore any entries for which we aren't interested in either the
		 * publication or the rel.
		 */
		pubinfo = findPublicationByOid(prpubid);
		if (pubinfo == NULL)
			continue;
		tbinfo = findTableByOid(prrelid);
		if (tbinfo == NULL)
			continue;

		/*
		 * Ignore publication membership of tables whose definitions are not
		 * to be dumped.
		 */
		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
			continue;

		/* OK, make a DumpableObject for this relationship */
		pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
		pubrinfo[j].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&pubrinfo[j].dobj);
		pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
		pubrinfo[j].dobj.name = tbinfo->dobj.name;
		pubrinfo[j].publication = pubinfo;
		pubrinfo[j].pubtable = tbinfo;
		if (PQgetisnull(res, i, i_prrelqual))
			pubrinfo[j].pubrelqual = NULL;
		else
			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));

		if (!PQgetisnull(res, i, i_prattrs))
		{
			char	  **attnames;
			int			nattnames;
			PQExpBuffer attribs;

			/* rebuild the column list as comma-separated quoted names */
			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
							  &attnames, &nattnames))
				pg_fatal("could not parse %s array", "prattrs");
			attribs = createPQExpBuffer();
			for (int k = 0; k < nattnames; k++)
			{
				if (k > 0)
					appendPQExpBufferStr(attribs, ", ");

				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
			}
			/*
			 * Hand off attribs->data; the buffer is deliberately not
			 * destroyed so the string stays valid for the dump phase.
			 */
			pubrinfo[j].pubrattrs = attribs->data;
		}
		else
			pubrinfo[j].pubrattrs = NULL;

		/* Decide whether we want to dump it */
		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);

		j++;
	}

	PQclear(res);
	destroyPQExpBuffer(query);
}
4350 
4351 /*
4352  * dumpPublicationNamespace
4353  * dump the definition of the given publication schema mapping.
4354  */
static void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	DumpOptions *dopt = fout->dopt;
	NamespaceInfo *schemainfo = pubsinfo->pubschema;
	PublicationInfo *pubinfo = pubsinfo->publication;
	PQExpBuffer query;
	char	   *tag;

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);

	query = createPQExpBuffer();

	/* fmtId() is not reentrant, hence one call per append */
	appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
	appendPQExpBuffer(query, "ADD TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));

	/*
	 * There is no point in creating drop query as the drop is done by schema
	 * drop.
	 */
	if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = tag,
								  .namespace = schemainfo->dobj.name,
								  .owner = pubinfo->rolname,
								  .description = "PUBLICATION TABLES IN SCHEMA",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data));

	/* These objects can't currently have comments or seclabels */

	free(tag);
	destroyPQExpBuffer(query);
}
4393 
4394 /*
4395  * dumpPublicationTable
4396  * dump the definition of the given publication table mapping
4397  */
static void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	DumpOptions *dopt = fout->dopt;
	PublicationInfo *pubinfo = pubrinfo->publication;
	TableInfo  *tbinfo = pubrinfo->pubtable;
	PQExpBuffer query;
	char	   *tag;

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);

	query = createPQExpBuffer();

	/* ONLY: membership does not propagate to child tables automatically */
	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
					  fmtId(pubinfo->dobj.name));
	appendPQExpBuffer(query, " %s",
					  fmtQualifiedDumpable(tbinfo));

	/* optional column list, pre-formatted by getPublicationTables() */
	if (pubrinfo->pubrattrs)
		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);

	if (pubrinfo->pubrelqual)
	{
		/*
		 * It's necessary to add parentheses around the expression because
		 * pg_get_expr won't supply the parentheses for things like WHERE
		 * TRUE.
		 */
		appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
	}
	appendPQExpBufferStr(query, ";\n");

	/*
	 * There is no point in creating a drop query as the drop is done by table
	 * drop. (If you think to change this, see also _printTocEntry().)
	 * Although this object doesn't really have ownership as such, set the
	 * owner field anyway to ensure that the command is run by the correct
	 * role at restore time.
	 */
	if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = tag,
								  .namespace = tbinfo->dobj.namespace->dobj.name,
								  .owner = pubinfo->rolname,
								  .description = "PUBLICATION TABLE",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data));

	/* These objects can't currently have comments or seclabels */

	free(tag);
	destroyPQExpBuffer(query);
}
4455 
4456 /*
4457  * Is the currently connected user a superuser?
4458  */
static bool
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	ArchiveHandle *AH = (ArchiveHandle *) fout;
	const char *val;

	/*
	 * is_superuser is a parameter the server reports to libpq, so no extra
	 * query is needed; PQparameterStatus just reads cached state.
	 */
	val = PQparameterStatus(AH->connection, "is_superuser");

	/* a missing (NULL) value is treated as "not a superuser" */
	if (val && strcmp(val, "on") == 0)
		return true;

	return false;
}
4472 
4473 /*
4474  * getSubscriptions
4475  * get information about subscriptions
4476  */
void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer query;
	PGresult   *res;
	SubscriptionInfo *subinfo;
	int			i_tableoid;
	int			i_oid;
	int			i_subname;
	int			i_subowner;
	int			i_substream;
	int			i_subtwophasestate;
	int			i_subdisableonerr;
	int			i_suborigin;
	int			i_subconninfo;
	int			i_subslotname;
	int			i_subsynccommit;
	int			i_subpublications;
	int			i_subbinary;
	int			i,
				ntups;

	/* Subscriptions exist only since v10; also honor --no-subscriptions. */
	if (dopt->no_subscriptions || fout->remoteVersion < 100000)
		return;

	/*
	 * Non-superusers cannot dump subscriptions; just count how many would
	 * have been dumped from this database and warn if any are skipped.
	 */
	if (!is_superuser(fout))
	{
		int			n;

		res = ExecuteSqlQuery(fout,
							  "SELECT count(*) FROM pg_subscription "
							  "WHERE subdbid = (SELECT oid FROM pg_database"
							  " WHERE datname = current_database())",
							  PGRES_TUPLES_OK);
		n = atoi(PQgetvalue(res, 0, 0));
		if (n > 0)
			pg_log_warning("subscriptions not dumped because current user is not a superuser");
		PQclear(res);
		return;
	}

	query = createPQExpBuffer();

	/* Get the subscriptions in current database. */
	appendPQExpBufferStr(query,
						 "SELECT s.tableoid, s.oid, s.subname,\n"
						 " s.subowner,\n"
						 " s.subconninfo, s.subslotname, s.subsynccommit,\n"
						 " s.subpublications,\n");

	/* columns added in later versions get constant defaults when older */
	if (fout->remoteVersion >= 140000)
		appendPQExpBufferStr(query, " s.subbinary,\n");
	else
		appendPQExpBufferStr(query, " false AS subbinary,\n");

	if (fout->remoteVersion >= 140000)
		appendPQExpBufferStr(query, " s.substream,\n");
	else
		appendPQExpBufferStr(query, " false AS substream,\n");

	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(query,
							 " s.subtwophasestate,\n"
							 " s.subdisableonerr,\n");
	else
		appendPQExpBuffer(query,
						  " '%c' AS subtwophasestate,\n"
						  " false AS subdisableonerr,\n",
/* NOTE(review): the final argument of this call (the two-phase "disabled"
 * state constant) appears to be missing from this copy; verify against
 * upstream pg_dump.c. */

	if (fout->remoteVersion >= 160000)
		appendPQExpBufferStr(query, " s.suborigin\n");
	else
		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);

	appendPQExpBufferStr(query,
						 "FROM pg_subscription s\n"
						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
						 " WHERE datname = current_database())");

	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	/*
	 * Get subscription fields. We don't include subskiplsn in the dump as
	 * after restoring the dump this value may no longer be relevant.
	 */
	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_subname = PQfnumber(res, "subname");
	i_subowner = PQfnumber(res, "subowner");
	i_subconninfo = PQfnumber(res, "subconninfo");
	i_subslotname = PQfnumber(res, "subslotname");
	i_subsynccommit = PQfnumber(res, "subsynccommit");
	i_subpublications = PQfnumber(res, "subpublications");
	i_subbinary = PQfnumber(res, "subbinary");
	i_substream = PQfnumber(res, "substream");
	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
	i_suborigin = PQfnumber(res, "suborigin");

	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));

	for (i = 0; i < ntups; i++)
	{
		subinfo[i].dobj.objType = DO_SUBSCRIPTION;
		subinfo[i].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&subinfo[i].dobj);
		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
		subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
		/* subslotname is nullable; all other fields are copied as text */
		if (PQgetisnull(res, i, i_subslotname))
			subinfo[i].subslotname = NULL;
		else
			subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
		subinfo[i].subsynccommit =
			pg_strdup(PQgetvalue(res, i, i_subsynccommit));
		subinfo[i].subpublications =
			pg_strdup(PQgetvalue(res, i, i_subpublications));
		subinfo[i].subbinary =
			pg_strdup(PQgetvalue(res, i, i_subbinary));
		subinfo[i].substream =
			pg_strdup(PQgetvalue(res, i, i_substream));
		subinfo[i].subtwophasestate =
			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
		subinfo[i].subdisableonerr =
			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));

		/* Decide whether we want to dump it */
		selectDumpableObject(&(subinfo[i].dobj), fout);
	}
	PQclear(res);

	destroyPQExpBuffer(query);
}
4617 
4618 /*
4619  * dumpSubscription
4620  * dump the definition of the given subscription
4621  */
static void
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer delq;
	PQExpBuffer query;
	PQExpBuffer publications;
	char	   *qsubname;
	char	  **pubnames = NULL;
	int			npubnames = 0;
	int			i;
	/* string form of the "two-phase disabled" state, for strcmp below */
	char		two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	delq = createPQExpBuffer();
	query = createPQExpBuffer();

	qsubname = pg_strdup(fmtId(subinfo->dobj.name));

	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
					  qsubname);

	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
					  qsubname);
	appendStringLiteralAH(query, subinfo->subconninfo, fout);

	/* Build list of quoted publications and append them to query. */
	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
		pg_fatal("could not parse %s array", "subpublications");

	publications = createPQExpBuffer();
	for (i = 0; i < npubnames; i++)
	{
		if (i > 0)
			appendPQExpBufferStr(publications, ", ");

		appendPQExpBufferStr(publications, fmtId(pubnames[i]));
	}

	/* connect = false is always emitted; non-default options follow */
	appendPQExpBuffer(query, " PUBLICATION %s WITH (connect = false, slot_name = ", publications->data);
	if (subinfo->subslotname)
		appendStringLiteralAH(query, subinfo->subslotname, fout);
	else
		appendPQExpBufferStr(query, "NONE");

	if (strcmp(subinfo->subbinary, "t") == 0)
		appendPQExpBufferStr(query, ", binary = true");

	if (strcmp(subinfo->substream, "f") != 0)
		appendPQExpBufferStr(query, ", streaming = on");

	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
		appendPQExpBufferStr(query, ", two_phase = on");

	if (strcmp(subinfo->subdisableonerr, "t") == 0)
		appendPQExpBufferStr(query, ", disable_on_error = true");

	if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0)
		appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin);

	if (strcmp(subinfo->subsynccommit, "off") != 0)
		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));

	appendPQExpBufferStr(query, ");\n");

	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
								  .owner = subinfo->rolname,
								  .description = "SUBSCRIPTION",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data,
								  .dropStmt = delq->data));

	if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
		dumpComment(fout, "SUBSCRIPTION", qsubname,
					NULL, subinfo->rolname,
					subinfo->dobj.catId, 0, subinfo->dobj.dumpId);

	if (subinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
		dumpSecLabel(fout, "SUBSCRIPTION", qsubname,
					 NULL, subinfo->rolname,
					 subinfo->dobj.catId, 0, subinfo->dobj.dumpId);

	destroyPQExpBuffer(publications);
	free(pubnames);

	destroyPQExpBuffer(delq);
	destroyPQExpBuffer(query);
	free(qsubname);
}
4716 
4717 /*
4718  * Given a "create query", append as many ALTER ... DEPENDS ON EXTENSION as
4719  * the object needs.
4720  */
static void
/* NOTE(review): the function name line appears to be missing from this copy
 * of the file; verify against upstream pg_dump.c. */
					PQExpBuffer create,
					const DumpableObject *dobj,
					const char *catalog,
					const char *keyword,
					const char *objname)
{
	/* only objects with an extension dependency need any work */
	if (dobj->depends_on_ext)
	{
		char	   *nm;
		PGresult   *res;
		PQExpBuffer query;
		int			ntups;
		int			i_extname;
		int			i;

		/* dodge fmtId() non-reentrancy */
		nm = pg_strdup(objname);

		/* find all extensions this object depends on (deptype 'x') */
		appendPQExpBuffer(query,
						  "SELECT e.extname "
						  "FROM pg_catalog.pg_depend d, pg_catalog.pg_extension e "
						  "WHERE d.refobjid = e.oid AND classid = '%s'::pg_catalog.regclass "
						  "AND objid = '%u'::pg_catalog.oid AND deptype = 'x' "
						  "AND refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass",
						  catalog,
						  dobj->catId.oid);
		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		ntups = PQntuples(res);
		i_extname = PQfnumber(res, "extname");
		for (i = 0; i < ntups; i++)
		{
			/* one ALTER ... DEPENDS ON EXTENSION per dependency */
			appendPQExpBuffer(create, "ALTER %s %s DEPENDS ON EXTENSION %s;\n",
							  keyword, nm,
							  fmtId(PQgetvalue(res, i, i_extname)));
		}

		PQclear(res);
		destroyPQExpBuffer(query);
		pg_free(nm);
	}
}
4765 
static Oid
/* NOTE(review): the function name/parameter line appears to be missing from
 * this copy of the file; verify against upstream pg_dump.c. */
{
	/*
	 * If the old version didn't assign an array type, but the new version
	 * does, we must select an unused type OID to assign. This currently only
	 * happens for domains, when upgrading pre-v11 to v11 and up.
	 *
	 * Note: local state here is kind of ugly, but we must have some, since we
	 * mustn't choose the same unused OID more than once.
	 */
	static Oid	next_possible_free_oid = FirstNormalObjectId;
	PGresult   *res;
	bool		is_dup;

	/* probe successive OIDs until one is absent from pg_type */
	do
	{
		++next_possible_free_oid;
		/* caller's upgrade_query buffer is used as scratch (overwritten) */
		printfPQExpBuffer(upgrade_query,
						  "SELECT EXISTS(SELECT 1 "
						  "FROM pg_catalog.pg_type "
						  "WHERE oid = '%u'::pg_catalog.oid);",
						  next_possible_free_oid);
		res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
		is_dup = (PQgetvalue(res, 0, 0)[0] == 't');
		PQclear(res);
	} while (is_dup);

	return next_possible_free_oid;
}
4796 
static void
/* NOTE(review): the function name line appears to be missing from this copy
 * of the file; verify against upstream pg_dump.c. */
										 PQExpBuffer upgrade_buffer,
										 Oid pg_type_oid,
										 bool force_array_type,
										 bool include_multirange_type)
{
	PQExpBuffer upgrade_query = createPQExpBuffer();
	PGresult   *res;
	Oid			pg_type_array_oid;
	Oid			pg_type_multirange_oid;
	Oid			pg_type_multirange_array_oid;

	appendPQExpBufferStr(upgrade_buffer, "\n-- For binary upgrade, must preserve pg_type oid\n");
	appendPQExpBuffer(upgrade_buffer,
					  "SELECT pg_catalog.binary_upgrade_set_next_pg_type_oid('%u'::pg_catalog.oid);\n\n",
					  pg_type_oid);

	/* look up the type's array type, if it has one */
	appendPQExpBuffer(upgrade_query,
					  "SELECT typarray "
					  "FROM pg_catalog.pg_type "
					  "WHERE oid = '%u'::pg_catalog.oid;",
					  pg_type_oid);

	res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);

	pg_type_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));

	PQclear(res);

	/* no array type in the old cluster: pick a fresh, unused OID */
	if (!OidIsValid(pg_type_array_oid) && force_array_type)
		pg_type_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);

	if (OidIsValid(pg_type_array_oid))
	{
		appendPQExpBufferStr(upgrade_buffer,
							 "\n-- For binary upgrade, must preserve pg_type array oid\n");
		appendPQExpBuffer(upgrade_buffer,
						  "SELECT pg_catalog.binary_upgrade_set_next_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
						  pg_type_array_oid);
	}

	/*
	 * Pre-set the multirange type oid and its own array type oid.
	 */
	if (include_multirange_type)
	{
		if (fout->remoteVersion >= 140000)
		{
			/* multiranges exist since v14: read their OIDs from pg_range */
			printfPQExpBuffer(upgrade_query,
							  "SELECT t.oid, t.typarray "
							  "FROM pg_catalog.pg_type t "
							  "JOIN pg_catalog.pg_range r "
							  "ON t.oid = r.rngmultitypid "
							  "WHERE r.rngtypid = '%u'::pg_catalog.oid;",
							  pg_type_oid);

			res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);

			pg_type_multirange_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "oid")));
			pg_type_multirange_array_oid = atooid(PQgetvalue(res, 0, PQfnumber(res, "typarray")));

			PQclear(res);
		}
		else
		{
			/* older cluster has no multiranges; invent unused OIDs */
			pg_type_multirange_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
			pg_type_multirange_array_oid = get_next_possible_free_pg_type_oid(fout, upgrade_query);
		}

		appendPQExpBufferStr(upgrade_buffer,
							 "\n-- For binary upgrade, must preserve multirange pg_type oid\n");
		appendPQExpBuffer(upgrade_buffer,
						  "SELECT pg_catalog.binary_upgrade_set_next_multirange_pg_type_oid('%u'::pg_catalog.oid);\n\n",
						  pg_type_multirange_oid);
		appendPQExpBufferStr(upgrade_buffer,
							 "\n-- For binary upgrade, must preserve multirange pg_type array oid\n");
		appendPQExpBuffer(upgrade_buffer,
						  "SELECT pg_catalog.binary_upgrade_set_next_multirange_array_pg_type_oid('%u'::pg_catalog.oid);\n\n",
						  pg_type_multirange_array_oid);
	}

	destroyPQExpBuffer(upgrade_query);
}
4881 
static void
/* NOTE(review): the function name line appears to be missing from this copy
 * of the file; verify against upstream pg_dump.c. */
										PQExpBuffer upgrade_buffer,
										const TableInfo *tbinfo)
{
	Oid			pg_type_oid = tbinfo->reltype;

	/* nothing to preserve unless the relation has an associated rowtype */
	if (OidIsValid(pg_type_oid))
		binary_upgrade_set_type_oids_by_type_oid(fout, upgrade_buffer,
												 pg_type_oid, false, false);
}
4893 
4894 static void
4896  PQExpBuffer upgrade_buffer, Oid pg_class_oid,
4897  bool is_index)
4898 {
4899  PQExpBuffer upgrade_query = createPQExpBuffer();
4900  PGresult *upgrade_res;
4901  RelFileNumber relfilenumber;
4902  Oid toast_oid;
4903  RelFileNumber toast_relfilenumber;
4904  char relkind;
4905  Oid toast_index_oid;
4906  RelFileNumber toast_index_relfilenumber;
4907 
4908  /*
4909  * Preserve the OID and relfilenumber of the table, table's index, table's
4910  * toast table and toast table's index if any.
4911  *
4912  * One complexity is that the current table definition might not require
4913  * the creation of a TOAST table, but the old database might have a TOAST
4914  * table that was created earlier, before some wide columns were dropped.
4915  * By setting the TOAST oid we force creation of the TOAST heap and index
4916  * by the new backend, so we can copy the files during binary upgrade
4917  * without worrying about this case.
4918  */
4919  appendPQExpBuffer(upgrade_query,
4920  "SELECT c.relkind, c.relfilenode, c.reltoastrelid, ct.relfilenode AS toast_relfilenode, i.indexrelid, cti.relfilenode AS toast_index_relfilenode "
4921  "FROM pg_catalog.pg_class c LEFT JOIN "
4922  "pg_catalog.pg_index i ON (c.reltoastrelid = i.indrelid AND i.indisvalid) "
4923  "LEFT JOIN pg_catalog.pg_class ct ON (c.reltoastrelid = ct.oid) "
4924  "LEFT JOIN pg_catalog.pg_class AS cti ON (i.indexrelid = cti.oid) "
4925  "WHERE c.oid = '%u'::pg_catalog.oid;",
4926  pg_class_oid);
4927 
4928  upgrade_res = ExecuteSqlQueryForSingleRow(fout, upgrade_query->data);
4929 
4930  relkind = *PQgetvalue(upgrade_res, 0, PQfnumber(upgrade_res, "relkind"));
4931 
4932  relfilenumber = atooid(PQgetvalue(upgrade_res, 0,
4933  PQfnumber(upgrade_res, "relfilenode")));
4934  toast_oid = atooid(PQgetvalue(upgrade_res, 0,
4935  PQfnumber(upgrade_res, "reltoastrelid")));
4936  toast_relfilenumber = atooid(PQgetvalue(upgrade_res, 0,
4937  PQfnumber(upgrade_res, "toast_relfilenode")));
4938  toast_index_oid = atooid(PQgetvalue(upgrade_res, 0,
4939  PQfnumber(upgrade_res, "indexrelid")));
4940  toast_index_relfilenumber = atooid(PQgetvalue(upgrade_res, 0,
4941  PQfnumber(upgrade_res, "toast_index_relfilenode")));
4942 
4943  appendPQExpBufferStr(upgrade_buffer,
4944  "\n-- For binary upgrade, must preserve pg_class oids and relfilenodes\n");
4945 
4946  if (!is_index)
4947  {
4948  appendPQExpBuffer(upgrade_buffer,
4949  "SELECT pg_catalog.binary_upgrade_set_next_heap_pg_class_oid('%u'::pg_catalog.oid);\n",
4950  pg_class_oid);
4951 
4952  /*
4953  * Not every relation has storage. Also, in a pre-v12 database,
4954  * partitioned tables have a relfilenumber, which should not be
4955  * preserved when upgrading.
4956  */
4957  if (RelFileNumberIsValid(relfilenumber) && relkind != RELKIND_PARTITIONED_TABLE)
4958  appendPQExpBuffer(upgrade_buffer,
4959  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
4960  relfilenumber);
4961 
4962  /*
4963  * In a pre-v12 database, partitioned tables might be marked as having
4964  * toast tables, but we should ignore them if so.
4965  */
4966  if (OidIsValid(toast_oid) &&
4967  relkind != RELKIND_PARTITIONED_TABLE)
4968  {
4969  appendPQExpBuffer(upgrade_buffer,
4970  "SELECT pg_catalog.binary_upgrade_set_next_toast_pg_class_oid('%u'::pg_catalog.oid);\n",
4971  toast_oid);
4972  appendPQExpBuffer(upgrade_buffer,
4973  "SELECT pg_catalog.binary_upgrade_set_next_toast_relfilenode('%u'::pg_catalog.oid);\n",
4974  toast_relfilenumber);
4975 
4976  /* every toast table has an index */
4977  appendPQExpBuffer(upgrade_buffer,
4978  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4979  toast_index_oid);
4980  appendPQExpBuffer(upgrade_buffer,
4981  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
4982  toast_index_relfilenumber);
4983  }
4984 
4985  PQclear(upgrade_res);
4986  }
4987  else
4988  {
4989  /* Preserve the OID and relfilenumber of the index */
4990  appendPQExpBuffer(upgrade_buffer,
4991  "SELECT pg_catalog.binary_upgrade_set_next_index_pg_class_oid('%u'::pg_catalog.oid);\n",
4992  pg_class_oid);
4993  appendPQExpBuffer(upgrade_buffer,
4994  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
4995  relfilenumber);
4996  }
4997 
4998  appendPQExpBufferChar(upgrade_buffer, '\n');
4999 
5000