/*
 * PostgreSQL Source Code (git master) — pg_dump.c
 * (navigation header retained from the doxygen listing this file was
 * extracted from; original: "Go to the documentation of this file.")
 */
1 /*-------------------------------------------------------------------------
2  *
3  * pg_dump.c
4  * pg_dump is a utility for dumping out a postgres database
5  * into a script file.
6  *
7  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * pg_dump will read the system catalogs in a database and dump out a
11  * script that reproduces the schema in terms of SQL that is understood
12  * by PostgreSQL
13  *
14  * Note that pg_dump runs in a transaction-snapshot mode transaction,
15  * so it sees a consistent snapshot of the database including system
16  * catalogs. However, it relies in part on various specialized backend
17  * functions like pg_get_indexdef(), and those things tend to look at
18  * the currently committed state. So it is possible to get 'cache
19  * lookup failed' error if someone performs DDL changes while a dump is
20  * happening. The window for this sort of thing is from the acquisition
21  * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22  * AccessShareLock on every table it intends to dump). It isn't very large,
23  * but it can happen.
24  *
25  * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26  *
27  * IDENTIFICATION
28  * src/bin/pg_dump/pg_dump.c
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include "postgres_fe.h"
33 
34 #include <unistd.h>
35 #include <ctype.h>
36 #include <limits.h>
37 #ifdef HAVE_TERMIOS_H
38 #include <termios.h>
39 #endif
40 
41 #include "access/attnum.h"
42 #include "access/sysattr.h"
43 #include "access/transam.h"
44 #include "catalog/pg_aggregate_d.h"
45 #include "catalog/pg_am_d.h"
46 #include "catalog/pg_attribute_d.h"
47 #include "catalog/pg_authid_d.h"
48 #include "catalog/pg_cast_d.h"
49 #include "catalog/pg_class_d.h"
50 #include "catalog/pg_default_acl_d.h"
51 #include "catalog/pg_largeobject_d.h"
52 #include "catalog/pg_largeobject_metadata_d.h"
53 #include "catalog/pg_proc_d.h"
55 #include "catalog/pg_trigger_d.h"
56 #include "catalog/pg_type_d.h"
57 #include "common/connect.h"
58 #include "common/int.h"
59 #include "common/relpath.h"
60 #include "compress_io.h"
61 #include "dumputils.h"
62 #include "fe_utils/option_utils.h"
63 #include "fe_utils/string_utils.h"
64 #include "filter.h"
65 #include "getopt_long.h"
66 #include "libpq/libpq-fs.h"
67 #include "parallel.h"
68 #include "pg_backup_db.h"
69 #include "pg_backup_utils.h"
70 #include "pg_dump.h"
71 #include "storage/block.h"
72 
/*
 * An entry in the sorted "rolenames" lookup table (see collectRoleNames /
 * getRoleName): maps a role OID found in catalog owner columns to the
 * role's name.
 */
typedef struct
{
	Oid			roleoid;		/* role's OID */
	const char *rolename;		/* role's name */
} RoleNameItem;
78 
79 typedef struct
80 {
81  const char *descr; /* comment for an object */
82  Oid classoid; /* object class (catalog OID) */
83  Oid objoid; /* object OID */
84  int objsubid; /* subobject (table column #) */
85 } CommentItem;
86 
/*
 * An entry in the sorted "seclabels" table (see collectSecLabels /
 * findSecLabels): one security label, qualified by its provider, attached
 * to one object or sub-object.
 */
typedef struct
{
	const char *provider;		/* label provider of this security label */
	const char *label;			/* security label for an object */
	Oid			classoid;		/* object class (catalog OID) */
	Oid			objoid;			/* object OID */
	int			objsubid;		/* subobject (table column #), 0 for the object itself */
} SecLabelItem;
95 
96 typedef struct
97 {
98  Oid oid; /* object OID */
99  char relkind; /* object kind */
100  RelFileNumber relfilenumber; /* object filenode */
101  Oid toast_oid; /* toast table OID */
102  RelFileNumber toast_relfilenumber; /* toast table filenode */
103  Oid toast_index_oid; /* toast table index OID */
104  RelFileNumber toast_index_relfilenumber; /* toast table index filenode */
106 
107 /* sequence types */
108 typedef enum SeqType
109 {
114 
/*
 * SQL type names for each SeqType value, indexed by the enum so that
 * SeqTypeNames[seqtype] yields the type's SQL spelling.
 */
static const char *const SeqTypeNames[] =
{
	[SEQTYPE_SMALLINT] = "smallint",
	[SEQTYPE_INTEGER] = "integer",
	[SEQTYPE_BIGINT] = "bigint",
};
121 
123  "array length mismatch");
124 
/*
 * An entry in the sorted "sequences" table (see collectSequences): cached
 * state for one sequence, gathered up front so dumpSequence/dumpSequenceData
 * need not query each sequence individually.
 */
typedef struct
{
	Oid			oid;			/* sequence OID */
	SeqType		seqtype;		/* data type of sequence */
	bool		cycled;			/* whether sequence cycles */
	int64		minv;			/* minimum value */
	int64		maxv;			/* maximum value */
	int64		startv;			/* start value */
	int64		incby;			/* increment value */
	int64		cache;			/* cache size */
	int64		last_value;		/* last value of sequence */
	bool		is_called;		/* whether nextval advances before returning */
} SequenceItem;
138 
139 typedef enum OidOptions
140 {
145 
146 /* global decls */
147 static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
148 
149 static Oid g_last_builtin_oid; /* value of the last builtin oid */
150 
/* The specified names/patterns should match at least one entity */
static int	strict_names = 0;
153 
155 
156 /*
157  * Object inclusion/exclusion lists
158  *
159  * The string lists record the patterns given by command-line switches,
160  * which we then convert to lists of OIDs of matching objects.
161  */
163 static SimpleOidList schema_include_oids = {NULL, NULL};
165 static SimpleOidList schema_exclude_oids = {NULL, NULL};
166 
169 static SimpleOidList table_include_oids = {NULL, NULL};
172 static SimpleOidList table_exclude_oids = {NULL, NULL};
175 static SimpleOidList tabledata_exclude_oids = {NULL, NULL};
176 
179 
181 static SimpleOidList extension_include_oids = {NULL, NULL};
182 
184 static SimpleOidList extension_exclude_oids = {NULL, NULL};
185 
186 static const CatalogId nilCatalogId = {0, 0};
187 
188 /* override for standard extra_float_digits setting */
189 static bool have_extra_float_digits = false;
191 
192 /* sorted table of role names */
193 static RoleNameItem *rolenames = NULL;
194 static int nrolenames = 0;
195 
196 /* sorted table of comments */
197 static CommentItem *comments = NULL;
198 static int ncomments = 0;
199 
200 /* sorted table of security labels */
201 static SecLabelItem *seclabels = NULL;
202 static int nseclabels = 0;
203 
204 /* sorted table of pg_class information for binary upgrade */
206 static int nbinaryUpgradeClassOids = 0;
207 
208 /* sorted table of sequences */
209 static SequenceItem *sequences = NULL;
210 static int nsequences = 0;
211 
/*
 * The default number of rows per INSERT when
 * --inserts is specified without --rows-per-insert
 */
#define DUMP_DEFAULT_ROWS_PER_INSERT 1
217 
/*
 * Maximum number of large objects to group into a single ArchiveEntry.
 * At some point we might want to make this user-controllable, but for now
 * a hard-wired setting will suffice.
 */
#define MAX_BLOBS_PER_ARCHIVE_ENTRY 1000
224 
/*
 * Macro for producing quoted, schema-qualified name of a dumpable object.
 *
 * NOTE: the argument is expanded twice, so avoid passing an expression
 * with side effects (CERT PRE31-C).
 */
#define fmtQualifiedDumpable(obj) \
	fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
				   (obj)->dobj.name)
231 
232 static void help(const char *progname);
233 static void setup_connection(Archive *AH,
234  const char *dumpencoding, const char *dumpsnapshot,
235  char *use_role);
237 static void expand_schema_name_patterns(Archive *fout,
238  SimpleStringList *patterns,
239  SimpleOidList *oids,
240  bool strict_names);
241 static void expand_extension_name_patterns(Archive *fout,
242  SimpleStringList *patterns,
243  SimpleOidList *oids,
244  bool strict_names);
246  SimpleStringList *patterns,
247  SimpleOidList *oids);
248 static void expand_table_name_patterns(Archive *fout,
249  SimpleStringList *patterns,
250  SimpleOidList *oids,
251  bool strict_names,
252  bool with_child_tables);
253 static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
254  const char *pattern);
255 
256 static NamespaceInfo *findNamespace(Oid nsoid);
257 static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
258 static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
259 static const char *getRoleName(const char *roleoid_str);
260 static void collectRoleNames(Archive *fout);
261 static void getAdditionalACLs(Archive *fout);
262 static void dumpCommentExtended(Archive *fout, const char *type,
263  const char *name, const char *namespace,
264  const char *owner, CatalogId catalogId,
265  int subid, DumpId dumpId,
266  const char *initdb_comment);
267 static inline void dumpComment(Archive *fout, const char *type,
268  const char *name, const char *namespace,
269  const char *owner, CatalogId catalogId,
270  int subid, DumpId dumpId);
271 static int findComments(Oid classoid, Oid objoid, CommentItem **items);
272 static void collectComments(Archive *fout);
273 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
274  const char *namespace, const char *owner,
275  CatalogId catalogId, int subid, DumpId dumpId);
276 static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
277 static void collectSecLabels(Archive *fout);
278 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
279 static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
280 static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
281 static void dumpType(Archive *fout, const TypeInfo *tyinfo);
282 static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
283 static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
284 static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
285 static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
286 static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
287 static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
288 static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
289  PGresult *res);
290 static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
291 static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
292 static void dumpFunc(Archive *fout, const FuncInfo *finfo);
293 static void dumpCast(Archive *fout, const CastInfo *cast);
294 static void dumpTransform(Archive *fout, const TransformInfo *transform);
295 static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
296 static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
297 static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
298 static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
299 static void dumpCollation(Archive *fout, const CollInfo *collinfo);
300 static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
301 static void dumpRule(Archive *fout, const RuleInfo *rinfo);
302 static void dumpAgg(Archive *fout, const AggInfo *agginfo);
303 static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
304 static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
305 static void dumpTable(Archive *fout, const TableInfo *tbinfo);
306 static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
307 static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
308 static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
309 static void collectSequences(Archive *fout);
310 static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
311 static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
312 static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
313 static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
314 static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
315 static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
316 static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
317 static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
318 static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
319 static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
320 static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
321 static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
322 static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
323 static void dumpUserMappings(Archive *fout,
324  const char *servername, const char *namespace,
325  const char *owner, CatalogId catalogId, DumpId dumpId);
326 static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
327 
328 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
329  const char *type, const char *name, const char *subname,
330  const char *nspname, const char *tag, const char *owner,
331  const DumpableAcl *dacl);
332 
333 static void getDependencies(Archive *fout);
334 static void BuildArchiveDependencies(Archive *fout);
335 static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
336  DumpId **dependencies, int *nDeps, int *allocDeps);
337 
339 static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
340  DumpableObject *boundaryObjs);
341 
342 static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
343 static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
344 static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
345 static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
346 static void buildMatViewRefreshDependencies(Archive *fout);
347 static void getTableDataFKConstraints(void);
348 static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
349  bool is_agg);
350 static char *format_function_signature(Archive *fout,
351  const FuncInfo *finfo, bool honor_quotes);
352 static char *convertRegProcReference(const char *proc);
353 static char *getFormattedOperatorName(const char *oproid);
354 static char *convertTSFunction(Archive *fout, Oid funcOid);
355 static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
356 static void getLOs(Archive *fout);
357 static void dumpLO(Archive *fout, const LoInfo *loinfo);
358 static int dumpLOs(Archive *fout, const void *arg);
359 static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
360 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
361 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
362 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
363 static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
364 static void dumpDatabase(Archive *fout);
365 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
366  const char *dbname, Oid dboid);
367 static void dumpEncoding(Archive *AH);
368 static void dumpStdStrings(Archive *AH);
369 static void dumpSearchPath(Archive *AH);
371  PQExpBuffer upgrade_buffer,
372  Oid pg_type_oid,
373  bool force_array_type,
374  bool include_multirange_type);
376  PQExpBuffer upgrade_buffer,
377  const TableInfo *tbinfo);
378 static void collectBinaryUpgradeClassOids(Archive *fout);
379 static void binary_upgrade_set_pg_class_oids(Archive *fout,
380  PQExpBuffer upgrade_buffer,
381  Oid pg_class_oid);
382 static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
383  const DumpableObject *dobj,
384  const char *objtype,
385  const char *objname,
386  const char *objnamespace);
387 static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
388 static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
389 static bool nonemptyReloptions(const char *reloptions);
390 static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
391  const char *prefix, Archive *fout);
392 static char *get_synchronized_snapshot(Archive *fout);
393 static void set_restrict_relation_kind(Archive *AH, const char *value);
394 static void setupDumpWorker(Archive *AH);
395 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
396 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
397 static void read_dump_filters(const char *filename, DumpOptions *dopt);
398 
399 
400 int
401 main(int argc, char **argv)
402 {
403  int c;
404  const char *filename = NULL;
405  const char *format = "p";
406  TableInfo *tblinfo;
407  int numTables;
408  DumpableObject **dobjs;
409  int numObjs;
410  DumpableObject *boundaryObjs;
411  int i;
412  int optindex;
413  RestoreOptions *ropt;
414  Archive *fout; /* the script file */
415  bool g_verbose = false;
416  const char *dumpencoding = NULL;
417  const char *dumpsnapshot = NULL;
418  char *use_role = NULL;
419  int numWorkers = 1;
420  int plainText = 0;
421  ArchiveFormat archiveFormat = archUnknown;
422  ArchiveMode archiveMode;
423  pg_compress_specification compression_spec = {0};
424  char *compression_detail = NULL;
425  char *compression_algorithm_str = "none";
426  char *error_detail = NULL;
427  bool user_compression_defined = false;
429 
430  static DumpOptions dopt;
431 
432  static struct option long_options[] = {
433  {"data-only", no_argument, NULL, 'a'},
434  {"blobs", no_argument, NULL, 'b'},
435  {"large-objects", no_argument, NULL, 'b'},
436  {"no-blobs", no_argument, NULL, 'B'},
437  {"no-large-objects", no_argument, NULL, 'B'},
438  {"clean", no_argument, NULL, 'c'},
439  {"create", no_argument, NULL, 'C'},
440  {"dbname", required_argument, NULL, 'd'},
441  {"extension", required_argument, NULL, 'e'},
442  {"file", required_argument, NULL, 'f'},
443  {"format", required_argument, NULL, 'F'},
444  {"host", required_argument, NULL, 'h'},
445  {"jobs", 1, NULL, 'j'},
446  {"no-reconnect", no_argument, NULL, 'R'},
447  {"no-owner", no_argument, NULL, 'O'},
448  {"port", required_argument, NULL, 'p'},
449  {"schema", required_argument, NULL, 'n'},
450  {"exclude-schema", required_argument, NULL, 'N'},
451  {"schema-only", no_argument, NULL, 's'},
452  {"superuser", required_argument, NULL, 'S'},
453  {"table", required_argument, NULL, 't'},
454  {"exclude-table", required_argument, NULL, 'T'},
455  {"no-password", no_argument, NULL, 'w'},
456  {"password", no_argument, NULL, 'W'},
457  {"username", required_argument, NULL, 'U'},
458  {"verbose", no_argument, NULL, 'v'},
459  {"no-privileges", no_argument, NULL, 'x'},
460  {"no-acl", no_argument, NULL, 'x'},
461  {"compress", required_argument, NULL, 'Z'},
462  {"encoding", required_argument, NULL, 'E'},
463  {"help", no_argument, NULL, '?'},
464  {"version", no_argument, NULL, 'V'},
465 
466  /*
467  * the following options don't have an equivalent short option letter
468  */
469  {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
470  {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
471  {"column-inserts", no_argument, &dopt.column_inserts, 1},
472  {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
473  {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
474  {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
475  {"exclude-table-data", required_argument, NULL, 4},
476  {"extra-float-digits", required_argument, NULL, 8},
477  {"if-exists", no_argument, &dopt.if_exists, 1},
478  {"inserts", no_argument, NULL, 9},
479  {"lock-wait-timeout", required_argument, NULL, 2},
480  {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
481  {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
482  {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
483  {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
484  {"role", required_argument, NULL, 3},
485  {"section", required_argument, NULL, 5},
486  {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
487  {"snapshot", required_argument, NULL, 6},
488  {"strict-names", no_argument, &strict_names, 1},
489  {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
490  {"no-comments", no_argument, &dopt.no_comments, 1},
491  {"no-publications", no_argument, &dopt.no_publications, 1},
492  {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
493  {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
494  {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
495  {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
496  {"no-sync", no_argument, NULL, 7},
497  {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
498  {"rows-per-insert", required_argument, NULL, 10},
499  {"include-foreign-data", required_argument, NULL, 11},
500  {"table-and-children", required_argument, NULL, 12},
501  {"exclude-table-and-children", required_argument, NULL, 13},
502  {"exclude-table-data-and-children", required_argument, NULL, 14},
503  {"sync-method", required_argument, NULL, 15},
504  {"filter", required_argument, NULL, 16},
505  {"exclude-extension", required_argument, NULL, 17},
506 
507  {NULL, 0, NULL, 0}
508  };
509 
510  pg_logging_init(argv[0]);
512  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
513 
514  /*
515  * Initialize what we need for parallel execution, especially for thread
516  * support on Windows.
517  */
519 
520  progname = get_progname(argv[0]);
521 
522  if (argc > 1)
523  {
524  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
525  {
526  help(progname);
527  exit_nicely(0);
528  }
529  if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
530  {
531  puts("pg_dump (PostgreSQL) " PG_VERSION);
532  exit_nicely(0);
533  }
534  }
535 
536  InitDumpOptions(&dopt);
537 
538  while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
539  long_options, &optindex)) != -1)
540  {
541  switch (c)
542  {
543  case 'a': /* Dump data only */
544  dopt.dataOnly = true;
545  break;
546 
547  case 'b': /* Dump LOs */
548  dopt.outputLOs = true;
549  break;
550 
551  case 'B': /* Don't dump LOs */
552  dopt.dontOutputLOs = true;
553  break;
554 
555  case 'c': /* clean (i.e., drop) schema prior to create */
556  dopt.outputClean = 1;
557  break;
558 
559  case 'C': /* Create DB */
560  dopt.outputCreateDB = 1;
561  break;
562 
563  case 'd': /* database name */
564  dopt.cparams.dbname = pg_strdup(optarg);
565  break;
566 
567  case 'e': /* include extension(s) */
569  dopt.include_everything = false;
570  break;
571 
572  case 'E': /* Dump encoding */
573  dumpencoding = pg_strdup(optarg);
574  break;
575 
576  case 'f':
578  break;
579 
580  case 'F':
582  break;
583 
584  case 'h': /* server host */
585  dopt.cparams.pghost = pg_strdup(optarg);
586  break;
587 
588  case 'j': /* number of dump jobs */
589  if (!option_parse_int(optarg, "-j/--jobs", 1,
590  PG_MAX_JOBS,
591  &numWorkers))
592  exit_nicely(1);
593  break;
594 
595  case 'n': /* include schema(s) */
597  dopt.include_everything = false;
598  break;
599 
600  case 'N': /* exclude schema(s) */
602  break;
603 
604  case 'O': /* Don't reconnect to match owner */
605  dopt.outputNoOwner = 1;
606  break;
607 
608  case 'p': /* server port */
609  dopt.cparams.pgport = pg_strdup(optarg);
610  break;
611 
612  case 'R':
613  /* no-op, still accepted for backwards compatibility */
614  break;
615 
616  case 's': /* dump schema only */
617  dopt.schemaOnly = true;
618  break;
619 
620  case 'S': /* Username for superuser in plain text output */
622  break;
623 
624  case 't': /* include table(s) */
626  dopt.include_everything = false;
627  break;
628 
629  case 'T': /* exclude table(s) */
631  break;
632 
633  case 'U':
635  break;
636 
637  case 'v': /* verbose */
638  g_verbose = true;
640  break;
641 
642  case 'w':
644  break;
645 
646  case 'W':
648  break;
649 
650  case 'x': /* skip ACL dump */
651  dopt.aclsSkip = true;
652  break;
653 
654  case 'Z': /* Compression */
655  parse_compress_options(optarg, &compression_algorithm_str,
656  &compression_detail);
657  user_compression_defined = true;
658  break;
659 
660  case 0:
661  /* This covers the long options. */
662  break;
663 
664  case 2: /* lock-wait-timeout */
666  break;
667 
668  case 3: /* SET ROLE */
669  use_role = pg_strdup(optarg);
670  break;
671 
672  case 4: /* exclude table(s) data */
674  break;
675 
676  case 5: /* section */
678  break;
679 
680  case 6: /* snapshot */
681  dumpsnapshot = pg_strdup(optarg);
682  break;
683 
684  case 7: /* no-sync */
685  dosync = false;
686  break;
687 
688  case 8:
690  if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
692  exit_nicely(1);
693  break;
694 
695  case 9: /* inserts */
696 
697  /*
698  * dump_inserts also stores --rows-per-insert, careful not to
699  * overwrite that.
700  */
701  if (dopt.dump_inserts == 0)
703  break;
704 
705  case 10: /* rows per insert */
706  if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
707  &dopt.dump_inserts))
708  exit_nicely(1);
709  break;
710 
711  case 11: /* include foreign data */
713  optarg);
714  break;
715 
716  case 12: /* include table(s) and their children */
718  optarg);
719  dopt.include_everything = false;
720  break;
721 
722  case 13: /* exclude table(s) and their children */
724  optarg);
725  break;
726 
727  case 14: /* exclude data of table(s) and children */
729  optarg);
730  break;
731 
732  case 15:
734  exit_nicely(1);
735  break;
736 
737  case 16: /* read object filters from file */
738  read_dump_filters(optarg, &dopt);
739  break;
740 
741  case 17: /* exclude extension(s) */
743  optarg);
744  break;
745 
746  default:
747  /* getopt_long already emitted a complaint */
748  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
749  exit_nicely(1);
750  }
751  }
752 
753  /*
754  * Non-option argument specifies database name as long as it wasn't
755  * already specified with -d / --dbname
756  */
757  if (optind < argc && dopt.cparams.dbname == NULL)
758  dopt.cparams.dbname = argv[optind++];
759 
760  /* Complain if any arguments remain */
761  if (optind < argc)
762  {
763  pg_log_error("too many command-line arguments (first is \"%s\")",
764  argv[optind]);
765  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
766  exit_nicely(1);
767  }
768 
769  /* --column-inserts implies --inserts */
770  if (dopt.column_inserts && dopt.dump_inserts == 0)
772 
773  /*
774  * Binary upgrade mode implies dumping sequence data even in schema-only
775  * mode. This is not exposed as a separate option, but kept separate
776  * internally for clarity.
777  */
778  if (dopt.binary_upgrade)
779  dopt.sequence_data = 1;
780 
781  if (dopt.dataOnly && dopt.schemaOnly)
782  pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
783 
785  pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
786 
787  if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
788  pg_fatal("option --include-foreign-data is not supported with parallel backup");
789 
790  if (dopt.dataOnly && dopt.outputClean)
791  pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
792 
793  if (dopt.if_exists && !dopt.outputClean)
794  pg_fatal("option --if-exists requires option -c/--clean");
795 
796  /*
797  * --inserts are already implied above if --column-inserts or
798  * --rows-per-insert were specified.
799  */
800  if (dopt.do_nothing && dopt.dump_inserts == 0)
801  pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
802 
803  /* Identify archive format to emit */
804  archiveFormat = parseArchiveFormat(format, &archiveMode);
805 
806  /* archiveFormat specific setup */
807  if (archiveFormat == archNull)
808  plainText = 1;
809 
810  /*
811  * Custom and directory formats are compressed by default with gzip when
812  * available, not the others. If gzip is not available, no compression is
813  * done by default.
814  */
815  if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
816  !user_compression_defined)
817  {
818 #ifdef HAVE_LIBZ
819  compression_algorithm_str = "gzip";
820 #else
821  compression_algorithm_str = "none";
822 #endif
823  }
824 
825  /*
826  * Compression options
827  */
828  if (!parse_compress_algorithm(compression_algorithm_str,
830  pg_fatal("unrecognized compression algorithm: \"%s\"",
831  compression_algorithm_str);
832 
834  &compression_spec);
835  error_detail = validate_compress_specification(&compression_spec);
836  if (error_detail != NULL)
837  pg_fatal("invalid compression specification: %s",
838  error_detail);
839 
840  error_detail = supports_compression(compression_spec);
841  if (error_detail != NULL)
842  pg_fatal("%s", error_detail);
843 
844  /*
845  * Disable support for zstd workers for now - these are based on
846  * threading, and it's unclear how it interacts with parallel dumps on
847  * platforms where that relies on threads too (e.g. Windows).
848  */
849  if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
850  pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
851  "workers");
852 
853  /*
854  * If emitting an archive format, we always want to emit a DATABASE item,
855  * in case --create is specified at pg_restore time.
856  */
857  if (!plainText)
858  dopt.outputCreateDB = 1;
859 
860  /* Parallel backup only in the directory archive format so far */
861  if (archiveFormat != archDirectory && numWorkers > 1)
862  pg_fatal("parallel backup only supported by the directory format");
863 
864  /* Open the output file */
865  fout = CreateArchive(filename, archiveFormat, compression_spec,
866  dosync, archiveMode, setupDumpWorker, sync_method);
867 
868  /* Make dump options accessible right away */
869  SetArchiveOptions(fout, &dopt, NULL);
870 
871  /* Register the cleanup hook */
872  on_exit_close_archive(fout);
873 
874  /* Let the archiver know how noisy to be */
875  fout->verbose = g_verbose;
876 
877 
878  /*
879  * We allow the server to be back to 9.2, and up to any minor release of
880  * our own major version. (See also version check in pg_dumpall.c.)
881  */
882  fout->minRemoteVersion = 90200;
883  fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
884 
885  fout->numWorkers = numWorkers;
886 
887  /*
888  * Open the database using the Archiver, so it knows about it. Errors mean
889  * death.
890  */
891  ConnectDatabase(fout, &dopt.cparams, false);
892  setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
893 
894  /*
895  * On hot standbys, never try to dump unlogged table data, since it will
896  * just throw an error.
897  */
898  if (fout->isStandby)
899  dopt.no_unlogged_table_data = true;
900 
901  /*
902  * Find the last built-in OID, if needed (prior to 8.1)
903  *
904  * With 8.1 and above, we can just use FirstNormalObjectId - 1.
905  */
907 
908  pg_log_info("last built-in OID is %u", g_last_builtin_oid);
909 
910  /* Expand schema selection patterns into OID lists */
911  if (schema_include_patterns.head != NULL)
912  {
915  strict_names);
916  if (schema_include_oids.head == NULL)
917  pg_fatal("no matching schemas were found");
918  }
921  false);
922  /* non-matching exclusion patterns aren't an error */
923 
924  /* Expand table selection patterns into OID lists */
927  strict_names, false);
930  strict_names, true);
931  if ((table_include_patterns.head != NULL ||
933  table_include_oids.head == NULL)
934  pg_fatal("no matching tables were found");
935 
938  false, false);
941  false, true);
942 
945  false, false);
948  false, true);
949 
952 
953  /* non-matching exclusion patterns aren't an error */
954 
955  /* Expand extension selection patterns into OID lists */
956  if (extension_include_patterns.head != NULL)
957  {
960  strict_names);
961  if (extension_include_oids.head == NULL)
962  pg_fatal("no matching extensions were found");
963  }
966  false);
967  /* non-matching exclusion patterns aren't an error */
968 
969  /*
970  * Dumping LOs is the default for dumps where an inclusion switch is not
971  * used (an "include everything" dump). -B can be used to exclude LOs
972  * from those dumps. -b can be used to include LOs even when an inclusion
973  * switch is used.
974  *
975  * -s means "schema only" and LOs are data, not schema, so we never
976  * include LOs when -s is used.
977  */
978  if (dopt.include_everything && !dopt.schemaOnly && !dopt.dontOutputLOs)
979  dopt.outputLOs = true;
980 
981  /*
982  * Collect role names so we can map object owner OIDs to names.
983  */
984  collectRoleNames(fout);
985 
986  /*
987  * Now scan the database and create DumpableObject structs for all the
988  * objects we intend to dump.
989  */
990  tblinfo = getSchemaData(fout, &numTables);
991 
992  if (!dopt.schemaOnly)
993  {
994  getTableData(&dopt, tblinfo, numTables, 0);
996  if (dopt.dataOnly)
998  }
999 
1000  if (dopt.schemaOnly && dopt.sequence_data)
1001  getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
1002 
1003  /*
1004  * In binary-upgrade mode, we do not have to worry about the actual LO
1005  * data or the associated metadata that resides in the pg_largeobject and
1006  * pg_largeobject_metadata tables, respectively.
1007  *
1008  * However, we do need to collect LO information as there may be comments
1009  * or other information on LOs that we do need to dump out.
1010  */
1011  if (dopt.outputLOs || dopt.binary_upgrade)
1012  getLOs(fout);
1013 
1014  /*
1015  * Collect dependency data to assist in ordering the objects.
1016  */
1017  getDependencies(fout);
1018 
1019  /*
1020  * Collect ACLs, comments, and security labels, if wanted.
1021  */
1022  if (!dopt.aclsSkip)
1023  getAdditionalACLs(fout);
1024  if (!dopt.no_comments)
1025  collectComments(fout);
1026  if (!dopt.no_security_labels)
1027  collectSecLabels(fout);
1028 
1029  /* For binary upgrade mode, collect required pg_class information. */
1030  if (dopt.binary_upgrade)
1032 
1033  /* Collect sequence information. */
1034  collectSequences(fout);
1035 
1036  /* Lastly, create dummy objects to represent the section boundaries */
1037  boundaryObjs = createBoundaryObjects();
1038 
1039  /* Get pointers to all the known DumpableObjects */
1040  getDumpableObjects(&dobjs, &numObjs);
1041 
1042  /*
1043  * Add dummy dependencies to enforce the dump section ordering.
1044  */
1045  addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
1046 
1047  /*
1048  * Sort the objects into a safe dump order (no forward references).
1049  *
1050  * We rely on dependency information to help us determine a safe order, so
1051  * the initial sort is mostly for cosmetic purposes: we sort by name to
1052  * ensure that logically identical schemas will dump identically.
1053  */
1054  sortDumpableObjectsByTypeName(dobjs, numObjs);
1055 
1056  sortDumpableObjects(dobjs, numObjs,
1057  boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
1058 
1059  /*
1060  * Create archive TOC entries for all the objects to be dumped, in a safe
1061  * order.
1062  */
1063 
1064  /*
1065  * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
1066  */
1067  dumpEncoding(fout);
1068  dumpStdStrings(fout);
1069  dumpSearchPath(fout);
1070 
1071  /* The database items are always next, unless we don't want them at all */
1072  if (dopt.outputCreateDB)
1073  dumpDatabase(fout);
1074 
1075  /* Now the rearrangeable objects. */
1076  for (i = 0; i < numObjs; i++)
1077  dumpDumpableObject(fout, dobjs[i]);
1078 
1079  /*
1080  * Set up options info to ensure we dump what we want.
1081  */
1082  ropt = NewRestoreOptions();
1083  ropt->filename = filename;
1084 
1085  /* if you change this list, see dumpOptionsFromRestoreOptions */
1086  ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
1087  ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
1088  ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
1089  ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
1091  ropt->dropSchema = dopt.outputClean;
1092  ropt->dataOnly = dopt.dataOnly;
1093  ropt->schemaOnly = dopt.schemaOnly;
1094  ropt->if_exists = dopt.if_exists;
1095  ropt->column_inserts = dopt.column_inserts;
1096  ropt->dumpSections = dopt.dumpSections;
1097  ropt->aclsSkip = dopt.aclsSkip;
1098  ropt->superuser = dopt.outputSuperuser;
1099  ropt->createDB = dopt.outputCreateDB;
1100  ropt->noOwner = dopt.outputNoOwner;
1101  ropt->noTableAm = dopt.outputNoTableAm;
1102  ropt->noTablespace = dopt.outputNoTablespaces;
1103  ropt->disable_triggers = dopt.disable_triggers;
1104  ropt->use_setsessauth = dopt.use_setsessauth;
1106  ropt->dump_inserts = dopt.dump_inserts;
1107  ropt->no_comments = dopt.no_comments;
1108  ropt->no_publications = dopt.no_publications;
1110  ropt->no_subscriptions = dopt.no_subscriptions;
1111  ropt->lockWaitTimeout = dopt.lockWaitTimeout;
1114  ropt->sequence_data = dopt.sequence_data;
1115  ropt->binary_upgrade = dopt.binary_upgrade;
1116 
1117  ropt->compression_spec = compression_spec;
1118 
1119  ropt->suppressDumpWarnings = true; /* We've already shown them */
1120 
1121  SetArchiveOptions(fout, &dopt, ropt);
1122 
1123  /* Mark which entries should be output */
1125 
1126  /*
1127  * The archive's TOC entries are now marked as to which ones will actually
1128  * be output, so we can set up their dependency lists properly. This isn't
1129  * necessary for plain-text output, though.
1130  */
1131  if (!plainText)
1133 
1134  /*
1135  * And finally we can do the actual output.
1136  *
1137  * Note: for non-plain-text output formats, the output file is written
1138  * inside CloseArchive(). This is, um, bizarre; but not worth changing
1139  * right now.
1140  */
1141  if (plainText)
1142  RestoreArchive(fout);
1143 
1144  CloseArchive(fout);
1145 
1146  exit_nicely(0);
1147 }
1148 
1149 
/*
 * help
 *		Print the usage summary for pg_dump to stdout.
 *
 * All text is passed through _() for NLS translation.  Option lines are
 * grouped as: general options, output-content options, connection options.
 * Keep this list in sync with the long_options table and the SGML docs.
 */
static void
help(const char *progname)
{
    printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
    printf(_("Usage:\n"));
    printf(_(" %s [OPTION]... [DBNAME]\n"), progname);

    printf(_("\nGeneral options:\n"));
    printf(_(" -f, --file=FILENAME output file or directory name\n"));
    printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
             " plain text (default))\n"));
    printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
    printf(_(" -v, --verbose verbose mode\n"));
    printf(_(" -V, --version output version information, then exit\n"));
    printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
             " compress as specified\n"));
    printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
    printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
    printf(_(" --sync-method=METHOD set method for syncing files to disk\n"));
    printf(_(" -?, --help show this help, then exit\n"));

    printf(_("\nOptions controlling the output content:\n"));
    printf(_(" -a, --data-only dump only the data, not the schema\n"));
    printf(_(" -b, --large-objects include large objects in dump\n"));
    printf(_(" --blobs (same as --large-objects, deprecated)\n"));
    printf(_(" -B, --no-large-objects exclude large objects in dump\n"));
    printf(_(" --no-blobs (same as --no-large-objects, deprecated)\n"));
    printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
    printf(_(" -C, --create include commands to create database in dump\n"));
    printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
    printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
    printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
    printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
    printf(_(" -O, --no-owner skip restoration of object ownership in\n"
             " plain-text format\n"));
    printf(_(" -s, --schema-only dump only the schema, no data\n"));
    printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
    printf(_(" -t, --table=PATTERN dump only the specified table(s)\n"));
    printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
    printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
    printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
    printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
    printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
    printf(_(" --disable-triggers disable triggers during data-only restore\n"));
    printf(_(" --enable-row-security enable row security (dump only content user has\n"
             " access to)\n"));
    printf(_(" --exclude-extension=PATTERN do NOT dump the specified extension(s)\n"));
    printf(_(" --exclude-table-and-children=PATTERN\n"
             " do NOT dump the specified table(s), including\n"
             " child and partition tables\n"));
    printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
    printf(_(" --exclude-table-data-and-children=PATTERN\n"
             " do NOT dump data for the specified table(s),\n"
             " including child and partition tables\n"));
    printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
    printf(_(" --filter=FILENAME include or exclude objects and data from dump\n"
             " based on expressions in FILENAME\n"));
    printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
    printf(_(" --include-foreign-data=PATTERN\n"
             " include data of foreign tables on foreign\n"
             " servers matching PATTERN\n"));
    printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
    printf(_(" --load-via-partition-root load partitions via the root table\n"));
    printf(_(" --no-comments do not dump comments\n"));
    printf(_(" --no-publications do not dump publications\n"));
    printf(_(" --no-security-labels do not dump security label assignments\n"));
    printf(_(" --no-subscriptions do not dump subscriptions\n"));
    printf(_(" --no-table-access-method do not dump table access methods\n"));
    printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
    printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
    printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
    printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
    printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
    printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
    printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
    printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
    printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
    printf(_(" --strict-names require table and/or schema include patterns to\n"
             " match at least one entity each\n"));
    printf(_(" --table-and-children=PATTERN dump only the specified table(s), including\n"
             " child and partition tables\n"));
    printf(_(" --use-set-session-authorization\n"
             " use SET SESSION AUTHORIZATION commands instead of\n"
             " ALTER OWNER commands to set ownership\n"));

    printf(_("\nConnection options:\n"));
    printf(_(" -d, --dbname=DBNAME database to dump\n"));
    printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
    printf(_(" -p, --port=PORT database server port number\n"));
    printf(_(" -U, --username=NAME connect as specified database user\n"));
    printf(_(" -w, --no-password never prompt for password\n"));
    printf(_(" -W, --password force password prompt (should happen automatically)\n"));
    printf(_(" --role=ROLENAME do SET ROLE before dump\n"));

    printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
             "variable value is used.\n\n"));
    printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
    printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
1249 
1250 static void
1251 setup_connection(Archive *AH, const char *dumpencoding,
1252  const char *dumpsnapshot, char *use_role)
1253 {
1254  DumpOptions *dopt = AH->dopt;
1255  PGconn *conn = GetConnection(AH);
1256  const char *std_strings;
1257 
1259 
1260  /*
1261  * Set the client encoding if requested.
1262  */
1263  if (dumpencoding)
1264  {
1265  if (PQsetClientEncoding(conn, dumpencoding) < 0)
1266  pg_fatal("invalid client encoding \"%s\" specified",
1267  dumpencoding);
1268  }
1269 
1270  /*
1271  * Get the active encoding and the standard_conforming_strings setting, so
1272  * we know how to escape strings.
1273  */
1275 
1276  std_strings = PQparameterStatus(conn, "standard_conforming_strings");
1277  AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);
1278 
1279  /*
1280  * Set the role if requested. In a parallel dump worker, we'll be passed
1281  * use_role == NULL, but AH->use_role is already set (if user specified it
1282  * originally) and we should use that.
1283  */
1284  if (!use_role && AH->use_role)
1285  use_role = AH->use_role;
1286 
1287  /* Set the role if requested */
1288  if (use_role)
1289  {
1290  PQExpBuffer query = createPQExpBuffer();
1291 
1292  appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
1293  ExecuteSqlStatement(AH, query->data);
1294  destroyPQExpBuffer(query);
1295 
1296  /* save it for possible later use by parallel workers */
1297  if (!AH->use_role)
1298  AH->use_role = pg_strdup(use_role);
1299  }
1300 
1301  /* Set the datestyle to ISO to ensure the dump's portability */
1302  ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");
1303 
1304  /* Likewise, avoid using sql_standard intervalstyle */
1305  ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");
1306 
1307  /*
1308  * Use an explicitly specified extra_float_digits if it has been provided.
1309  * Otherwise, set extra_float_digits so that we can dump float data
1310  * exactly (given correctly implemented float I/O code, anyway).
1311  */
1313  {
1315 
1316  appendPQExpBuffer(q, "SET extra_float_digits TO %d",
1318  ExecuteSqlStatement(AH, q->data);
1319  destroyPQExpBuffer(q);
1320  }
1321  else
1322  ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");
1323 
1324  /*
1325  * Disable synchronized scanning, to prevent unpredictable changes in row
1326  * ordering across a dump and reload.
1327  */
1328  ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");
1329 
1330  /*
1331  * Disable timeouts if supported.
1332  */
1333  ExecuteSqlStatement(AH, "SET statement_timeout = 0");
1334  if (AH->remoteVersion >= 90300)
1335  ExecuteSqlStatement(AH, "SET lock_timeout = 0");
1336  if (AH->remoteVersion >= 90600)
1337  ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
1338  if (AH->remoteVersion >= 170000)
1339  ExecuteSqlStatement(AH, "SET transaction_timeout = 0");
1340 
1341  /*
1342  * Quote all identifiers, if requested.
1343  */
1345  ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");
1346 
1347  /*
1348  * Adjust row-security mode, if supported.
1349  */
1350  if (AH->remoteVersion >= 90500)
1351  {
1352  if (dopt->enable_row_security)
1353  ExecuteSqlStatement(AH, "SET row_security = on");
1354  else
1355  ExecuteSqlStatement(AH, "SET row_security = off");
1356  }
1357 
1358  /*
1359  * For security reasons, we restrict the expansion of non-system views and
1360  * access to foreign tables during the pg_dump process. This restriction
1361  * is adjusted when dumping foreign table data.
1362  */
1363  set_restrict_relation_kind(AH, "view, foreign-table");
1364 
1365  /*
1366  * Initialize prepared-query state to "nothing prepared". We do this here
1367  * so that a parallel dump worker will have its own state.
1368  */
1369  AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));
1370 
1371  /*
1372  * Start transaction-snapshot mode transaction to dump consistent data.
1373  */
1374  ExecuteSqlStatement(AH, "BEGIN");
1375 
1376  /*
1377  * To support the combination of serializable_deferrable with the jobs
1378  * option we use REPEATABLE READ for the worker connections that are
1379  * passed a snapshot. As long as the snapshot is acquired in a
1380  * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
1381  * REPEATABLE READ transaction provides the appropriate integrity
1382  * guarantees. This is a kluge, but safe for back-patching.
1383  */
1384  if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
1386  "SET TRANSACTION ISOLATION LEVEL "
1387  "SERIALIZABLE, READ ONLY, DEFERRABLE");
1388  else
1390  "SET TRANSACTION ISOLATION LEVEL "
1391  "REPEATABLE READ, READ ONLY");
1392 
1393  /*
1394  * If user specified a snapshot to use, select that. In a parallel dump
1395  * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
1396  * is already set (if the server can handle it) and we should use that.
1397  */
1398  if (dumpsnapshot)
1399  AH->sync_snapshot_id = pg_strdup(dumpsnapshot);
1400 
1401  if (AH->sync_snapshot_id)
1402  {
1403  PQExpBuffer query = createPQExpBuffer();
1404 
1405  appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
1407  ExecuteSqlStatement(AH, query->data);
1408  destroyPQExpBuffer(query);
1409  }
1410  else if (AH->numWorkers > 1)
1411  {
1412  if (AH->isStandby && AH->remoteVersion < 100000)
1413  pg_fatal("parallel dumps from standby servers are not supported by this server version");
1415  }
1416 }
1417 
1418 /* Set up connection for a parallel worker process */
1419 static void
1421 {
1422  /*
1423  * We want to re-select all the same values the leader connection is
1424  * using. We'll have inherited directly-usable values in
1425  * AH->sync_snapshot_id and AH->use_role, but we need to translate the
1426  * inherited encoding value back to a string to pass to setup_connection.
1427  */
1428  setup_connection(AH,
1430  NULL,
1431  NULL);
1432 }
1433 
1434 static char *
1436 {
1437  char *query = "SELECT pg_catalog.pg_export_snapshot()";
1438  char *result;
1439  PGresult *res;
1440 
1441  res = ExecuteSqlQueryForSingleRow(fout, query);
1442  result = pg_strdup(PQgetvalue(res, 0, 0));
1443  PQclear(res);
1444 
1445  return result;
1446 }
1447 
1448 static ArchiveFormat
1450 {
1451  ArchiveFormat archiveFormat;
1452 
1453  *mode = archModeWrite;
1454 
1455  if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
1456  {
1457  /* This is used by pg_dumpall, and is not documented */
1458  archiveFormat = archNull;
1459  *mode = archModeAppend;
1460  }
1461  else if (pg_strcasecmp(format, "c") == 0)
1462  archiveFormat = archCustom;
1463  else if (pg_strcasecmp(format, "custom") == 0)
1464  archiveFormat = archCustom;
1465  else if (pg_strcasecmp(format, "d") == 0)
1466  archiveFormat = archDirectory;
1467  else if (pg_strcasecmp(format, "directory") == 0)
1468  archiveFormat = archDirectory;
1469  else if (pg_strcasecmp(format, "p") == 0)
1470  archiveFormat = archNull;
1471  else if (pg_strcasecmp(format, "plain") == 0)
1472  archiveFormat = archNull;
1473  else if (pg_strcasecmp(format, "t") == 0)
1474  archiveFormat = archTar;
1475  else if (pg_strcasecmp(format, "tar") == 0)
1476  archiveFormat = archTar;
1477  else
1478  pg_fatal("invalid output format \"%s\" specified", format);
1479  return archiveFormat;
1480 }
1481 
1482 /*
1483  * Find the OIDs of all schemas matching the given list of patterns,
1484  * and append them to the given OID list.
1485  */
1486 static void
1488  SimpleStringList *patterns,
1489  SimpleOidList *oids,
1490  bool strict_names)
1491 {
1492  PQExpBuffer query;
1493  PGresult *res;
1494  SimpleStringListCell *cell;
1495  int i;
1496 
1497  if (patterns->head == NULL)
1498  return; /* nothing to do */
1499 
1500  query = createPQExpBuffer();
1501 
1502  /*
1503  * The loop below runs multiple SELECTs might sometimes result in
1504  * duplicate entries in the OID list, but we don't care.
1505  */
1506 
1507  for (cell = patterns->head; cell; cell = cell->next)
1508  {
1509  PQExpBufferData dbbuf;
1510  int dotcnt;
1511 
1512  appendPQExpBufferStr(query,
1513  "SELECT oid FROM pg_catalog.pg_namespace n\n");
1514  initPQExpBuffer(&dbbuf);
1515  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1516  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
1517  &dotcnt);
1518  if (dotcnt > 1)
1519  pg_fatal("improper qualified name (too many dotted names): %s",
1520  cell->val);
1521  else if (dotcnt == 1)
1522  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1523  termPQExpBuffer(&dbbuf);
1524 
1525  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1526  if (strict_names && PQntuples(res) == 0)
1527  pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);
1528 
1529  for (i = 0; i < PQntuples(res); i++)
1530  {
1532  }
1533 
1534  PQclear(res);
1535  resetPQExpBuffer(query);
1536  }
1537 
1538  destroyPQExpBuffer(query);
1539 }
1540 
1541 /*
1542  * Find the OIDs of all extensions matching the given list of patterns,
1543  * and append them to the given OID list.
1544  */
1545 static void
1547  SimpleStringList *patterns,
1548  SimpleOidList *oids,
1549  bool strict_names)
1550 {
1551  PQExpBuffer query;
1552  PGresult *res;
1553  SimpleStringListCell *cell;
1554  int i;
1555 
1556  if (patterns->head == NULL)
1557  return; /* nothing to do */
1558 
1559  query = createPQExpBuffer();
1560 
1561  /*
1562  * The loop below runs multiple SELECTs might sometimes result in
1563  * duplicate entries in the OID list, but we don't care.
1564  */
1565  for (cell = patterns->head; cell; cell = cell->next)
1566  {
1567  int dotcnt;
1568 
1569  appendPQExpBufferStr(query,
1570  "SELECT oid FROM pg_catalog.pg_extension e\n");
1571  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1572  false, NULL, "e.extname", NULL, NULL, NULL,
1573  &dotcnt);
1574  if (dotcnt > 0)
1575  pg_fatal("improper qualified name (too many dotted names): %s",
1576  cell->val);
1577 
1578  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1579  if (strict_names && PQntuples(res) == 0)
1580  pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);
1581 
1582  for (i = 0; i < PQntuples(res); i++)
1583  {
1585  }
1586 
1587  PQclear(res);
1588  resetPQExpBuffer(query);
1589  }
1590 
1591  destroyPQExpBuffer(query);
1592 }
1593 
1594 /*
1595  * Find the OIDs of all foreign servers matching the given list of patterns,
1596  * and append them to the given OID list.
1597  */
1598 static void
1600  SimpleStringList *patterns,
1601  SimpleOidList *oids)
1602 {
1603  PQExpBuffer query;
1604  PGresult *res;
1605  SimpleStringListCell *cell;
1606  int i;
1607 
1608  if (patterns->head == NULL)
1609  return; /* nothing to do */
1610 
1611  query = createPQExpBuffer();
1612 
1613  /*
1614  * The loop below runs multiple SELECTs might sometimes result in
1615  * duplicate entries in the OID list, but we don't care.
1616  */
1617 
1618  for (cell = patterns->head; cell; cell = cell->next)
1619  {
1620  int dotcnt;
1621 
1622  appendPQExpBufferStr(query,
1623  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
1624  processSQLNamePattern(GetConnection(fout), query, cell->val, false,
1625  false, NULL, "s.srvname", NULL, NULL, NULL,
1626  &dotcnt);
1627  if (dotcnt > 0)
1628  pg_fatal("improper qualified name (too many dotted names): %s",
1629  cell->val);
1630 
1631  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1632  if (PQntuples(res) == 0)
1633  pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);
1634 
1635  for (i = 0; i < PQntuples(res); i++)
1637 
1638  PQclear(res);
1639  resetPQExpBuffer(query);
1640  }
1641 
1642  destroyPQExpBuffer(query);
1643 }
1644 
1645 /*
1646  * Find the OIDs of all tables matching the given list of patterns,
1647  * and append them to the given OID list. See also expand_dbname_patterns()
1648  * in pg_dumpall.c
1649  */
1650 static void
1652  SimpleStringList *patterns, SimpleOidList *oids,
1653  bool strict_names, bool with_child_tables)
1654 {
1655  PQExpBuffer query;
1656  PGresult *res;
1657  SimpleStringListCell *cell;
1658  int i;
1659 
1660  if (patterns->head == NULL)
1661  return; /* nothing to do */
1662 
1663  query = createPQExpBuffer();
1664 
1665  /*
1666  * this might sometimes result in duplicate entries in the OID list, but
1667  * we don't care.
1668  */
1669 
1670  for (cell = patterns->head; cell; cell = cell->next)
1671  {
1672  PQExpBufferData dbbuf;
1673  int dotcnt;
1674 
1675  /*
1676  * Query must remain ABSOLUTELY devoid of unqualified names. This
1677  * would be unnecessary given a pg_table_is_visible() variant taking a
1678  * search_path argument.
1679  *
1680  * For with_child_tables, we start with the basic query's results and
1681  * recursively search the inheritance tree to add child tables.
1682  */
1683  if (with_child_tables)
1684  {
1685  appendPQExpBuffer(query, "WITH RECURSIVE partition_tree (relid) AS (\n");
1686  }
1687 
1688  appendPQExpBuffer(query,
1689  "SELECT c.oid"
1690  "\nFROM pg_catalog.pg_class c"
1691  "\n LEFT JOIN pg_catalog.pg_namespace n"
1692  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
1693  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
1694  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
1695  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
1696  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
1697  RELKIND_PARTITIONED_TABLE);
1698  initPQExpBuffer(&dbbuf);
1699  processSQLNamePattern(GetConnection(fout), query, cell->val, true,
1700  false, "n.nspname", "c.relname", NULL,
1701  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
1702  &dotcnt);
1703  if (dotcnt > 2)
1704  pg_fatal("improper relation name (too many dotted names): %s",
1705  cell->val);
1706  else if (dotcnt == 2)
1707  prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
1708  termPQExpBuffer(&dbbuf);
1709 
1710  if (with_child_tables)
1711  {
1712  appendPQExpBuffer(query, "UNION"
1713  "\nSELECT i.inhrelid"
1714  "\nFROM partition_tree p"
1715  "\n JOIN pg_catalog.pg_inherits i"
1716  "\n ON p.relid OPERATOR(pg_catalog.=) i.inhparent"
1717  "\n)"
1718  "\nSELECT relid FROM partition_tree");
1719  }
1720 
1721  ExecuteSqlStatement(fout, "RESET search_path");
1722  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
1725  if (strict_names && PQntuples(res) == 0)
1726  pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);
1727 
1728  for (i = 0; i < PQntuples(res); i++)
1729  {
1731  }
1732 
1733  PQclear(res);
1734  resetPQExpBuffer(query);
1735  }
1736 
1737  destroyPQExpBuffer(query);
1738 }
1739 
1740 /*
1741  * Verifies that the connected database name matches the given database name,
1742  * and if not, dies with an error about the given pattern.
1743  *
1744  * The 'dbname' argument should be a literal name parsed from 'pattern'.
1745  */
1746 static void
1747 prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1748 {
1749  const char *db;
1750 
1751  db = PQdb(conn);
1752  if (db == NULL)
1753  pg_fatal("You are currently not connected to a database.");
1754 
1755  if (strcmp(db, dbname) != 0)
1756  pg_fatal("cross-database references are not implemented: %s",
1757  pattern);
1758 }
1759 
1760 /*
1761  * checkExtensionMembership
1762  * Determine whether object is an extension member, and if so,
1763  * record an appropriate dependency and set the object's dump flag.
1764  *
1765  * It's important to call this for each object that could be an extension
1766  * member. Generally, we integrate this with determining the object's
1767  * to-be-dumped-ness, since extension membership overrides other rules for that.
1768  *
1769  * Returns true if object is an extension member, else false.
1770  */
1771 static bool
1773 {
1774  ExtensionInfo *ext = findOwningExtension(dobj->catId);
1775 
1776  if (ext == NULL)
1777  return false;
1778 
1779  dobj->ext_member = true;
1780 
1781  /* Record dependency so that getDependencies needn't deal with that */
1782  addObjectDependency(dobj, ext->dobj.dumpId);
1783 
1784  /*
1785  * In 9.6 and above, mark the member object to have any non-initial ACLs
1786  * dumped. (Any initial ACLs will be removed later, using data from
1787  * pg_init_privs, so that we'll dump only the delta from the extension's
1788  * initial setup.)
1789  *
1790  * Prior to 9.6, we do not include any extension member components.
1791  *
1792  * In binary upgrades, we still dump all components of the members
1793  * individually, since the idea is to exactly reproduce the database
1794  * contents rather than replace the extension contents with something
1795  * different.
1796  *
1797  * Note: it might be interesting someday to implement storage and delta
1798  * dumping of extension members' RLS policies and/or security labels.
1799  * However there is a pitfall for RLS policies: trying to dump them
1800  * requires getting a lock on their tables, and the calling user might not
1801  * have privileges for that. We need no lock to examine a table's ACLs,
1802  * so the current feature doesn't have a problem of that sort.
1803  */
1804  if (fout->dopt->binary_upgrade)
1805  dobj->dump = ext->dobj.dump;
1806  else
1807  {
1808  if (fout->remoteVersion < 90600)
1809  dobj->dump = DUMP_COMPONENT_NONE;
1810  else
1811  dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL);
1812  }
1813 
1814  return true;
1815 }
1816 
1817 /*
1818  * selectDumpableNamespace: policy-setting subroutine
1819  * Mark a namespace as to be dumped or not
1820  */
1821 static void
1823 {
1824  /*
1825  * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
1826  * and (for --clean) a DROP SCHEMA statement. (In the absence of
1827  * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
1828  */
1829  nsinfo->create = true;
1830 
1831  /*
1832  * If specific tables are being dumped, do not dump any complete
1833  * namespaces. If specific namespaces are being dumped, dump just those
1834  * namespaces. Otherwise, dump all non-system namespaces.
1835  */
1836  if (table_include_oids.head != NULL)
1837  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1838  else if (schema_include_oids.head != NULL)
1839  nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
1841  nsinfo->dobj.catId.oid) ?
1843  else if (fout->remoteVersion >= 90600 &&
1844  strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
1845  {
1846  /*
1847  * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
1848  * they are interesting (and not the original ACLs which were set at
1849  * initdb time, see pg_init_privs).
1850  */
1851  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
1852  }
1853  else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
1854  strcmp(nsinfo->dobj.name, "information_schema") == 0)
1855  {
1856  /* Other system schemas don't get dumped */
1857  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1858  }
1859  else if (strcmp(nsinfo->dobj.name, "public") == 0)
1860  {
1861  /*
1862  * The public schema is a strange beast that sits in a sort of
1863  * no-mans-land between being a system object and a user object.
1864  * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
1865  * a comment and an indication of ownership. If the owner is the
1866  * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
1867  * v15, the default owner was BOOTSTRAP_SUPERUSERID.
1868  */
1869  nsinfo->create = false;
1870  nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1871  if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
1872  nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
1874 
1875  /*
1876  * Also, make like it has a comment even if it doesn't; this is so
1877  * that we'll emit a command to drop the comment, if appropriate.
1878  * (Without this, we'd not call dumpCommentExtended for it.)
1879  */
1881  }
1882  else
1883  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
1884 
1885  /*
1886  * In any case, a namespace can be excluded by an exclusion switch
1887  */
1888  if (nsinfo->dobj.dump_contains &&
1890  nsinfo->dobj.catId.oid))
1891  nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
1892 
1893  /*
1894  * If the schema belongs to an extension, allow extension membership to
1895  * override the dump decision for the schema itself. However, this does
1896  * not change dump_contains, so this won't change what we do with objects
1897  * within the schema. (If they belong to the extension, they'll get
1898  * suppressed by it, otherwise not.)
1899  */
1900  (void) checkExtensionMembership(&nsinfo->dobj, fout);
1901 }
1902 
1903 /*
1904  * selectDumpableTable: policy-setting subroutine
1905  * Mark a table as to be dumped or not
1906  */
1907 static void
1909 {
1910  if (checkExtensionMembership(&tbinfo->dobj, fout))
1911  return; /* extension membership overrides all else */
1912 
1913  /*
1914  * If specific tables are being dumped, dump just those tables; else, dump
1915  * according to the parent namespace's dump flag.
1916  */
1917  if (table_include_oids.head != NULL)
1919  tbinfo->dobj.catId.oid) ?
1921  else
1922  tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;
1923 
1924  /*
1925  * In any case, a table can be excluded by an exclusion switch
1926  */
1927  if (tbinfo->dobj.dump &&
1929  tbinfo->dobj.catId.oid))
1930  tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
1931 }
1932 
1933 /*
1934  * selectDumpableType: policy-setting subroutine
1935  * Mark a type as to be dumped or not
1936  *
1937  * If it's a table's rowtype or an autogenerated array type, we also apply a
1938  * special type code to facilitate sorting into the desired order. (We don't
1939  * want to consider those to be ordinary types because that would bring tables
1940  * up into the datatype part of the dump order.) We still set the object's
1941  * dump flag; that's not going to cause the dummy type to be dumped, but we
1942  * need it so that casts involving such types will be dumped correctly -- see
1943  * dumpCast. This means the flag should be set the same as for the underlying
1944  * object (the table or base type).
1945  */
1946 static void
1948 {
1949  /* skip complex types, except for standalone composite types */
1950  if (OidIsValid(tyinfo->typrelid) &&
1951  tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
1952  {
1953  TableInfo *tytable = findTableByOid(tyinfo->typrelid);
1954 
1955  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1956  if (tytable != NULL)
1957  tyinfo->dobj.dump = tytable->dobj.dump;
1958  else
1959  tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
1960  return;
1961  }
1962 
1963  /* skip auto-generated array and multirange types */
1964  if (tyinfo->isArray || tyinfo->isMultirange)
1965  {
1966  tyinfo->dobj.objType = DO_DUMMY_TYPE;
1967 
1968  /*
1969  * Fall through to set the dump flag; we assume that the subsequent
1970  * rules will do the same thing as they would for the array's base
1971  * type or multirange's range type. (We cannot reliably look up the
1972  * base type here, since getTypes may not have processed it yet.)
1973  */
1974  }
1975 
1976  if (checkExtensionMembership(&tyinfo->dobj, fout))
1977  return; /* extension membership overrides all else */
1978 
1979  /* Dump based on if the contents of the namespace are being dumped */
1980  tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
1981 }
1982 
1983 /*
1984  * selectDumpableDefaultACL: policy-setting subroutine
1985  * Mark a default ACL as to be dumped or not
1986  *
1987  * For per-schema default ACLs, dump if the schema is to be dumped.
1988  * Otherwise dump if we are dumping "everything". Note that dataOnly
1989  * and aclsSkip are checked separately.
1990  */
1991 static void
1993 {
1994  /* Default ACLs can't be extension members */
1995 
1996  if (dinfo->dobj.namespace)
1997  /* default ACLs are considered part of the namespace */
1998  dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
1999  else
2000  dinfo->dobj.dump = dopt->include_everything ?
2002 }
2003 
2004 /*
2005  * selectDumpableCast: policy-setting subroutine
2006  * Mark a cast as to be dumped or not
2007  *
2008  * Casts do not belong to any particular namespace (since they haven't got
2009  * names), nor do they have identifiable owners. To distinguish user-defined
2010  * casts from built-in ones, we must resort to checking whether the cast's
2011  * OID is in the range reserved for initdb.
2012  */
2013 static void
2015 {
2016  if (checkExtensionMembership(&cast->dobj, fout))
2017  return; /* extension membership overrides all else */
2018 
2019  /*
2020  * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
2021  * support ACLs currently.
2022  */
2023  if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2024  cast->dobj.dump = DUMP_COMPONENT_NONE;
2025  else
2026  cast->dobj.dump = fout->dopt->include_everything ?
2028 }
2029 
2030 /*
2031  * selectDumpableProcLang: policy-setting subroutine
2032  * Mark a procedural language as to be dumped or not
2033  *
2034  * Procedural languages do not belong to any particular namespace. To
2035  * identify built-in languages, we must resort to checking whether the
2036  * language's OID is in the range reserved for initdb.
2037  */
2038 static void
2040 {
2041  if (checkExtensionMembership(&plang->dobj, fout))
2042  return; /* extension membership overrides all else */
2043 
2044  /*
2045  * Only include procedural languages when we are dumping everything.
2046  *
2047  * For from-initdb procedural languages, only include ACLs, as we do for
2048  * the pg_catalog namespace. We need this because procedural languages do
2049  * not live in any namespace.
2050  */
2051  if (!fout->dopt->include_everything)
2052  plang->dobj.dump = DUMP_COMPONENT_NONE;
2053  else
2054  {
2055  if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2056  plang->dobj.dump = fout->remoteVersion < 90600 ?
2058  else
2059  plang->dobj.dump = DUMP_COMPONENT_ALL;
2060  }
2061 }
2062 
2063 /*
2064  * selectDumpableAccessMethod: policy-setting subroutine
2065  * Mark an access method as to be dumped or not
2066  *
2067  * Access methods do not belong to any particular namespace. To identify
2068  * built-in access methods, we must resort to checking whether the
2069  * method's OID is in the range reserved for initdb.
2070  */
2071 static void
2073 {
2074  if (checkExtensionMembership(&method->dobj, fout))
2075  return; /* extension membership overrides all else */
2076 
2077  /*
2078  * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
2079  * they do not support ACLs currently.
2080  */
2081  if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2082  method->dobj.dump = DUMP_COMPONENT_NONE;
2083  else
2084  method->dobj.dump = fout->dopt->include_everything ?
2086 }
2087 
2088 /*
2089  * selectDumpableExtension: policy-setting subroutine
2090  * Mark an extension as to be dumped or not
2091  *
2092  * Built-in extensions should be skipped except for checking ACLs, since we
2093  * assume those will already be installed in the target database. We identify
2094  * such extensions by their having OIDs in the range reserved for initdb.
2095  * We dump all user-added extensions by default. No extensions are dumped
2096  * if include_everything is false (i.e., a --schema or --table switch was
2097  * given), except if --extension specifies a list of extensions to dump.
2098  */
2099 static void
2101 {
2102  /*
2103  * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
2104  * change permissions on their member objects, if they wish to, and have
2105  * those changes preserved.
2106  */
2107  if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2108  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
2109  else
2110  {
2111  /* check if there is a list of extensions to dump */
2112  if (extension_include_oids.head != NULL)
2113  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2115  extinfo->dobj.catId.oid) ?
2117  else
2118  extinfo->dobj.dump = extinfo->dobj.dump_contains =
2119  dopt->include_everything ?
2121 
2122  /* check that the extension is not explicitly excluded */
2123  if (extinfo->dobj.dump &&
2125  extinfo->dobj.catId.oid))
2126  extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_NONE;
2127  }
2128 }
2129 
2130 /*
2131  * selectDumpablePublicationObject: policy-setting subroutine
2132  * Mark a publication object as to be dumped or not
2133  *
2134  * A publication can have schemas and tables which have schemas, but those are
2135  * ignored in decision making, because publications are only dumped when we are
2136  * dumping everything.
2137  */
2138 static void
2140 {
2141  if (checkExtensionMembership(dobj, fout))
2142  return; /* extension membership overrides all else */
2143 
2144  dobj->dump = fout->dopt->include_everything ?
2146 }
2147 
2148 /*
2149  * selectDumpableStatisticsObject: policy-setting subroutine
2150  * Mark an extended statistics object as to be dumped or not
2151  *
2152  * We dump an extended statistics object if the schema it's in and the table
2153  * it's for are being dumped. (This'll need more thought if statistics
2154  * objects ever support cross-table stats.)
2155  */
2156 static void
2158 {
2159  if (checkExtensionMembership(&sobj->dobj, fout))
2160  return; /* extension membership overrides all else */
2161 
2162  sobj->dobj.dump = sobj->dobj.namespace->dobj.dump_contains;
2163  if (sobj->stattable == NULL ||
2165  sobj->dobj.dump = DUMP_COMPONENT_NONE;
2166 }
2167 
2168 /*
2169  * selectDumpableObject: policy-setting subroutine
2170  * Mark a generic dumpable object as to be dumped or not
2171  *
2172  * Use this only for object types without a special-case routine above.
2173  */
2174 static void
2176 {
2177  if (checkExtensionMembership(dobj, fout))
2178  return; /* extension membership overrides all else */
2179 
2180  /*
2181  * Default policy is to dump if parent namespace is dumpable, or for
2182  * non-namespace-associated items, dump if we're dumping "everything".
2183  */
2184  if (dobj->namespace)
2185  dobj->dump = dobj->namespace->dobj.dump_contains;
2186  else
2187  dobj->dump = fout->dopt->include_everything ?
2189 }
2190 
2191 /*
2192  * Dump a table's contents for loading using the COPY command
2193  * - this routine is called by the Archiver when it wants the table
2194  * to be dumped.
2195  */
2196 static int
2197 dumpTableData_copy(Archive *fout, const void *dcontext)
2198 {
2199  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2200  TableInfo *tbinfo = tdinfo->tdtable;
2201  const char *classname = tbinfo->dobj.name;
2203 
2204  /*
2205  * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2206  * which uses it already.
2207  */
2208  PQExpBuffer clistBuf = createPQExpBuffer();
2209  PGconn *conn = GetConnection(fout);
2210  PGresult *res;
2211  int ret;
2212  char *copybuf;
2213  const char *column_list;
2214 
2215  pg_log_info("dumping contents of table \"%s.%s\"",
2216  tbinfo->dobj.namespace->dobj.name, classname);
2217 
2218  /*
2219  * Specify the column list explicitly so that we have no possibility of
2220  * retrieving data in the wrong column order. (The default column
2221  * ordering of COPY will not be what we want in certain corner cases
2222  * involving ADD COLUMN and inheritance.)
2223  */
2224  column_list = fmtCopyColumnList(tbinfo, clistBuf);
2225 
2226  /*
2227  * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2228  * a filter condition was specified. For other cases a simple COPY
2229  * suffices.
2230  */
2231  if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2232  {
2233  /* Temporary allows to access to foreign tables to dump data */
2234  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2235  set_restrict_relation_kind(fout, "view");
2236 
2237  appendPQExpBufferStr(q, "COPY (SELECT ");
2238  /* klugery to get rid of parens in column list */
2239  if (strlen(column_list) > 2)
2240  {
2241  appendPQExpBufferStr(q, column_list + 1);
2242  q->data[q->len - 1] = ' ';
2243  }
2244  else
2245  appendPQExpBufferStr(q, "* ");
2246 
2247  appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2248  fmtQualifiedDumpable(tbinfo),
2249  tdinfo->filtercond ? tdinfo->filtercond : "");
2250  }
2251  else
2252  {
2253  appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2254  fmtQualifiedDumpable(tbinfo),
2255  column_list);
2256  }
2257  res = ExecuteSqlQuery(fout, q->data, PGRES_COPY_OUT);
2258  PQclear(res);
2259  destroyPQExpBuffer(clistBuf);
2260 
2261  for (;;)
2262  {
2263  ret = PQgetCopyData(conn, &copybuf, 0);
2264 
2265  if (ret < 0)
2266  break; /* done or error */
2267 
2268  if (copybuf)
2269  {
2270  WriteData(fout, copybuf, ret);
2271  PQfreemem(copybuf);
2272  }
2273 
2274  /* ----------
2275  * THROTTLE:
2276  *
2277  * There was considerable discussion in late July, 2000 regarding
2278  * slowing down pg_dump when backing up large tables. Users with both
2279  * slow & fast (multi-processor) machines experienced performance
2280  * degradation when doing a backup.
2281  *
2282  * Initial attempts based on sleeping for a number of ms for each ms
2283  * of work were deemed too complex, then a simple 'sleep in each loop'
2284  * implementation was suggested. The latter failed because the loop
2285  * was too tight. Finally, the following was implemented:
2286  *
2287  * If throttle is non-zero, then
2288  * See how long since the last sleep.
2289  * Work out how long to sleep (based on ratio).
2290  * If sleep is more than 100ms, then
2291  * sleep
2292  * reset timer
2293  * EndIf
2294  * EndIf
2295  *
2296  * where the throttle value was the number of ms to sleep per ms of
2297  * work. The calculation was done in each loop.
2298  *
2299  * Most of the hard work is done in the backend, and this solution
2300  * still did not work particularly well: on slow machines, the ratio
2301  * was 50:1, and on medium paced machines, 1:1, and on fast
2302  * multi-processor machines, it had little or no effect, for reasons
2303  * that were unclear.
2304  *
2305  * Further discussion ensued, and the proposal was dropped.
2306  *
2307  * For those people who want this feature, it can be implemented using
2308  * gettimeofday in each loop, calculating the time since last sleep,
2309  * multiplying that by the sleep ratio, then if the result is more
2310  * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2311  * function to sleep for a subsecond period ie.
2312  *
2313  * select(0, NULL, NULL, NULL, &tvi);
2314  *
2315  * This will return after the interval specified in the structure tvi.
2316  * Finally, call gettimeofday again to save the 'last sleep time'.
2317  * ----------
2318  */
2319  }
2320  archprintf(fout, "\\.\n\n\n");
2321 
2322  if (ret == -2)
2323  {
2324  /* copy data transfer failed */
2325  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2326  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2327  pg_log_error_detail("Command was: %s", q->data);
2328  exit_nicely(1);
2329  }
2330 
2331  /* Check command status and return to normal libpq state */
2332  res = PQgetResult(conn);
2334  {
2335  pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2336  pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2337  pg_log_error_detail("Command was: %s", q->data);
2338  exit_nicely(1);
2339  }
2340  PQclear(res);
2341 
2342  /* Do this to ensure we've pumped libpq back to idle state */
2343  if (PQgetResult(conn) != NULL)
2344  pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2345  classname);
2346 
2347  destroyPQExpBuffer(q);
2348 
2349  /* Revert back the setting */
2350  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2351  set_restrict_relation_kind(fout, "view, foreign-table");
2352 
2353  return 1;
2354 }
2355 
2356 /*
2357  * Dump table data using INSERT commands.
2358  *
2359  * Caution: when we restore from an archive file direct to database, the
2360  * INSERT commands emitted by this function have to be parsed by
2361  * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2362  * E'' strings, or dollar-quoted strings. So don't emit anything like that.
2363  */
2364 static int
2365 dumpTableData_insert(Archive *fout, const void *dcontext)
2366 {
2367  TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2368  TableInfo *tbinfo = tdinfo->tdtable;
2369  DumpOptions *dopt = fout->dopt;
2371  PQExpBuffer insertStmt = NULL;
2372  char *attgenerated;
2373  PGresult *res;
2374  int nfields,
2375  i;
2376  int rows_per_statement = dopt->dump_inserts;
2377  int rows_this_statement = 0;
2378 
2379  /* Temporary allows to access to foreign tables to dump data */
2380  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2381  set_restrict_relation_kind(fout, "view");
2382 
2383  /*
2384  * If we're going to emit INSERTs with column names, the most efficient
2385  * way to deal with generated columns is to exclude them entirely. For
2386  * INSERTs without column names, we have to emit DEFAULT rather than the
2387  * actual column value --- but we can save a few cycles by fetching nulls
2388  * rather than the uninteresting-to-us value.
2389  */
2390  attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2391  appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2392  nfields = 0;
2393  for (i = 0; i < tbinfo->numatts; i++)
2394  {
2395  if (tbinfo->attisdropped[i])
2396  continue;
2397  if (tbinfo->attgenerated[i] && dopt->column_inserts)
2398  continue;
2399  if (nfields > 0)
2400  appendPQExpBufferStr(q, ", ");
2401  if (tbinfo->attgenerated[i])
2402  appendPQExpBufferStr(q, "NULL");
2403  else
2404  appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2405  attgenerated[nfields] = tbinfo->attgenerated[i];
2406  nfields++;
2407  }
2408  /* Servers before 9.4 will complain about zero-column SELECT */
2409  if (nfields == 0)
2410  appendPQExpBufferStr(q, "NULL");
2411  appendPQExpBuffer(q, " FROM ONLY %s",
2412  fmtQualifiedDumpable(tbinfo));
2413  if (tdinfo->filtercond)
2414  appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2415 
2416  ExecuteSqlStatement(fout, q->data);
2417 
2418  while (1)
2419  {
2420  res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
2421  PGRES_TUPLES_OK);
2422 
2423  /* cross-check field count, allowing for dummy NULL if any */
2424  if (nfields != PQnfields(res) &&
2425  !(nfields == 0 && PQnfields(res) == 1))
2426  pg_fatal("wrong number of fields retrieved from table \"%s\"",
2427  tbinfo->dobj.name);
2428 
2429  /*
2430  * First time through, we build as much of the INSERT statement as
2431  * possible in "insertStmt", which we can then just print for each
2432  * statement. If the table happens to have zero dumpable columns then
2433  * this will be a complete statement, otherwise it will end in
2434  * "VALUES" and be ready to have the row's column values printed.
2435  */
2436  if (insertStmt == NULL)
2437  {
2438  TableInfo *targettab;
2439 
2440  insertStmt = createPQExpBuffer();
2441 
2442  /*
2443  * When load-via-partition-root is set or forced, get the root
2444  * table name for the partition table, so that we can reload data
2445  * through the root table.
2446  */
2447  if (tbinfo->ispartition &&
2448  (dopt->load_via_partition_root ||
2449  forcePartitionRootLoad(tbinfo)))
2450  targettab = getRootTableInfo(tbinfo);
2451  else
2452  targettab = tbinfo;
2453 
2454  appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2455  fmtQualifiedDumpable(targettab));
2456 
2457  /* corner case for zero-column table */
2458  if (nfields == 0)
2459  {
2460  appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2461  }
2462  else
2463  {
2464  /* append the list of column names if required */
2465  if (dopt->column_inserts)
2466  {
2467  appendPQExpBufferChar(insertStmt, '(');
2468  for (int field = 0; field < nfields; field++)
2469  {
2470  if (field > 0)
2471  appendPQExpBufferStr(insertStmt, ", ");
2472  appendPQExpBufferStr(insertStmt,
2473  fmtId(PQfname(res, field)));
2474  }
2475  appendPQExpBufferStr(insertStmt, ") ");
2476  }
2477 
2478  if (tbinfo->needs_override)
2479  appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2480 
2481  appendPQExpBufferStr(insertStmt, "VALUES");
2482  }
2483  }
2484 
2485  for (int tuple = 0; tuple < PQntuples(res); tuple++)
2486  {
2487  /* Write the INSERT if not in the middle of a multi-row INSERT. */
2488  if (rows_this_statement == 0)
2489  archputs(insertStmt->data, fout);
2490 
2491  /*
2492  * If it is zero-column table then we've already written the
2493  * complete statement, which will mean we've disobeyed
2494  * --rows-per-insert when it's set greater than 1. We do support
2495  * a way to make this multi-row with: SELECT UNION ALL SELECT
2496  * UNION ALL ... but that's non-standard so we should avoid it
2497  * given that using INSERTs is mostly only ever needed for
2498  * cross-database exports.
2499  */
2500  if (nfields == 0)
2501  continue;
2502 
2503  /* Emit a row heading */
2504  if (rows_per_statement == 1)
2505  archputs(" (", fout);
2506  else if (rows_this_statement > 0)
2507  archputs(",\n\t(", fout);
2508  else
2509  archputs("\n\t(", fout);
2510 
2511  for (int field = 0; field < nfields; field++)
2512  {
2513  if (field > 0)
2514  archputs(", ", fout);
2515  if (attgenerated[field])
2516  {
2517  archputs("DEFAULT", fout);
2518  continue;
2519  }
2520  if (PQgetisnull(res, tuple, field))
2521  {
2522  archputs("NULL", fout);
2523  continue;
2524  }
2525 
2526  /* XXX This code is partially duplicated in ruleutils.c */
2527  switch (PQftype(res, field))
2528  {
2529  case INT2OID:
2530  case INT4OID:
2531  case INT8OID:
2532  case OIDOID:
2533  case FLOAT4OID:
2534  case FLOAT8OID:
2535  case NUMERICOID:
2536  {
2537  /*
2538  * These types are printed without quotes unless
2539  * they contain values that aren't accepted by the
2540  * scanner unquoted (e.g., 'NaN'). Note that
2541  * strtod() and friends might accept NaN, so we
2542  * can't use that to test.
2543  *
2544  * In reality we only need to defend against
2545  * infinity and NaN, so we need not get too crazy
2546  * about pattern matching here.
2547  */
2548  const char *s = PQgetvalue(res, tuple, field);
2549 
2550  if (strspn(s, "0123456789 +-eE.") == strlen(s))
2551  archputs(s, fout);
2552  else
2553  archprintf(fout, "'%s'", s);
2554  }
2555  break;
2556 
2557  case BITOID:
2558  case VARBITOID:
2559  archprintf(fout, "B'%s'",
2560  PQgetvalue(res, tuple, field));
2561  break;
2562 
2563  case BOOLOID:
2564  if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2565  archputs("true", fout);
2566  else
2567  archputs("false", fout);
2568  break;
2569 
2570  default:
2571  /* All other types are printed as string literals. */
2572  resetPQExpBuffer(q);
2574  PQgetvalue(res, tuple, field),
2575  fout);
2576  archputs(q->data, fout);
2577  break;
2578  }
2579  }
2580 
2581  /* Terminate the row ... */
2582  archputs(")", fout);
2583 
2584  /* ... and the statement, if the target no. of rows is reached */
2585  if (++rows_this_statement >= rows_per_statement)
2586  {
2587  if (dopt->do_nothing)
2588  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2589  else
2590  archputs(";\n", fout);
2591  /* Reset the row counter */
2592  rows_this_statement = 0;
2593  }
2594  }
2595 
2596  if (PQntuples(res) <= 0)
2597  {
2598  PQclear(res);
2599  break;
2600  }
2601  PQclear(res);
2602  }
2603 
2604  /* Terminate any statements that didn't make the row count. */
2605  if (rows_this_statement > 0)
2606  {
2607  if (dopt->do_nothing)
2608  archputs(" ON CONFLICT DO NOTHING;\n", fout);
2609  else
2610  archputs(";\n", fout);
2611  }
2612 
2613  archputs("\n\n", fout);
2614 
2615  ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2616 
2617  destroyPQExpBuffer(q);
2618  if (insertStmt != NULL)
2619  destroyPQExpBuffer(insertStmt);
2620  free(attgenerated);
2621 
2622  /* Revert back the setting */
2623  if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2624  set_restrict_relation_kind(fout, "view, foreign-table");
2625 
2626  return 1;
2627 }
2628 
2629 /*
2630  * getRootTableInfo:
2631  * get the root TableInfo for the given partition table.
2632  */
2633 static TableInfo *
2635 {
2636  TableInfo *parentTbinfo;
2637 
2638  Assert(tbinfo->ispartition);
2639  Assert(tbinfo->numParents == 1);
2640 
2641  parentTbinfo = tbinfo->parents[0];
2642  while (parentTbinfo->ispartition)
2643  {
2644  Assert(parentTbinfo->numParents == 1);
2645  parentTbinfo = parentTbinfo->parents[0];
2646  }
2647 
2648  return parentTbinfo;
2649 }
2650 
2651 /*
2652  * forcePartitionRootLoad
2653  * Check if we must force load_via_partition_root for this partition.
2654  *
2655  * This is required if any level of ancestral partitioned table has an
2656  * unsafe partitioning scheme.
2657  */
2658 static bool
2660 {
2661  TableInfo *parentTbinfo;
2662 
2663  Assert(tbinfo->ispartition);
2664  Assert(tbinfo->numParents == 1);
2665 
2666  parentTbinfo = tbinfo->parents[0];
2667  if (parentTbinfo->unsafe_partitions)
2668  return true;
2669  while (parentTbinfo->ispartition)
2670  {
2671  Assert(parentTbinfo->numParents == 1);
2672  parentTbinfo = parentTbinfo->parents[0];
2673  if (parentTbinfo->unsafe_partitions)
2674  return true;
2675  }
2676 
2677  return false;
2678 }
2679 
/*
 * dumpTableData -
 *	  dump the contents of a single table
 *
 * Actually, this just makes an ArchiveEntry for the table contents.
 * The actual data transfer happens later, when the archiver calls the
 * registered dumpFn (dumpTableData_copy or dumpTableData_insert).
 */
static void
dumpTableData(Archive *fout, const TableDataInfo *tdinfo)
{
	DumpOptions *dopt = fout->dopt;
	TableInfo  *tbinfo = tdinfo->tdtable;
	PQExpBuffer copyBuf = createPQExpBuffer();
	PQExpBuffer clistBuf = createPQExpBuffer();
	DataDumperPtr dumpFn;
	char	   *tdDefn = NULL;	/* TOC defn comment; only set for load-via-root */
	char	   *copyStmt;
	const char *copyFrom;

	/* We had better have loaded per-column details about this table */
	Assert(tbinfo->interesting);

	/*
	 * When load-via-partition-root is set or forced, get the root table name
	 * for the partition table, so that we can reload data through the root
	 * table.  Then construct a comment to be inserted into the TOC entry's
	 * defn field, so that such cases can be identified reliably.
	 */
	if (tbinfo->ispartition &&
		(dopt->load_via_partition_root ||
		 forcePartitionRootLoad(tbinfo)))
	{
		TableInfo  *parentTbinfo;

		parentTbinfo = getRootTableInfo(tbinfo);
		copyFrom = fmtQualifiedDumpable(parentTbinfo);
		printfPQExpBuffer(copyBuf, "-- load via partition root %s",
						  copyFrom);
		tdDefn = pg_strdup(copyBuf->data);
	}
	else
		copyFrom = fmtQualifiedDumpable(tbinfo);

	if (dopt->dump_inserts == 0)
	{
		/* Dump/restore using COPY */
		dumpFn = dumpTableData_copy;
		/* must use 2 steps here 'cause fmtId is nonreentrant */
		printfPQExpBuffer(copyBuf, "COPY %s ",
						  copyFrom);
		appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
						  fmtCopyColumnList(tbinfo, clistBuf));
		/* copyStmt points into copyBuf, which lives until the end of this fn */
		copyStmt = copyBuf->data;
	}
	else
	{
		/* Restore using INSERT; no COPY statement needed in the TOC */
		dumpFn = dumpTableData_insert;
		copyStmt = NULL;
	}

	/*
	 * Note: although the TableDataInfo is a full DumpableObject, we treat its
	 * dependency on its table as "special" and pass it to ArchiveEntry now.
	 * See comments for BuildArchiveDependencies.
	 */
	if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
	{
		TocEntry   *te;

		te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
						  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
									   .namespace = tbinfo->dobj.namespace->dobj.name,
									   .owner = tbinfo->rolname,
									   .description = "TABLE DATA",
									   .section = SECTION_DATA,
									   .createStmt = tdDefn,
									   .copyStmt = copyStmt,
									   .deps = &(tbinfo->dobj.dumpId),
									   .nDeps = 1,
									   .dumpFn = dumpFn,
									   .dumpArg = tdinfo));

		/*
		 * Set the TocEntry's dataLength in case we are doing a parallel dump
		 * and want to order dump jobs by table size.  We choose to measure
		 * dataLength in table pages (including TOAST pages) during dump, so
		 * no scaling is needed.
		 *
		 * However, relpages is declared as "integer" in pg_class, and hence
		 * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
		 * Cast so that we get the right interpretation of table sizes
		 * exceeding INT_MAX pages.
		 */
		te->dataLength = (BlockNumber) tbinfo->relpages;
		te->dataLength += (BlockNumber) tbinfo->toastpages;

		/*
		 * If pgoff_t is only 32 bits wide, the above refinement is useless,
		 * and instead we'd better worry about integer overflow.  Clamp to
		 * INT_MAX if the correct result exceeds that.
		 */
		if (sizeof(te->dataLength) == 4 &&
			(tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
			 te->dataLength < 0))
			te->dataLength = INT_MAX;
	}

	destroyPQExpBuffer(copyBuf);
	destroyPQExpBuffer(clistBuf);
}
2790 
2791 /*
2792  * refreshMatViewData -
2793  * load or refresh the contents of a single materialized view
2794  *
2795  * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2796  * statement.
2797  */
2798 static void
2800 {
2801  TableInfo *tbinfo = tdinfo->tdtable;
2802  PQExpBuffer q;
2803 
2804  /* If the materialized view is not flagged as populated, skip this. */
2805  if (!tbinfo->relispopulated)
2806  return;
2807 
2808  q = createPQExpBuffer();
2809 
2810  appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2811  fmtQualifiedDumpable(tbinfo));
2812 
2813  if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2814  ArchiveEntry(fout,
2815  tdinfo->dobj.catId, /* catalog ID */
2816  tdinfo->dobj.dumpId, /* dump ID */
2817  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2818  .namespace = tbinfo->dobj.namespace->dobj.name,
2819  .owner = tbinfo->rolname,
2820  .description = "MATERIALIZED VIEW DATA",
2821  .section = SECTION_POST_DATA,
2822  .createStmt = q->data,
2823  .deps = tdinfo->dobj.dependencies,
2824  .nDeps = tdinfo->dobj.nDeps));
2825 
2826  destroyPQExpBuffer(q);
2827 }
2828 
2829 /*
2830  * getTableData -
2831  * set up dumpable objects representing the contents of tables
2832  */
2833 static void
2834 getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2835 {
2836  int i;
2837 
2838  for (i = 0; i < numTables; i++)
2839  {
2840  if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2841  (!relkind || tblinfo[i].relkind == relkind))
2842  makeTableDataInfo(dopt, &(tblinfo[i]));
2843  }
2844 }
2845 
/*
 * Make a dumpable object for the data of this specific table
 *
 * Note: we make a TableDataInfo if and only if we are going to dump the
 * table data; the "dump" field in such objects isn't very interesting.
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; callers
 * invoke this as makeTableDataInfo(dopt, &(tblinfo[i])), so presumably it
 * takes (DumpOptions *dopt, TableInfo *tbinfo) -- confirm against master.
 */
{
	TableDataInfo *tdinfo;

	/*
	 * Nothing to do if we already decided to dump the table. This will
	 * happen for "config" tables.
	 */
	if (tbinfo->dataObj != NULL)
		return;

	/* Skip VIEWs (no data to dump) */
	if (tbinfo->relkind == RELKIND_VIEW)
		return;
	/* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
	if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
	/* NOTE(review): part of this condition is missing from this excerpt */
		tbinfo->foreign_server)))
		return;
	/* Skip partitioned tables (data in partitions) */
	if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
		return;

	/* Don't dump data in unlogged tables, if so requested */
	if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
		dopt->no_unlogged_table_data)
		return;

	/* Check that the data is not explicitly excluded */
	/* NOTE(review): the first line of this check is missing from this excerpt */
		tbinfo->dobj.catId.oid))
		return;

	/* OK, let's dump it */
	tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));

	/* Pick the data-object flavor based on relkind */
	if (tbinfo->relkind == RELKIND_MATVIEW)
		tdinfo->dobj.objType = DO_REFRESH_MATVIEW;
	else if (tbinfo->relkind == RELKIND_SEQUENCE)
		tdinfo->dobj.objType = DO_SEQUENCE_SET;
	else
		tdinfo->dobj.objType = DO_TABLE_DATA;

	/*
	 * Note: use tableoid 0 so that this object won't be mistaken for
	 * something that pg_depend entries apply to.
	 */
	tdinfo->dobj.catId.tableoid = 0;
	tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
	AssignDumpId(&tdinfo->dobj);
	tdinfo->dobj.name = tbinfo->dobj.name;
	tdinfo->dobj.namespace = tbinfo->dobj.namespace;
	tdinfo->tdtable = tbinfo;
	tdinfo->filtercond = NULL;	/* might get set later */
	/* The data object depends on its table's object */
	addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);

	/* A TableDataInfo contains data, of course */
	tdinfo->dobj.components |= DUMP_COMPONENT_DATA;

	tbinfo->dataObj = tdinfo;

	/* Make sure that we'll collect per-column info for this table. */
	tbinfo->interesting = true;
}
2918 
/*
 * The refresh for a materialized view must be dependent on the refresh for
 * any materialized view that this one is dependent on.
 *
 * This must be called after all the objects are created, but before they are
 * sorted.
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "fout", so presumably buildMatViewRefreshDependencies(Archive *fout).
 */
{
	PQExpBuffer query;
	PGresult   *res;
	int			ntups,
				i;
	int			i_classid,
				i_objid,
				i_refobjid;

	/* No Mat Views before 9.3. */
	if (fout->remoteVersion < 90300)
		return;

	query = createPQExpBuffer();

	/*
	 * Recursively find, for each matview, all the matviews it depends on,
	 * looking through its rewrite rule and any intervening plain views.
	 * Result columns: classid (always pg_class's OID), objid (dependent
	 * matview), refobjid (referenced matview).
	 */
	appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
						 "( "
						 "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
						 "FROM pg_depend d1 "
						 "JOIN pg_class c1 ON c1.oid = d1.objid "
						 "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
						 " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
						 "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
						 "AND d2.objid = r1.oid "
						 "AND d2.refobjid <> d1.objid "
						 "JOIN pg_class c2 ON c2.oid = d2.refobjid "
						 "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
						 CppAsString2(RELKIND_VIEW) ") "
						 "WHERE d1.classid = 'pg_class'::regclass "
						 "UNION "
						 "SELECT w.objid, d3.refobjid, c3.relkind "
						 "FROM w "
						 "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
						 "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
						 "AND d3.objid = r3.oid "
						 "AND d3.refobjid <> w.refobjid "
						 "JOIN pg_class c3 ON c3.oid = d3.refobjid "
						 "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
						 CppAsString2(RELKIND_VIEW) ") "
						 ") "
						 "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
						 "FROM w "
						 "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));

	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_classid = PQfnumber(res, "classid");
	i_objid = PQfnumber(res, "objid");
	i_refobjid = PQfnumber(res, "refobjid");

	for (i = 0; i < ntups; i++)
	{
		CatalogId	objId;
		CatalogId	refobjId;
		DumpableObject *dobj;
		DumpableObject *refdobj;
		TableInfo  *tbinfo;
		TableInfo  *reftbinfo;

		objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
		objId.oid = atooid(PQgetvalue(res, i, i_objid));
		/* both sides of the dependency are pg_class rows */
		refobjId.tableoid = objId.tableoid;
		refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));

		dobj = findObjectByCatalogId(objId);
		if (dobj == NULL)
			continue;

		Assert(dobj->objType == DO_TABLE);
		tbinfo = (TableInfo *) dobj;
		Assert(tbinfo->relkind == RELKIND_MATVIEW);
		/* ignore matviews whose data object isn't being dumped */
		dobj = (DumpableObject *) tbinfo->dataObj;
		if (dobj == NULL)
			continue;
		Assert(dobj->objType == DO_REFRESH_MATVIEW);

		refdobj = findObjectByCatalogId(refobjId);
		if (refdobj == NULL)
			continue;

		Assert(refdobj->objType == DO_TABLE);
		reftbinfo = (TableInfo *) refdobj;
		Assert(reftbinfo->relkind == RELKIND_MATVIEW);
		refdobj = (DumpableObject *) reftbinfo->dataObj;
		if (refdobj == NULL)
			continue;
		Assert(refdobj->objType == DO_REFRESH_MATVIEW);

		/* this matview's refresh must follow the refresh of the one it uses */
		addObjectDependency(dobj, refdobj->dumpId);

		/* propagate unpopulated state from the referenced matview */
		if (!reftbinfo->relispopulated)
			tbinfo->relispopulated = false;
	}

	PQclear(res);

	destroyPQExpBuffer(query);
}
3028 
/*
 * getTableDataFKConstraints -
 *	  add dump-order dependencies reflecting foreign key constraints
 *
 * This code is executed only in a data-only dump --- in schema+data dumps
 * we handle foreign key issues by not creating the FK constraints until
 * after the data is loaded. In a data-only dump, however, we want to
 * order the table data objects in such a way that a table's referenced
 * tables are restored first. (In the presence of circular references or
 * self-references this may be impossible; we'll detect and complain about
 * that during the dependency sorting step.)
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * references no parameters.
 */
{
	DumpableObject **dobjs;
	int			numObjs;
	int			i;

	/* Search through all the dumpable objects for FK constraints */
	getDumpableObjects(&dobjs, &numObjs);
	for (i = 0; i < numObjs; i++)
	{
		if (dobjs[i]->objType == DO_FK_CONSTRAINT)
		{
			ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
			TableInfo  *ftable;

			/* Not interesting unless both tables are to be dumped */
			if (cinfo->contable == NULL ||
				cinfo->contable->dataObj == NULL)
				continue;
			ftable = findTableByOid(cinfo->confrelid);
			if (ftable == NULL ||
				ftable->dataObj == NULL)
				continue;

			/*
			 * Okay, make referencing table's TABLE_DATA object depend on the
			 * referenced table's TABLE_DATA object.
			 */
			/*
			 * NOTE(review): the opening line of the addObjectDependency(...)
			 * call is missing from this excerpt; only its last argument
			 * remains below.
			 */
								 ftable->dataObj->dobj.dumpId);
		}
	}
	free(dobjs);
}
3076 
3077 
/*
 * dumpDatabase:
 *	dump the database definition
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "fout", so presumably dumpDatabase(Archive *fout).
 */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer dbQry = createPQExpBuffer();
	PQExpBuffer delQry = createPQExpBuffer();
	PQExpBuffer creaQry = createPQExpBuffer();
	PQExpBuffer labelq = createPQExpBuffer();
	PGconn	   *conn = GetConnection(fout);
	PGresult   *res;
	int			i_tableoid,
				i_oid,
				i_datname,
				i_datdba,
				i_encoding,
				i_datlocprovider,
				i_collate,
				i_ctype,
				i_datlocale,
				i_daticurules,
				i_frozenxid,
				i_minmxid,
				i_datacl,
				i_acldefault,
				i_datistemplate,
				i_datconnlimit,
				i_datcollversion,
				i_tablespace;
	CatalogId	dbCatId;
	DumpId		dbDumpId;
	DumpableAcl dbdacl;
	const char *datname,
			   *dba,
			   *encoding,
			   *datlocprovider,
			   *collate,
			   *ctype,
			   *locale,
			   *icurules,
			   *datistemplate,
			   *datconnlimit,
			   *tablespace;
	uint32		frozenxid,
				minmxid;
	char	   *qdatname;

	pg_log_info("saving database definition");

	/*
	 * Fetch the database-level properties for this database.  Columns that
	 * don't exist in older server versions are faked up with constants so
	 * the column list is uniform downstream.
	 */
	appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
						 "datdba, "
						 "pg_encoding_to_char(encoding) AS encoding, "
						 "datcollate, datctype, datfrozenxid, "
						 "datacl, acldefault('d', datdba) AS acldefault, "
						 "datistemplate, datconnlimit, ");
	if (fout->remoteVersion >= 90300)
		appendPQExpBufferStr(dbQry, "datminmxid, ");
	else
		appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
	if (fout->remoteVersion >= 170000)
		appendPQExpBufferStr(dbQry, "datlocprovider, datlocale, datcollversion, ");
	else if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale AS datlocale, datcollversion, ");
	else
		appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS datlocale, NULL AS datcollversion, ");
	if (fout->remoteVersion >= 160000)
		appendPQExpBufferStr(dbQry, "daticurules, ");
	else
		appendPQExpBufferStr(dbQry, "NULL AS daticurules, ");
	appendPQExpBufferStr(dbQry,
						 "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
						 "shobj_description(oid, 'pg_database') AS description "
						 "FROM pg_database "
						 "WHERE datname = current_database()");

	res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_datname = PQfnumber(res, "datname");
	i_datdba = PQfnumber(res, "datdba");
	i_encoding = PQfnumber(res, "encoding");
	i_datlocprovider = PQfnumber(res, "datlocprovider");
	i_collate = PQfnumber(res, "datcollate");
	i_ctype = PQfnumber(res, "datctype");
	i_datlocale = PQfnumber(res, "datlocale");
	i_daticurules = PQfnumber(res, "daticurules");
	i_frozenxid = PQfnumber(res, "datfrozenxid");
	i_minmxid = PQfnumber(res, "datminmxid");
	i_datacl = PQfnumber(res, "datacl");
	i_acldefault = PQfnumber(res, "acldefault");
	i_datistemplate = PQfnumber(res, "datistemplate");
	i_datconnlimit = PQfnumber(res, "datconnlimit");
	i_datcollversion = PQfnumber(res, "datcollversion");
	i_tablespace = PQfnumber(res, "tablespace");

	dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
	dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
	datname = PQgetvalue(res, 0, i_datname);
	dba = getRoleName(PQgetvalue(res, 0, i_datdba));
	encoding = PQgetvalue(res, 0, i_encoding);
	datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
	collate = PQgetvalue(res, 0, i_collate);
	ctype = PQgetvalue(res, 0, i_ctype);
	/* locale and icurules may legitimately be NULL */
	if (!PQgetisnull(res, 0, i_datlocale))
		locale = PQgetvalue(res, 0, i_datlocale);
	else
		locale = NULL;
	if (!PQgetisnull(res, 0, i_daticurules))
		icurules = PQgetvalue(res, 0, i_daticurules);
	else
		icurules = NULL;
	frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
	minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
	dbdacl.acl = PQgetvalue(res, 0, i_datacl);
	dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
	datistemplate = PQgetvalue(res, 0, i_datistemplate);
	datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
	tablespace = PQgetvalue(res, 0, i_tablespace);

	qdatname = pg_strdup(fmtId(datname));

	/*
	 * Prepare the CREATE DATABASE command. We must specify OID (if we want
	 * to preserve that), as well as the encoding, locale, and tablespace
	 * since those can't be altered later. Other DB properties are left to
	 * the DATABASE PROPERTIES entry, so that they can be applied after
	 * reconnecting to the target DB.
	 *
	 * For binary upgrade, we use the FILE_COPY strategy because testing has
	 * shown it to be faster. When the server is in binary upgrade mode, it
	 * will also skip the checkpoints this strategy ordinarily performs.
	 */
	if (dopt->binary_upgrade)
	{
		appendPQExpBuffer(creaQry,
						  "CREATE DATABASE %s WITH TEMPLATE = template0 "
						  "OID = %u STRATEGY = FILE_COPY",
						  qdatname, dbCatId.oid);
	}
	else
	{
		appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
						  qdatname);
	}
	if (strlen(encoding) > 0)
	{
		appendPQExpBufferStr(creaQry, " ENCODING = ");
		appendStringLiteralAH(creaQry, encoding, fout);
	}

	appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
	if (datlocprovider[0] == 'b')
		appendPQExpBufferStr(creaQry, "builtin");
	else if (datlocprovider[0] == 'c')
		appendPQExpBufferStr(creaQry, "libc");
	else if (datlocprovider[0] == 'i')
		appendPQExpBufferStr(creaQry, "icu");
	else
		pg_fatal("unrecognized locale provider: %s",
				 datlocprovider);

	/* Emit LOCALE when collate and ctype agree, else the separate options */
	if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
	{
		appendPQExpBufferStr(creaQry, " LOCALE = ");
		appendStringLiteralAH(creaQry, collate, fout);
	}
	else
	{
		if (strlen(collate) > 0)
		{
			appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
			appendStringLiteralAH(creaQry, collate, fout);
		}
		if (strlen(ctype) > 0)
		{
			appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
			appendStringLiteralAH(creaQry, ctype, fout);
		}
	}
	if (locale)
	{
		if (datlocprovider[0] == 'b')
			appendPQExpBufferStr(creaQry, " BUILTIN_LOCALE = ");
		else
			appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");

		appendStringLiteralAH(creaQry, locale, fout);
	}

	if (icurules)
	{
		appendPQExpBufferStr(creaQry, " ICU_RULES = ");
		appendStringLiteralAH(creaQry, icurules, fout);
	}

	/*
	 * For binary upgrade, carry over the collation version. For normal
	 * dump/restore, omit the version, so that it is computed upon restore.
	 */
	if (dopt->binary_upgrade)
	{
		if (!PQgetisnull(res, 0, i_datcollversion))
		{
			appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
			appendStringLiteralAH(creaQry,
								  PQgetvalue(res, 0, i_datcollversion),
								  fout);
		}
	}

	/*
	 * Note: looking at dopt->outputNoTablespaces here is completely the wrong
	 * thing; the decision whether to specify a tablespace should be left till
	 * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
	 * label the DATABASE entry with the tablespace and let the normal
	 * tablespace selection logic work ... but CREATE DATABASE doesn't pay
	 * attention to default_tablespace, so that won't work.
	 */
	if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
		!dopt->outputNoTablespaces)
		appendPQExpBuffer(creaQry, " TABLESPACE = %s",
						  fmtId(tablespace));
	appendPQExpBufferStr(creaQry, ";\n");

	appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
					  qdatname);

	dbDumpId = createDumpId();

	ArchiveEntry(fout,
				 dbCatId,		/* catalog ID */
				 dbDumpId,		/* dump ID */
				 ARCHIVE_OPTS(.tag = datname,
							  .owner = dba,
							  .description = "DATABASE",
							  .section = SECTION_PRE_DATA,
							  .createStmt = creaQry->data,
							  .dropStmt = delQry->data));

	/* Compute correct tag for archive entry */
	appendPQExpBuffer(labelq, "DATABASE %s", qdatname);

	/* Dump DB comment if any */
	{
		/*
		 * 8.2 and up keep comments on shared objects in a shared table, so we
		 * cannot use the dumpComment() code used for other database objects.
		 * Be careful that the ArchiveEntry parameters match that function.
		 */
		char	   *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));

		if (comment && *comment && !dopt->no_comments)
		{
			resetPQExpBuffer(dbQry);

			/*
			 * Generates warning when loaded into a differently-named
			 * database.
			 */
			appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
			appendStringLiteralAH(dbQry, comment, fout);
			appendPQExpBufferStr(dbQry, ";\n");

			/*
			 * NOTE(review): the opening line of this ArchiveEntry(...) call
			 * is missing from this excerpt.
			 */
						 ARCHIVE_OPTS(.tag = labelq->data,
									  .owner = dba,
									  .description = "COMMENT",
									  .section = SECTION_NONE,
									  .createStmt = dbQry->data,
									  .deps = &dbDumpId,
									  .nDeps = 1));
		}
	}

	/* Dump DB security label, if enabled */
	if (!dopt->no_security_labels)
	{
		PGresult   *shres;
		PQExpBuffer seclabelQry;

		seclabelQry = createPQExpBuffer();

		buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
		shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
		resetPQExpBuffer(seclabelQry);
		emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
		if (seclabelQry->len > 0)
			/*
			 * NOTE(review): the opening line of this ArchiveEntry(...) call
			 * is missing from this excerpt.
			 */
						 ARCHIVE_OPTS(.tag = labelq->data,
									  .owner = dba,
									  .description = "SECURITY LABEL",
									  .section = SECTION_NONE,
									  .createStmt = seclabelQry->data,
									  .deps = &dbDumpId,
									  .nDeps = 1));
		destroyPQExpBuffer(seclabelQry);
		PQclear(shres);
	}

	/*
	 * Dump ACL if any. Note that we do not support initial privileges
	 * (pg_init_privs) on databases.
	 */
	dbdacl.privtype = 0;
	dbdacl.initprivs = NULL;

	dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
			qdatname, NULL, NULL,
			NULL, dba, &dbdacl);

	/*
	 * Now construct a DATABASE PROPERTIES archive entry to restore any
	 * non-default database-level properties. (The reason this must be
	 * separate is that we cannot put any additional commands into the TOC
	 * entry that has CREATE DATABASE. pg_restore would execute such a group
	 * in an implicit transaction block, and the backend won't allow CREATE
	 * DATABASE in that context.)
	 */
	resetPQExpBuffer(creaQry);
	resetPQExpBuffer(delQry);

	if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
		appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
						  qdatname, datconnlimit);

	if (strcmp(datistemplate, "t") == 0)
	{
		appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
						  qdatname);

		/*
		 * The backend won't accept DROP DATABASE on a template database. We
		 * can deal with that by removing the template marking before the DROP
		 * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
		 * since no such command is currently supported, fake it with a direct
		 * UPDATE on pg_database.
		 */
		appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
							 "SET datistemplate = false WHERE datname = ");
		appendStringLiteralAH(delQry, datname, fout);
		appendPQExpBufferStr(delQry, ";\n");
	}

	/*
	 * We do not restore pg_database.dathasloginevt because it is set
	 * automatically on login event trigger creation.
	 */

	/* Add database-specific SET options */
	dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);

	/*
	 * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
	 * entry, too, for lack of a better place.
	 */
	if (dopt->binary_upgrade)
	{
		appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
		appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
						  "SET datfrozenxid = '%u', datminmxid = '%u'\n"
						  "WHERE datname = ",
						  frozenxid, minmxid);
		appendStringLiteralAH(creaQry, datname, fout);
		appendPQExpBufferStr(creaQry, ";\n");
	}

	if (creaQry->len > 0)
		/*
		 * NOTE(review): the opening line of this ArchiveEntry(...) call is
		 * missing from this excerpt.
		 */
					 ARCHIVE_OPTS(.tag = datname,
								  .owner = dba,
								  .description = "DATABASE PROPERTIES",
								  .section = SECTION_PRE_DATA,
								  .createStmt = creaQry->data,
								  .dropStmt = delQry->data,
								  .deps = &dbDumpId));

	/*
	 * pg_largeobject comes from the old system intact, so set its
	 * relfrozenxids, relminmxids and relfilenode.
	 */
	if (dopt->binary_upgrade)
	{
		PGresult   *lo_res;
		PQExpBuffer loFrozenQry = createPQExpBuffer();
		PQExpBuffer loOutQry = createPQExpBuffer();
		PQExpBuffer loHorizonQry = createPQExpBuffer();
		int			ii_relfrozenxid,
					ii_relfilenode,
					ii_oid,
					ii_relminmxid;

		/*
		 * pg_largeobject
		 */
		if (fout->remoteVersion >= 90300)
			appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
							  "FROM pg_catalog.pg_class\n"
							  "WHERE oid IN (%u, %u);\n",
							  LargeObjectRelationId, LargeObjectLOidPNIndexId);
		else
			appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
							  "FROM pg_catalog.pg_class\n"
							  "WHERE oid IN (%u, %u);\n",
							  LargeObjectRelationId, LargeObjectLOidPNIndexId);

		lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);

		ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
		ii_relminmxid = PQfnumber(lo_res, "relminmxid");
		ii_relfilenode = PQfnumber(lo_res, "relfilenode");
		ii_oid = PQfnumber(lo_res, "oid");

		appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
		appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
		for (int i = 0; i < PQntuples(lo_res); ++i)
		{
			Oid			oid;
			RelFileNumber relfilenumber;

			appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
							  "SET relfrozenxid = '%u', relminmxid = '%u'\n"
							  "WHERE oid = %u;\n",
							  atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
							  atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
							  atooid(PQgetvalue(lo_res, i, ii_oid)));

			oid = atooid(PQgetvalue(lo_res, i, ii_oid));
			relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));

			if (oid == LargeObjectRelationId)
				appendPQExpBuffer(loOutQry,
								  "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
								  relfilenumber);
			else if (oid == LargeObjectLOidPNIndexId)
				appendPQExpBuffer(loOutQry,
								  "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
								  relfilenumber);
		}

		appendPQExpBufferStr(loOutQry,
							 "TRUNCATE pg_catalog.pg_largeobject;\n");
		appendPQExpBufferStr(loOutQry, loHorizonQry->data);

		/*
		 * NOTE(review): the opening line of this ArchiveEntry(...) call is
		 * missing from this excerpt.
		 */
					 ARCHIVE_OPTS(.tag = "pg_largeobject",
								  .description = "pg_largeobject",
								  .section = SECTION_PRE_DATA,
								  .createStmt = loOutQry->data));

		PQclear(lo_res);

		destroyPQExpBuffer(loFrozenQry);
		destroyPQExpBuffer(loHorizonQry);
		destroyPQExpBuffer(loOutQry);
	}

	PQclear(res);

	free(qdatname);
	destroyPQExpBuffer(dbQry);
	destroyPQExpBuffer(delQry);
	destroyPQExpBuffer(creaQry);
	destroyPQExpBuffer(labelq);
}
3549 
/*
 * Collect any database-specific or role-and-database-specific SET options
 * for this database, and append them to outbuf.
 */
static void
/*
 * NOTE(review): the first line of the signature is missing from this
 * excerpt; the body uses "AH" (Archive *) and "outbuf" (PQExpBuffer), with
 * the trailing parameters visible below.
 */
				   const char *dbname, Oid dboid)
{
	PGconn	   *conn = GetConnection(AH);
	/*
	 * NOTE(review): the declaration/creation of the working buffer "buf"
	 * (used via buf->data below) is missing from this excerpt.
	 */
	PGresult   *res;

	/* First collect database-specific options */
	printfPQExpBuffer(buf, "SELECT unnest(setconfig) FROM pg_db_role_setting "
					  "WHERE setrole = 0 AND setdatabase = '%u'::oid",
					  dboid);

	res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);

	for (int i = 0; i < PQntuples(res); i++)
		/*
		 * NOTE(review): the head of this loop-body call is missing from this
		 * excerpt; only its trailing arguments remain.
		 */
							   "DATABASE", dbname, NULL, NULL,
							   outbuf);

	PQclear(res);

	/* Now look for role-and-database-specific options */
	printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
					  "FROM pg_db_role_setting s, pg_roles r "
					  "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
					  dboid);

	res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);

	for (int i = 0; i < PQntuples(res); i++)
		/*
		 * NOTE(review): the head of this loop-body call is missing from this
		 * excerpt, as above.
		 */
							   "ROLE", PQgetvalue(res, i, 0),
							   "DATABASE", dbname,
							   outbuf);

	PQclear(res);

	/*
	 * NOTE(review): the cleanup of "buf" appears to be missing from this
	 * excerpt.
	 */
}
3594 
/*
 * dumpEncoding: put the correct encoding into the archive
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "AH" (Archive *).
 */
{
	const char *encname = pg_encoding_to_char(AH->encoding);
	/*
	 * NOTE(review): the creation of PQExpBuffer "qry" is missing from this
	 * excerpt.
	 */

	pg_log_info("saving encoding = %s", encname);

	/* Emit a SET client_encoding command reproducing the source encoding */
	appendPQExpBufferStr(qry, "SET client_encoding = ");
	appendStringLiteralAH(qry, encname, AH);
	appendPQExpBufferStr(qry, ";\n");

	/*
	 * NOTE(review): the opening line of this ArchiveEntry(...) call is
	 * missing from this excerpt.
	 */
				 ARCHIVE_OPTS(.tag = "ENCODING",
							  .description = "ENCODING",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	destroyPQExpBuffer(qry);
}
3618 
3619 
/*
 * dumpStdStrings: put the correct escape string behavior into the archive
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "AH" (Archive *).
 */
{
	const char *stdstrings = AH->std_strings ? "on" : "off";
	/*
	 * NOTE(review): the creation of PQExpBuffer "qry" is missing from this
	 * excerpt.
	 */

	pg_log_info("saving \"standard_conforming_strings = %s\"",
				stdstrings);

	/* Record the source server's setting so restore uses the same escaping */
	appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
					  stdstrings);

	/*
	 * NOTE(review): the opening line of this ArchiveEntry(...) call is
	 * missing from this excerpt.
	 */
				 ARCHIVE_OPTS(.tag = "STDSTRINGS",
							  .description = "STDSTRINGS",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	destroyPQExpBuffer(qry);
}
3643 
/*
 * dumpSearchPath: record the active search_path in the archive
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "AH" (Archive *).
 */
{
	/*
	 * NOTE(review): the creation of PQExpBuffer "qry" is missing from this
	 * excerpt.
	 */
	PQExpBuffer path = createPQExpBuffer();
	PGresult   *res;
	char	  **schemanames = NULL;
	int			nschemanames = 0;
	int			i;

	/*
	 * We use the result of current_schemas(), not the search_path GUC,
	 * because that might contain wildcards such as "$user", which won't
	 * necessarily have the same value during restore. Also, this way avoids
	 * listing schemas that may appear in search_path but not actually exist,
	 * which seems like a prudent exclusion.
	 */
	/*
	 * NOTE(review): the first line of this query-execution call is missing
	 * from this excerpt; only the query string remains.
	 */
								   "SELECT pg_catalog.current_schemas(false)");

	if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
		pg_fatal("could not parse result of current_schemas()");

	/*
	 * We use set_config(), not a simple "SET search_path" command, because
	 * the latter has less-clean behavior if the search path is empty. While
	 * that's likely to get fixed at some point, it seems like a good idea to
	 * be as backwards-compatible as possible in what we put into archives.
	 */
	for (i = 0; i < nschemanames; i++)
	{
		if (i > 0)
			appendPQExpBufferStr(path, ", ");
		appendPQExpBufferStr(path, fmtId(schemanames[i]));
	}

	appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
	appendStringLiteralAH(qry, path->data, AH);
	appendPQExpBufferStr(qry, ", false);\n");

	pg_log_info("saving \"search_path = %s\"", path->data);

	/*
	 * NOTE(review): the opening line of this ArchiveEntry(...) call is
	 * missing from this excerpt.
	 */
				 ARCHIVE_OPTS(.tag = "SEARCHPATH",
							  .description = "SEARCHPATH",
							  .section = SECTION_PRE_DATA,
							  .createStmt = qry->data));

	/* Also save it in AH->searchpath, in case we're doing plain text dump */
	AH->searchpath = pg_strdup(qry->data);

	free(schemanames);
	PQclear(res);
	destroyPQExpBuffer(qry);
	destroyPQExpBuffer(path);
}
3703 
3704 
/*
 * getLOs:
 *	Collect schema-level data about large objects
 */
static void
/*
 * NOTE(review): the signature line is missing from this excerpt; the body
 * uses "fout", so presumably getLOs(Archive *fout).
 */
{
	DumpOptions *dopt = fout->dopt;
	PQExpBuffer loQry = createPQExpBuffer();
	PGresult   *res;
	int			ntups;
	int			i;
	int			n;
	int			i_oid;
	int			i_lomowner;
	int			i_lomacl;
	int			i_acldefault;

	pg_log_info("reading large objects");

	/*
	 * Fetch LO OIDs and owner/ACL data. Order the data so that all the blobs
	 * with the same owner/ACL appear together.
	 */
	appendPQExpBufferStr(loQry,
						 "SELECT oid, lomowner, lomacl, "
						 "acldefault('L', lomowner) AS acldefault "
						 "FROM pg_largeobject_metadata "
						 "ORDER BY lomowner, lomacl::pg_catalog.text, oid");

	res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);

	i_oid = PQfnumber(res, "oid");
	i_lomowner = PQfnumber(res, "lomowner");
	i_lomacl = PQfnumber(res, "lomacl");
	i_acldefault = PQfnumber(res, "acldefault");

	ntups = PQntuples(res);

	/*
	 * Group the blobs into suitably-sized groups that have the same owner and
	 * ACL setting, and build a metadata and a data DumpableObject for each
	 * group. (If we supported initprivs for blobs, we'd have to insist that
	 * groups also share initprivs settings, since the DumpableObject only has
	 * room for one.) i is the index of the first tuple in the current group,
	 * and n is the number of tuples we include in the group.
	 */
	for (i = 0; i < ntups; i += n)
	{
		Oid			thisoid = atooid(PQgetvalue(res, i, i_oid));
		char	   *thisowner = PQgetvalue(res, i, i_lomowner);
		char	   *thisacl = PQgetvalue(res, i, i_lomacl);
		LoInfo	   *loinfo;
		DumpableObject *lodata;
		char		namebuf[64];

		/* Scan to find first tuple not to be included in group */
		n = 1;
		while (n < MAX_BLOBS_PER_ARCHIVE_ENTRY && i + n < ntups)
		{
			if (strcmp(thisowner, PQgetvalue(res, i + n, i_lomowner)) != 0 ||
				strcmp(thisacl, PQgetvalue(res, i + n, i_lomacl)) != 0)
				break;
			n++;
		}

		/* Build the metadata DumpableObject */
		loinfo = (LoInfo *) pg_malloc(offsetof(LoInfo, looids) + n * sizeof(Oid));

		loinfo->dobj.objType = DO_LARGE_OBJECT;
		loinfo->dobj.catId.tableoid = LargeObjectRelationId;
		loinfo->dobj.catId.oid = thisoid;
		AssignDumpId(&loinfo->dobj);

		/* Tag shows the OID range covered by this group */
		if (n > 1)
			snprintf(namebuf, sizeof(namebuf), "%u..%u", thisoid,
					 atooid(PQgetvalue(res, i + n - 1, i_oid)));
		else
			snprintf(namebuf, sizeof(namebuf), "%u", thisoid);
		loinfo->dobj.name = pg_strdup(namebuf);
		loinfo->dacl.acl = pg_strdup(thisacl);
		loinfo->dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
		loinfo->dacl.privtype = 0;
		loinfo->dacl.initprivs = NULL;
		loinfo->rolname = getRoleName(thisowner);
		loinfo->numlos = n;
		loinfo->looids[0] = thisoid;
		/* Collect OIDs of the remaining blobs in this group */
		for (int k = 1; k < n; k++)
		{
			CatalogId	extraID;

			loinfo->looids[k] = atooid(PQgetvalue(res, i + k, i_oid));

			/* Make sure we can look up loinfo by any of the blobs' OIDs */
			extraID.tableoid = LargeObjectRelationId;
			extraID.oid = loinfo->looids[k];
			recordAdditionalCatalogID(extraID, &loinfo->dobj);
		}

		/* LOs have data */
		loinfo->dobj.components |= DUMP_COMPONENT_DATA;

		/* Mark whether LO group has a non-empty ACL */
		if (!PQgetisnull(res, i, i_lomacl))
			loinfo->dobj.components |= DUMP_COMPONENT_ACL;

		/*
		 * In binary-upgrade mode for LOs, we do *not* dump out the LO data,
		 * as it will be copied by pg_upgrade, which simply copies the
		 * pg_largeobject table. We *do* however dump out anything but the
		 * data, as pg_upgrade copies just pg_largeobject, but not
		 * pg_largeobject_metadata, after the dump is restored.
		 */
		if (dopt->binary_upgrade)
			loinfo->dobj.dump &= ~DUMP_COMPONENT_DATA;

		/*
		 * Create a "BLOBS" data item for the group, too. This is just a
		 * placeholder for sorting; it carries no data now.
		 */
		lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
		lodata->objType = DO_LARGE_OBJECT_DATA;
		lodata->catId = nilCatalogId;
		AssignDumpId(lodata);
		lodata->name = pg_strdup(namebuf);
		lodata->components |= DUMP_COMPONENT_DATA;
		/* Set up explicit dependency from data to metadata */
		lodata->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
		lodata->dependencies[0] = loinfo->dobj.dumpId;
		lodata->nDeps = lodata->allocDeps = 1;
	}

	PQclear(res);
	destroyPQExpBuffer(loQry);
}
3841 
3842 /*
3843  * dumpLO
3844  *
3845  * dump the definition (metadata) of the given large object group
3846  */
3847 static void
3848 dumpLO(Archive *fout, const LoInfo *loinfo)
3849 {
3850  PQExpBuffer cquery = createPQExpBuffer();
3851 
3852  /*
3853  * The "definition" is just a newline-separated list of OIDs. We need to
3854  * put something into the dropStmt too, but it can just be a comment.
3855  */
3856  for (int i = 0; i < loinfo->numlos; i++)
3857  appendPQExpBuffer(cquery, "%u\n", loinfo->looids[i]);
3858 
3859  if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3860  ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3861  ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3862  .owner = loinfo->rolname,
3863  .description = "BLOB METADATA",
3864  .section = SECTION_DATA,
3865  .createStmt = cquery->data,
3866  .dropStmt = "-- dummy"));
3867 
3868  /*
3869  * Dump per-blob comments and seclabels if any. We assume these are rare
3870  * enough that it's okay to generate retail TOC entries for them.
3871  */
3872  if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
3874  {
3875  for (int i = 0; i < loinfo->numlos; i++)
3876  {
3877  CatalogId catId;
3878  char namebuf[32];
3879 
3880  /* Build identifying info for this blob */
3881  catId.tableoid = loinfo->dobj.catId.tableoid;
3882  catId.oid = loinfo->looids[i];
3883  snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[i]);
3884 
3885  if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3886  dumpComment(fout, "LARGE OBJECT", namebuf,
3887  NULL, loinfo->rolname,
3888  catId, 0, loinfo->dobj.dumpId);
3889 
3890  if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3891  dumpSecLabel(fout, "LARGE OBJECT", namebuf,
3892  NULL, loinfo->rolname,
3893  catId, 0, loinfo->dobj.dumpId);
3894  }
3895  }
3896 
3897  /*
3898  * Dump the ACLs if any (remember that all blobs in the group will have
3899  * the same ACL). If there's just one blob, dump a simple ACL entry; if
3900  * there's more, make a "LARGE OBJECTS" entry that really contains only
3901  * the ACL for the first blob. _printTocEntry() will be cued by the tag
3902  * string to emit a mutated version for each blob.
3903  */
3904  if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3905  {
3906  char namebuf[32];
3907 
3908  /* Build identifying info for the first blob */
3909  snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[0]);
3910 
3911  if (loinfo->numlos > 1)
3912  {
3913  char tagbuf[64];
3914 
3915  snprintf(tagbuf, sizeof(tagbuf), "LARGE OBJECTS %u..%u",
3916  loinfo->looids[0], loinfo->looids[loinfo->numlos - 1]);
3917 
3918  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3919  "LARGE OBJECT", namebuf, NULL, NULL,
3920  tagbuf, loinfo->rolname, &loinfo->dacl);
3921  }
3922  else
3923  {
3924  dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3925  "LARGE OBJECT", namebuf, NULL, NULL,
3926  NULL, loinfo->rolname, &loinfo->dacl);
3927  }
3928  }
3929 
3930  destroyPQExpBuffer(cquery);
3931 }
3932 
3933 /*
3934  * dumpLOs:
3935  * dump the data contents of the large objects in the given group
3936  */
3937 static int
3938 dumpLOs(Archive *fout, const void *arg)
3939 {
3940  const LoInfo *loinfo = (const LoInfo *) arg;
3941  PGconn *conn = GetConnection(fout);
3942  char buf[LOBBUFSIZE];
3943 
3944  pg_log_info("saving large objects \"%s\"", loinfo->dobj.name);
3945 
3946  for (int i = 0; i < loinfo->numlos; i++)
3947  {
3948  Oid loOid = loinfo->looids[i];
3949  int loFd;
3950  int cnt;
3951 
3952  /* Open the LO */
3953  loFd = lo_open(conn, loOid, INV_READ);
3954  if (loFd == -1)
3955  pg_fatal("could not open large object %u: %s",
3956  loOid, PQerrorMessage(conn));
3957 
3958  StartLO(fout, loOid);
3959 
3960  /* Now read it in chunks, sending data to archive */
3961  do
3962  {
3963  cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3964  if (cnt < 0)
3965  pg_fatal("error reading large object %u: %s",
3966  loOid, PQerrorMessage(conn));
3967 
3968  WriteData(fout, buf, cnt);
3969  } while (cnt > 0);
3970 
3971  lo_close(conn, loFd);
3972 
3973  EndLO(fout, loOid);
3974  }
3975 
3976  return 1;
3977 }
3978 
3979 /*
3980  * getPolicies
3981  * get information about all RLS policies on dumpable tables.
3982  */
3983 void
3984 getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3985 {
3986  PQExpBuffer query;
3987  PQExpBuffer tbloids;
3988  PGresult *res;
3989  PolicyInfo *polinfo;
3990  int i_oid;
3991  int i_tableoid;
3992  int i_polrelid;
3993  int i_polname;
3994  int i_polcmd;
3995  int i_polpermissive;
3996  int i_polroles;
3997  int i_polqual;
3998  int i_polwithcheck;
3999  int i,
4000  j,
4001  ntups;
4002 
4003  /* No policies before 9.5 */
4004  if (fout->remoteVersion < 90500)
4005  return;
4006 
4007  query = createPQExpBuffer();
4008  tbloids = createPQExpBuffer();
4009 
4010  /*
4011  * Identify tables of interest, and check which ones have RLS enabled.
4012  */
4013  appendPQExpBufferChar(tbloids, '{');
4014  for (i = 0; i < numTables; i++)
4015  {
4016  TableInfo *tbinfo = &tblinfo[i];
4017 
4018  /* Ignore row security on tables not to be dumped */
4019  if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
4020  continue;
4021 
4022  /* It can't have RLS or policies if it's not a table */
4023  if (tbinfo->relkind != RELKIND_RELATION &&
4024  tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
4025  continue;
4026 
4027  /* Add it to the list of table OIDs to be probed below */
4028  if (tbloids->len > 1) /* do we have more than the '{'? */
4029  appendPQExpBufferChar(tbloids, ',');
4030  appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
4031 
4032  /* Is RLS enabled? (That's separate from whether it has policies) */
4033  if (tbinfo->rowsec)
4034  {
4036 
4037  /*
4038  * We represent RLS being enabled on a table by creating a
4039  * PolicyInfo object with null polname.
4040  *
4041  * Note: use tableoid 0 so that this object won't be mistaken for
4042  * something that pg_depend entries apply to.
4043  */
4044  polinfo = pg_malloc(sizeof(PolicyInfo));
4045  polinfo->dobj.objType = DO_POLICY;
4046  polinfo->dobj.catId.tableoid = 0;
4047  polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
4048  AssignDumpId(&polinfo->dobj);
4049  polinfo->dobj.namespace = tbinfo->dobj.namespace;
4050  polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
4051  polinfo->poltable = tbinfo;
4052  polinfo->polname = NULL;
4053  polinfo->polcmd = '\0';
4054  polinfo->polpermissive = 0;
4055  polinfo->polroles = NULL;
4056  polinfo->polqual = NULL;
4057  polinfo->polwithcheck = NULL;
4058  }
4059  }
4060  appendPQExpBufferChar(tbloids, '}');
4061 
4062  /*
4063  * Now, read all RLS policies belonging to the tables of interest, and
4064  * create PolicyInfo objects for them. (Note that we must filter the
4065  * results server-side not locally, because we dare not apply pg_get_expr
4066  * to tables we don't have lock on.)
4067  */
4068  pg_log_info("reading row-level security policies");
4069 
4070  printfPQExpBuffer(query,
4071  "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
4072  if (fout->remoteVersion >= 100000)
4073  appendPQExpBufferStr(query, "pol.polpermissive, ");
4074  else
4075  appendPQExpBufferStr(query, "'t' as polpermissive, ");
4076  appendPQExpBuffer(query,
4077  "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
4078  " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
4079  "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
4080  "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
4081  "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
4082  "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
4083  tbloids->data);
4084 
4085  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4086 
4087  ntups = PQntuples(res);
4088  if (ntups > 0)
4089  {
4090  i_oid = PQfnumber(res, "oid");
4091  i_tableoid = PQfnumber(res, "tableoid");
4092  i_polrelid = PQfnumber(res, "polrelid");
4093  i_polname = PQfnumber(res, "polname");
4094  i_polcmd = PQfnumber(res, "polcmd");
4095  i_polpermissive = PQfnumber(res, "polpermissive");
4096  i_polroles = PQfnumber(res, "polroles");
4097  i_polqual = PQfnumber(res, "polqual");
4098  i_polwithcheck = PQfnumber(res, "polwithcheck");
4099 
4100  polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
4101 
4102  for (j = 0; j < ntups; j++)
4103  {
4104  Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
4105  TableInfo *tbinfo = findTableByOid(polrelid);
4106 
4108 
4109  polinfo[j].dobj.objType = DO_POLICY;
4110  polinfo[j].dobj.catId.tableoid =
4111  atooid(PQgetvalue(res, j, i_tableoid));
4112  polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
4113  AssignDumpId(&polinfo[j].dobj);
4114  polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4115  polinfo[j].poltable = tbinfo;
4116  polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
4117  polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
4118 
4119  polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
4120  polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
4121 
4122  if (PQgetisnull(res, j, i_polroles))
4123  polinfo[j].polroles = NULL;
4124  else
4125  polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
4126 
4127  if (PQgetisnull(res, j, i_polqual))
4128  polinfo[j].polqual = NULL;
4129  else
4130  polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
4131 
4132  if (PQgetisnull(res, j, i_polwithcheck))
4133  polinfo[j].polwithcheck = NULL;
4134  else
4135  polinfo[j].polwithcheck
4136  = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
4137  }
4138  }
4139 
4140  PQclear(res);
4141 
4142  destroyPQExpBuffer(query);
4143  destroyPQExpBuffer(tbloids);
4144 }
4145 
4146 /*
4147  * dumpPolicy
4148  * dump the definition of the given policy
4149  */
static void
dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
{
	DumpOptions *dopt = fout->dopt;
	TableInfo  *tbinfo = polinfo->poltable;
	PQExpBuffer query;
	PQExpBuffer delqry;
	PQExpBuffer polprefix;
	char	   *qtabname;
	const char *cmd;
	char	   *tag;

	/* Do nothing in data-only dump */
	if (dopt->dataOnly)
		return;

	/*
	 * If polname is NULL, then this record is just indicating that ROW LEVEL
	 * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
	 * ROW LEVEL SECURITY.
	 */
	if (polinfo->polname == NULL)
	{
		query = createPQExpBuffer();

		appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
						  fmtQualifiedDumpable(tbinfo));

		/*
		 * We must emit the ROW SECURITY object's dependency on its table
		 * explicitly, because it will not match anything in pg_depend (unlike
		 * the case for other PolicyInfo objects).
		 */
		if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
			ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
						 ARCHIVE_OPTS(.tag = polinfo->dobj.name,
									  .namespace = polinfo->dobj.namespace->dobj.name,
									  .owner = tbinfo->rolname,
									  .description = "ROW SECURITY",
									  .section = SECTION_POST_DATA,
									  .createStmt = query->data,
									  .deps = &(tbinfo->dobj.dumpId),
									  .nDeps = 1));

		destroyPQExpBuffer(query);
		return;
	}

	/* Translate the single-letter pg_policy.polcmd code into SQL syntax */
	if (polinfo->polcmd == '*')
		cmd = "";
	else if (polinfo->polcmd == 'r')
		cmd = " FOR SELECT";
	else if (polinfo->polcmd == 'a')
		cmd = " FOR INSERT";
	else if (polinfo->polcmd == 'w')
		cmd = " FOR UPDATE";
	else if (polinfo->polcmd == 'd')
		cmd = " FOR DELETE";
	else
		pg_fatal("unexpected policy command type: %c",
				 polinfo->polcmd);

	query = createPQExpBuffer();
	delqry = createPQExpBuffer();
	polprefix = createPQExpBuffer();

	qtabname = pg_strdup(fmtId(tbinfo->dobj.name));

	/*
	 * Assemble the CREATE POLICY command.  NOTE(review): fmtId and
	 * fmtQualifiedDumpable appear to reuse a shared result buffer elsewhere
	 * in this file, which is presumably why each formatted name is appended
	 * in its own appendPQExpBuffer call — keep them separate.
	 */
	appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));

	appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
					  !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);

	/* polroles is NULL when the policy applies to PUBLIC (polroles = {0}) */
	if (polinfo->polroles != NULL)
		appendPQExpBuffer(query, " TO %s", polinfo->polroles);

	if (polinfo->polqual != NULL)
		appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);

	if (polinfo->polwithcheck != NULL)
		appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);

	appendPQExpBufferStr(query, ";\n");

	/* Matching DROP POLICY for the archive's dropStmt */
	appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
	appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));

	/* Prefix used by dumpComment to build "COMMENT ON POLICY x ON tab" */
	appendPQExpBuffer(polprefix, "POLICY %s ON",
					  fmtId(polinfo->polname));

	tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);

	if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
		ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
					 ARCHIVE_OPTS(.tag = tag,
								  .namespace = polinfo->dobj.namespace->dobj.name,
								  .owner = tbinfo->rolname,
								  .description = "POLICY",
								  .section = SECTION_POST_DATA,
								  .createStmt = query->data,
								  .dropStmt = delqry->data));

	if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
		dumpComment(fout, polprefix->data, qtabname,
					tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
					polinfo->dobj.catId, 0, polinfo->dobj.dumpId);

	free(tag);
	destroyPQExpBuffer(query);
	destroyPQExpBuffer(delqry);
	destroyPQExpBuffer(polprefix);
	free(qtabname);
}
4263 
4264 /*
4265  * getPublications
4266  * get information about publications
4267  */
4268 void
4270 {
4271  DumpOptions *dopt = fout->dopt;
4272  PQExpBuffer query;
4273  PGresult *res;
4274  PublicationInfo *pubinfo;
4275  int i_tableoid;
4276  int i_oid;
4277  int i_pubname;
4278  int i_pubowner;
4279  int i_puballtables;
4280  int i_pubinsert;
4281  int i_pubupdate;
4282  int i_pubdelete;
4283  int i_pubtruncate;
4284  int i_pubviaroot;
4285  int i,
4286  ntups;
4287 
4288  if (dopt->no_publications || fout->remoteVersion < 100000)
4289  return;
4290 
4291  query = createPQExpBuffer();
4292 
4293  /* Get the publications. */
4294  if (fout->remoteVersion >= 130000)
4295  appendPQExpBufferStr(query,
4296  "SELECT p.tableoid, p.oid, p.pubname, "
4297  "p.pubowner, "
4298  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
4299  "FROM pg_publication p");
4300  else if (fout->remoteVersion >= 110000)
4301  appendPQExpBufferStr(query,
4302  "SELECT p.tableoid, p.oid, p.pubname, "
4303  "p.pubowner, "
4304  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
4305  "FROM pg_publication p");
4306  else
4307  appendPQExpBufferStr(query,
4308  "SELECT p.tableoid, p.oid, p.pubname, "
4309  "p.pubowner, "
4310  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
4311  "FROM pg_publication p");
4312 
4313  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4314 
4315  ntups = PQntuples(res);
4316 
4317  if (ntups == 0)
4318  goto cleanup;
4319 
4320  i_tableoid = PQfnumber(res, "tableoid");
4321  i_oid = PQfnumber(res, "oid");
4322  i_pubname = PQfnumber(res, "pubname");
4323  i_pubowner = PQfnumber(res, "pubowner");
4324  i_puballtables = PQfnumber(res, "puballtables");
4325  i_pubinsert = PQfnumber(res, "pubinsert");
4326  i_pubupdate = PQfnumber(res, "pubupdate");
4327  i_pubdelete = PQfnumber(res, "pubdelete");
4328  i_pubtruncate = PQfnumber(res, "pubtruncate");
4329  i_pubviaroot = PQfnumber(res, "pubviaroot");
4330 
4331  pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4332 
4333  for (i = 0; i < ntups; i++)
4334  {
4335  pubinfo[i].dobj.objType = DO_PUBLICATION;
4336  pubinfo[i].dobj.catId.tableoid =
4337  atooid(PQgetvalue(res, i, i_tableoid));
4338  pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4339  AssignDumpId(&pubinfo[i].dobj);
4340  pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4341  pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4342  pubinfo[i].puballtables =
4343  (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4344  pubinfo[i].pubinsert =
4345  (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4346  pubinfo[i].pubupdate =
4347  (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4348  pubinfo[i].pubdelete =
4349  (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4350  pubinfo[i].pubtruncate =
4351  (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4352  pubinfo[i].pubviaroot =
4353  (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4354 
4355  /* Decide whether we want to dump it */
4356  selectDumpableObject(&(pubinfo[i].dobj), fout);
4357  }
4358 
4359 cleanup:
4360  PQclear(res);
4361 
4362  destroyPQExpBuffer(query);
4363 }
4364 
4365 /*
4366  * dumpPublication
4367  * dump the definition of the given publication
4368  */
4369 static void
4371 {
4372  DumpOptions *dopt = fout->dopt;
4373  PQExpBuffer delq;
4374  PQExpBuffer query;
4375  char *qpubname;
4376  bool first = true;
4377 
4378  /* Do nothing in data-only dump */
4379  if (dopt->dataOnly)
4380  return;
4381 
4382  delq = createPQExpBuffer();
4383  query = createPQExpBuffer();
4384 
4385  qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4386 
4387  appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4388  qpubname);
4389 
4390  appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4391  qpubname);
4392 
4393  if (pubinfo->puballtables)
4394  appendPQExpBufferStr(query, " FOR ALL TABLES");
4395 
4396  appendPQExpBufferStr(query, " WITH (publish = '");
4397  if (pubinfo->pubinsert)
4398  {
4399  appendPQExpBufferStr(query, "insert");
4400  first = false;
4401  }
4402 
4403  if (pubinfo->pubupdate)
4404  {
4405  if (!first)
4406  appendPQExpBufferStr(query, ", ");
4407 
4408  appendPQExpBufferStr(query, "update");
4409  first = false;
4410  }
4411 
4412  if (pubinfo->pubdelete)
4413  {
4414  if (!first)
4415  appendPQExpBufferStr(query, ", ");
4416 
4417  appendPQExpBufferStr(query, "delete");
4418  first = false;
4419  }
4420 
4421  if (pubinfo->pubtruncate)
4422  {
4423  if (!first)
4424  appendPQExpBufferStr(query, ", ");
4425 
4426  appendPQExpBufferStr(query, "truncate");
4427  first = false;
4428  }
4429 
4430  appendPQExpBufferChar(query, '\'');
4431 
4432  if (pubinfo->pubviaroot)
4433  appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4434 
4435  appendPQExpBufferStr(query, ");\n");
4436 
4437  if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4438  ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4439  ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4440  .owner = pubinfo->rolname,
4441  .description = "PUBLICATION",
4442  .section = SECTION_POST_DATA,
4443  .createStmt = query->data,
4444  .dropStmt = delq->data));
4445 
4446  if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4447  dumpComment(fout, "PUBLICATION", qpubname,
4448  NULL, pubinfo->rolname,
4449  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4450 
4451  if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4452  dumpSecLabel(fout, "PUBLICATION", qpubname,
4453  NULL, pubinfo->rolname,
4454  pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4455 
4456  destroyPQExpBuffer(delq);
4457  destroyPQExpBuffer(query);
4458  free(qpubname);
4459 }
4460 
4461 /*
4462  * getPublicationNamespaces
4463  * get information about publication membership for dumpable schemas.
4464  */
4465 void
4467 {
4468  PQExpBuffer query;
4469  PGresult *res;
4470  PublicationSchemaInfo *pubsinfo;
4471  DumpOptions *dopt = fout->dopt;
4472  int i_tableoid;
4473  int i_oid;
4474  int i_pnpubid;
4475  int i_pnnspid;
4476  int i,
4477  j,
4478  ntups;
4479 
4480  if (dopt->no_publications || fout->remoteVersion < 150000)
4481  return;
4482 
4483  query = createPQExpBuffer();
4484 
4485  /* Collect all publication membership info. */
4486  appendPQExpBufferStr(query,
4487  "SELECT tableoid, oid, pnpubid, pnnspid "
4488  "FROM pg_catalog.pg_publication_namespace");
4489  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4490 
4491  ntups = PQntuples(res);
4492 
4493  i_tableoid = PQfnumber(res, "tableoid");
4494  i_oid = PQfnumber(res, "oid");
4495  i_pnpubid = PQfnumber(res, "pnpubid");
4496  i_pnnspid = PQfnumber(res, "pnnspid");
4497 
4498  /* this allocation may be more than we need */
4499  pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4500  j = 0;
4501 
4502  for (i = 0; i < ntups; i++)
4503  {
4504  Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4505  Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4506  PublicationInfo *pubinfo;
4507  NamespaceInfo *nspinfo;
4508 
4509  /*
4510  * Ignore any entries for which we aren't interested in either the
4511  * publication or the rel.
4512  */
4513  pubinfo = findPublicationByOid(pnpubid);
4514  if (pubinfo == NULL)
4515  continue;
4516  nspinfo = findNamespaceByOid(pnnspid);
4517  if (nspinfo == NULL)
4518  continue;
4519 
4520  /*
4521  * We always dump publication namespaces unless the corresponding
4522  * namespace is excluded from the dump.
4523  */
4524  if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4525  continue;
4526 
4527  /* OK, make a DumpableObject for this relationship */
4529  pubsinfo[j].dobj.catId.tableoid =
4530  atooid(PQgetvalue(res, i, i_tableoid));
4531  pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4532  AssignDumpId(&pubsinfo[j].dobj);
4533  pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4534  pubsinfo[j].dobj.name = nspinfo->dobj.name;
4535  pubsinfo[j].publication = pubinfo;
4536  pubsinfo[j].pubschema = nspinfo;
4537 
4538  /* Decide whether we want to dump it */
4539  selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4540 
4541  j++;
4542  }
4543 
4544  PQclear(res);
4545  destroyPQExpBuffer(query);
4546 }
4547 
4548 /*
4549  * getPublicationTables
4550  * get information about publication membership for dumpable tables.
4551  */
void
getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
{
	PQExpBuffer query;
	PGresult   *res;
	PublicationRelInfo *pubrinfo;
	DumpOptions *dopt = fout->dopt;
	int			i_tableoid;
	int			i_oid;
	int			i_prpubid;
	int			i_prrelid;
	int			i_prrelqual;
	int			i_prattrs;
	int			i,
				j,
				ntups;

	/* Publications were introduced in v10; honor --no-publications too */
	if (dopt->no_publications || fout->remoteVersion < 100000)
		return;

	query = createPQExpBuffer();

	/*
	 * Collect all publication membership info.  v15+ adds row filters
	 * (prqual) and column lists (prattrs); older servers get NULLs.
	 */
	if (fout->remoteVersion >= 150000)
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
							 "(CASE\n"
							 "  WHEN pr.prattrs IS NOT NULL THEN\n"
							 "    (SELECT array_agg(attname)\n"
							 "       FROM\n"
							 "         pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
							 "         pg_catalog.pg_attribute\n"
							 "      WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
							 "  ELSE NULL END) prattrs "
							 "FROM pg_catalog.pg_publication_rel pr");
	else
		appendPQExpBufferStr(query,
							 "SELECT tableoid, oid, prpubid, prrelid, "
							 "NULL AS prrelqual, NULL AS prattrs "
							 "FROM pg_catalog.pg_publication_rel");
	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);

	ntups = PQntuples(res);

	i_tableoid = PQfnumber(res, "tableoid");
	i_oid = PQfnumber(res, "oid");
	i_prpubid = PQfnumber(res, "prpubid");
	i_prrelid = PQfnumber(res, "prrelid");
	i_prrelqual = PQfnumber(res, "prrelqual");
	i_prattrs = PQfnumber(res, "prattrs");

	/* this allocation may be more than we need */
	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
	j = 0;

	for (i = 0; i < ntups; i++)
	{
		Oid			prpubid = atooid(PQgetvalue(res, i, i_prpubid));
		Oid			prrelid = atooid(PQgetvalue(res, i, i_prrelid));
		PublicationInfo *pubinfo;
		TableInfo  *tbinfo;

		/*
		 * Ignore any entries for which we aren't interested in either the
		 * publication or the rel.
		 */
		pubinfo = findPublicationByOid(prpubid);
		if (pubinfo == NULL)
			continue;
		tbinfo = findTableByOid(prrelid);
		if (tbinfo == NULL)
			continue;

		/*
		 * Ignore publication membership of tables whose definitions are not
		 * to be dumped.
		 */
		if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
			continue;

		/* OK, make a DumpableObject for this relationship */
		pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
		pubrinfo[j].dobj.catId.tableoid =
			atooid(PQgetvalue(res, i, i_tableoid));
		pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
		AssignDumpId(&pubrinfo[j].dobj);
		pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
		pubrinfo[j].dobj.name = tbinfo->dobj.name;
		pubrinfo[j].publication = pubinfo;
		pubrinfo[j].pubtable = tbinfo;
		/* row filter expression, or NULL if none */
		if (PQgetisnull(res, i, i_prrelqual))
			pubrinfo[j].pubrelqual = NULL;
		else
			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));

		/* column list: turn the attname array into a quoted, comma list */
		if (!PQgetisnull(res, i, i_prattrs))
		{
			char	  **attnames;
			int			nattnames;
			PQExpBuffer attribs;

			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
							  &attnames, &nattnames))
				pg_fatal("could not parse %s array", "prattrs");
			attribs = createPQExpBuffer();
			for (int k = 0; k < nattnames; k++)
			{
				if (k > 0)
					appendPQExpBufferStr(attribs, ", ");

				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
			}
			/* keep only the string; the PQExpBuffer struct is abandoned */
			pubrinfo[j].pubrattrs = attribs->data;
		}
		else
			pubrinfo[j].pubrattrs = NULL;

		/* Decide whether we want to dump it */
		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);

		j++;
	}

	PQclear(res);
	destroyPQExpBuffer(query);
}
4679 
4680 /*
4681  * dumpPublicationNamespace
4682  * dump the definition of the given publication schema mapping.
4683  */
4684 static void
4686 {
4687  DumpOptions *dopt = fout->dopt;
4688  NamespaceInfo *schemainfo = pubsinfo->pubschema;
4689  PublicationInfo *pubinfo = pubsinfo->publication;
4690  PQExpBuffer query;
4691  char *tag;
4692 
4693  /* Do nothing in data-only dump */
4694  if (dopt->dataOnly)
4695  return;
4696 
4697  tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name);
4698 
4699  query = createPQExpBuffer();
4700 
4701  appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name));
4702  appendPQExpBuffer(query, "ADD TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
4703 
4704  /*
4705  * There is no point in creating drop query as the drop is done by schema
4706  * drop.
4707  */
4708  if (pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4709  ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId,
4710  ARCHIVE_OPTS(.tag = tag,
4711  .namespace = schemainfo->dobj.name,
4712  .owner = pubinfo->rolname,
4713  .description = "PUBLICATION TABLES IN SCHEMA",
4714  .section = SECTION_POST_DATA,
4715  .createStmt = query->data));
4716 
4717  /* These objects can't currently have comments or seclabels */
4718 
4719  free(tag);
4720  destroyPQExpBuffer(query);
4721 }
4722 
4723 /*
4724  * dumpPublicationTable
4725  * dump the definition of the given publication table mapping
4726  */
4727 static void
4729 {
4730  DumpOptions *dopt = fout->dopt;
4731  PublicationInfo *pubinfo = pubrinfo->publication;
4732  TableInfo *tbinfo = pubrinfo->pubtable;
4733  PQExpBuffer query;
4734  char *tag;
4735 
4736  /* Do nothing in data-only dump */
4737  if (dopt->dataOnly)
4738  return;
4739 
4740  tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
4741 
4742  query = createPQExpBuffer();
4743 
4744  appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
4745  fmtId(pubinfo->dobj.name));
4746  appendPQExpBuffer(query, " %s",
4747  fmtQualifiedDumpable(tbinfo));
4748 
4749  if (pubrinfo->pubrattrs)
4750  appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
4751 
4752  if (pubrinfo->pubrelqual)
4753  {
4754  /*
4755  * It's necessary to add parentheses around the expression because
4756  * pg_get_expr won't supply the parentheses for things like WHERE
4757  * TRUE.
4758  */
4759  appendPQExpBuffer(query, " WHERE (%s)", pubrinfo->pubrelqual);
4760  }
4761  appendPQExpBufferStr(query, ";\n");
4762 
4763  /*
4764  * There is no point in creating a drop query as the drop is done by table
4765  * drop. (If you think to change this, see also _printTocEntry().)
4766  * Although this object doesn't really have ownership as such, set the
4767  * owner field anyway to ensure that the command is run by the correct
4768  * role at restore time.
4769  */
4770  if (pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4771  ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
4772  ARCHIVE_OPTS(.tag = tag,
4773  .namespace = tbinfo->dobj.namespace->dobj.name,
4774  .owner = pubinfo->rolname,
4775  .description = "PUBLICATION TABLE",
4776  .section = SECTION_POST_DATA,
4777  .createStmt = query->data));
4778 
4779  /* These objects can't currently have comments or seclabels */
4780 
4781  free(tag);
4782  destroyPQExpBuffer(query);
4783 }
4784 
4785 /*
4786  * Is the currently connected user a superuser?
4787  */
4788 static bool
4790 {
4791  ArchiveHandle *AH = (ArchiveHandle *) fout;
4792  const char *val;
4793 
4794  val = PQparameterStatus(AH->connection, "is_superuser");
4795 
4796  if (val && strcmp(val, "on") == 0)
4797  return true;
4798 
4799  return false;
4800 }
4801 
4802 /*
4803  * Set the given value to restrict_nonsystem_relation_kind value. Since
4804  * restrict_nonsystem_relation_kind is introduced in minor version releases,
4805  * the setting query is effective only where available.
4806  */
4807 static void
4809 {
4810  PQExpBuffer query = createPQExpBuffer();
4811  PGresult *res;
4812 
4813  appendPQExpBuffer(query,
4814  "SELECT set_config(name, '%s', false) "
4815  "FROM pg_settings "
4816  "WHERE name = 'restrict_nonsystem_relation_kind'",
4817  value);
4818  res = ExecuteSqlQuery(AH, query->data, PGRES_TUPLES_OK);
4819 
4820  PQclear(res);
4821  destroyPQExpBuffer(query);
4822 }
4823 
4824 /*
4825  * getSubscriptions
4826  * get information about subscriptions
4827  */
4828 void
4830 {
4831  DumpOptions *dopt = fout->dopt;
4832  PQExpBuffer query;
4833  PGresult *res;
4834  SubscriptionInfo *subinfo;
4835  int i_tableoid;
4836  int i_oid;
4837  int i_subname;
4838  int i_subowner;
4839  int i_subbinary;
4840  int i_substream;
4841  int i_subtwophasestate;
4842  int i_subdisableonerr;
4843  int i_subpasswordrequired;
4844  int i_subrunasowner;
4845  int i_subconninfo;
4846  int i_subslotname;
4847  int i_subsynccommit;
4848  int i_subpublications;
4849  int i_suborigin;
4850  int i_suboriginremotelsn;
4851  int i_subenabled;
4852  int i_subfailover;
4853  int i,
4854  ntups;
4855 
4856  if (dopt->no_subscriptions || fout->remoteVersion < 100000)
4857  return;
4858 
4859  if (!is_superuser(fout))
4860  {
4861  int n;
4862 
4863  res = ExecuteSqlQuery(fout,
4864  "SELECT count(*) FROM pg_subscription "
4865  "WHERE subdbid = (SELECT oid FROM pg_database"
4866  " WHERE datname = current_database())",
4867  PGRES_TUPLES_OK);
4868  n = atoi(PQgetvalue(res, 0, 0));
4869  if (n > 0)
4870  pg_log_warning("subscriptions not dumped because current user is not a superuser");
4871  PQclear(res);
4872  return;
4873  }
4874 
4875  query = createPQExpBuffer();
4876 
4877  /* Get the subscriptions in current database. */
4878  appendPQExpBufferStr(query,
4879  "SELECT s.tableoid, s.oid, s.subname,\n"
4880  " s.subowner,\n"
4881  " s.subconninfo, s.subslotname, s.subsynccommit,\n"
4882  " s.subpublications,\n");
4883 
4884  if (fout->remoteVersion >= 140000)
4885  appendPQExpBufferStr(query, " s.subbinary,\n");
4886  else
4887  appendPQExpBufferStr(query, " false AS subbinary,\n");
4888 
4889  if (fout->remoteVersion >= 140000)
4890  appendPQExpBufferStr(query, " s.substream,\n");
4891  else
4892  appendPQExpBufferStr(query, " 'f' AS substream,\n");
4893 
4894  if (fout->remoteVersion >= 150000)
4895  appendPQExpBufferStr(query,
4896  " s.subtwophasestate,\n"
4897  " s.subdisableonerr,\n");
4898  else
4899  appendPQExpBuffer(query,
4900  " '%c' AS subtwophasestate,\n"
4901  " false AS subdisableonerr,\n",
4903 
4904  if (fout->remoteVersion >= 160000)
4905  appendPQExpBufferStr(query,
4906  " s.subpasswordrequired,\n"
4907  " s.subrunasowner,\n"
4908  " s.suborigin,\n");
4909  else
4910  appendPQExpBuffer(query,
4911  " 't' AS subpasswordrequired,\n"
4912  " 't' AS subrunasowner,\n"
4913  " '%s' AS suborigin,\n",
4915 
4916  if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
4917  appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
4918  " s.subenabled,\n");
4919  else
4920  appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
4921  " false AS subenabled,\n");
4922 
4923  if (fout->remoteVersion >= 170000)
4924  appendPQExpBufferStr(query,
4925  " s.subfailover\n");
4926  else
4927  appendPQExpBuffer(query,
4928  " false AS subfailover\n");
4929 
4930  appendPQExpBufferStr(query,
4931  "FROM pg_subscription s\n");
4932 
4933  if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
4934  appendPQExpBufferStr(query,
4935  "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
4936  " ON o.external_id = 'pg_' || s.oid::text \n");
4937 
4938  appendPQExpBufferStr(query,
4939  "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
4940  " WHERE datname = current_database())");
4941 
4942  res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4943 
4944  ntups = PQntuples(res);
4945 
4946  /*
4947  * Get subscription fields. We don't include subskiplsn in the dump as
4948  * after restoring the dump this value may no longer be relevant.
4949  */
4950  i_tableoid = PQfnumber(res, "tableoid");
4951  i_oid = PQfnumber(res, "oid");
4952  i_subname = PQfnumber(res, "subname");
4953  i_subowner = PQfnumber(res, "subowner");
4954  i_subbinary = PQfnumber(res, "subbinary");
4955  i_substream = PQfnumber(res, "substream");
4956  i_subtwophasestate = PQfnumber(res, "subtwophasestate");
4957  i_subdisableonerr = PQfnumber(res, "subdisableonerr");
4958  i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
4959  i_subrunasowner = PQfnumber(res, "subrunasowner");
4960  i_subconninfo = PQfnumber(res, "subconninfo");
4961  i_subslotname = PQfnumber(res, "subslotname");
4962  i_subsynccommit = PQfnumber(res, "subsynccommit");
4963  i_subpublications = PQfnumber(res, "subpublications");
4964  i_suborigin = PQfnumber(res, "suborigin");
4965  i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
4966  i_subenabled = PQfnumber(res, "subenabled");
4967  i_subfailover = PQfnumber(res, "subfailover");
4968 
4969  subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
4970 
4971  for (i = 0; i < ntups; i++)
4972  {
4973  subinfo[i].dobj.objType = DO_SUBSCRIPTION;
4974  subinfo[i].dobj.catId.tableoid =
4975  atooid(PQgetvalue(res, i, i_tableoid));
4976  subinfo[i].dobj.catId.oid =