PostgreSQL Source Code git master
pg_dump.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_dump.c
4 * pg_dump is a utility for dumping out a postgres database
5 * into a script file.
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * pg_dump will read the system catalogs in a database and dump out a
11 * script that reproduces the schema in terms of SQL that is understood
12 * by PostgreSQL
13 *
14 * Note that pg_dump runs in a transaction-snapshot mode transaction,
15 * so it sees a consistent snapshot of the database including system
16 * catalogs. However, it relies in part on various specialized backend
17 * functions like pg_get_indexdef(), and those things tend to look at
 * the currently committed state.  So it is possible to get a 'cache
 * lookup failed' error if someone performs DDL changes while a dump is
20 * happening. The window for this sort of thing is from the acquisition
21 * of the transaction snapshot to getSchemaData() (when pg_dump acquires
22 * AccessShareLock on every table it intends to dump). It isn't very large,
23 * but it can happen.
24 *
25 * http://archives.postgresql.org/pgsql-bugs/2010-02/msg00187.php
26 *
27 * IDENTIFICATION
28 * src/bin/pg_dump/pg_dump.c
29 *
30 *-------------------------------------------------------------------------
31 */
32#include "postgres_fe.h"
33
34#include <unistd.h>
35#include <ctype.h>
36#include <limits.h>
37#ifdef HAVE_TERMIOS_H
38#include <termios.h>
39#endif
40
41#include "access/attnum.h"
42#include "access/sysattr.h"
43#include "access/transam.h"
44#include "catalog/pg_aggregate_d.h"
45#include "catalog/pg_am_d.h"
46#include "catalog/pg_attribute_d.h"
47#include "catalog/pg_authid_d.h"
48#include "catalog/pg_cast_d.h"
49#include "catalog/pg_class_d.h"
50#include "catalog/pg_default_acl_d.h"
51#include "catalog/pg_largeobject_d.h"
52#include "catalog/pg_proc_d.h"
53#include "catalog/pg_publication_d.h"
54#include "catalog/pg_subscription_d.h"
55#include "catalog/pg_type_d.h"
56#include "common/connect.h"
57#include "common/int.h"
58#include "common/relpath.h"
59#include "compress_io.h"
60#include "dumputils.h"
63#include "filter.h"
64#include "getopt_long.h"
65#include "libpq/libpq-fs.h"
66#include "parallel.h"
67#include "pg_backup_db.h"
68#include "pg_backup_utils.h"
69#include "pg_dump.h"
70#include "storage/block.h"
71
72typedef struct
73{
74 Oid roleoid; /* role's OID */
75 const char *rolename; /* role's name */
77
78typedef struct
79{
80 const char *descr; /* comment for an object */
81 Oid classoid; /* object class (catalog OID) */
82 Oid objoid; /* object OID */
83 int objsubid; /* subobject (table column #) */
85
86typedef struct
87{
88 const char *provider; /* label provider of this security label */
89 const char *label; /* security label for an object */
90 Oid classoid; /* object class (catalog OID) */
91 Oid objoid; /* object OID */
92 int objsubid; /* subobject (table column #) */
94
95typedef struct
96{
97 Oid oid; /* object OID */
98 char relkind; /* object kind */
99 RelFileNumber relfilenumber; /* object filenode */
100 Oid toast_oid; /* toast table OID */
101 RelFileNumber toast_relfilenumber; /* toast table filenode */
102 Oid toast_index_oid; /* toast table index OID */
103 RelFileNumber toast_index_relfilenumber; /* toast table index filenode */
105
106/* sequence types */
107typedef enum SeqType
108{
113
114static const char *const SeqTypeNames[] =
115{
116 [SEQTYPE_SMALLINT] = "smallint",
117 [SEQTYPE_INTEGER] = "integer",
118 [SEQTYPE_BIGINT] = "bigint",
119};
120
122 "array length mismatch");
123
124typedef struct
125{
126 Oid oid; /* sequence OID */
127 SeqType seqtype; /* data type of sequence */
128 bool cycled; /* whether sequence cycles */
129 int64 minv; /* minimum value */
130 int64 maxv; /* maximum value */
131 int64 startv; /* start value */
132 int64 incby; /* increment value */
133 int64 cache; /* cache size */
134 int64 last_value; /* last value of sequence */
135 bool is_called; /* whether nextval advances before returning */
137
138typedef enum OidOptions
139{
144
145/* global decls */
146static bool dosync = true; /* Issue fsync() to make dump durable on disk. */
147
148static Oid g_last_builtin_oid; /* value of the last builtin oid */
149
/* The specified names/patterns should match at least one entity */
151static int strict_names = 0;
152
154
155/*
156 * Object inclusion/exclusion lists
157 *
158 * The string lists record the patterns given by command-line switches,
159 * which we then convert to lists of OIDs of matching objects.
160 */
162static SimpleOidList schema_include_oids = {NULL, NULL};
164static SimpleOidList schema_exclude_oids = {NULL, NULL};
165
168static SimpleOidList table_include_oids = {NULL, NULL};
171static SimpleOidList table_exclude_oids = {NULL, NULL};
175
178
181
184
185static const CatalogId nilCatalogId = {0, 0};
186
187/* override for standard extra_float_digits setting */
188static bool have_extra_float_digits = false;
190
191/* sorted table of role names */
192static RoleNameItem *rolenames = NULL;
193static int nrolenames = 0;
194
195/* sorted table of comments */
196static CommentItem *comments = NULL;
197static int ncomments = 0;
198
199/* sorted table of security labels */
200static SecLabelItem *seclabels = NULL;
201static int nseclabels = 0;
202
203/* sorted table of pg_class information for binary upgrade */
206
207/* sorted table of sequences */
208static SequenceItem *sequences = NULL;
209static int nsequences = 0;
210
/*
 * The default number of rows per INSERT when
 * --inserts is specified without --rows-per-insert
 */
#define DUMP_DEFAULT_ROWS_PER_INSERT 1
216
/*
 * Maximum number of large objects to group into a single ArchiveEntry.
 * At some point we might want to make this user-controllable, but for now
 * a hard-wired setting will suffice.
 */
#define MAX_BLOBS_PER_ARCHIVE_ENTRY 1000
223
/*
 * Macro for producing the quoted, schema-qualified name of a dumpable
 * object.  Expands to a call to fmtQualifiedId(); note that "obj" is
 * evaluated more than once, so avoid side effects in the argument.
 */
#define fmtQualifiedDumpable(obj) \
	fmtQualifiedId((obj)->dobj.namespace->dobj.name, \
				   (obj)->dobj.name)
230
231static void help(const char *progname);
232static void setup_connection(Archive *AH,
233 const char *dumpencoding, const char *dumpsnapshot,
234 char *use_role);
236static void expand_schema_name_patterns(Archive *fout,
237 SimpleStringList *patterns,
238 SimpleOidList *oids,
239 bool strict_names);
241 SimpleStringList *patterns,
242 SimpleOidList *oids,
243 bool strict_names);
245 SimpleStringList *patterns,
246 SimpleOidList *oids);
247static void expand_table_name_patterns(Archive *fout,
248 SimpleStringList *patterns,
249 SimpleOidList *oids,
250 bool strict_names,
251 bool with_child_tables);
252static void prohibit_crossdb_refs(PGconn *conn, const char *dbname,
253 const char *pattern);
254
255static NamespaceInfo *findNamespace(Oid nsoid);
256static void dumpTableData(Archive *fout, const TableDataInfo *tdinfo);
257static void refreshMatViewData(Archive *fout, const TableDataInfo *tdinfo);
258static const char *getRoleName(const char *roleoid_str);
259static void collectRoleNames(Archive *fout);
260static void getAdditionalACLs(Archive *fout);
261static void dumpCommentExtended(Archive *fout, const char *type,
262 const char *name, const char *namespace,
263 const char *owner, CatalogId catalogId,
264 int subid, DumpId dumpId,
265 const char *initdb_comment);
266static inline void dumpComment(Archive *fout, const char *type,
267 const char *name, const char *namespace,
268 const char *owner, CatalogId catalogId,
269 int subid, DumpId dumpId);
270static int findComments(Oid classoid, Oid objoid, CommentItem **items);
271static void collectComments(Archive *fout);
272static void dumpSecLabel(Archive *fout, const char *type, const char *name,
273 const char *namespace, const char *owner,
274 CatalogId catalogId, int subid, DumpId dumpId);
275static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
276static void collectSecLabels(Archive *fout);
277static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
278static void dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo);
279static void dumpExtension(Archive *fout, const ExtensionInfo *extinfo);
280static void dumpType(Archive *fout, const TypeInfo *tyinfo);
281static void dumpBaseType(Archive *fout, const TypeInfo *tyinfo);
282static void dumpEnumType(Archive *fout, const TypeInfo *tyinfo);
283static void dumpRangeType(Archive *fout, const TypeInfo *tyinfo);
284static void dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo);
285static void dumpDomain(Archive *fout, const TypeInfo *tyinfo);
286static void dumpCompositeType(Archive *fout, const TypeInfo *tyinfo);
287static void dumpCompositeTypeColComments(Archive *fout, const TypeInfo *tyinfo,
288 PGresult *res);
289static void dumpShellType(Archive *fout, const ShellTypeInfo *stinfo);
290static void dumpProcLang(Archive *fout, const ProcLangInfo *plang);
291static void dumpFunc(Archive *fout, const FuncInfo *finfo);
292static void dumpCast(Archive *fout, const CastInfo *cast);
293static void dumpTransform(Archive *fout, const TransformInfo *transform);
294static void dumpOpr(Archive *fout, const OprInfo *oprinfo);
295static void dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo);
296static void dumpOpclass(Archive *fout, const OpclassInfo *opcinfo);
297static void dumpOpfamily(Archive *fout, const OpfamilyInfo *opfinfo);
298static void dumpCollation(Archive *fout, const CollInfo *collinfo);
299static void dumpConversion(Archive *fout, const ConvInfo *convinfo);
300static void dumpRule(Archive *fout, const RuleInfo *rinfo);
301static void dumpAgg(Archive *fout, const AggInfo *agginfo);
302static void dumpTrigger(Archive *fout, const TriggerInfo *tginfo);
303static void dumpEventTrigger(Archive *fout, const EventTriggerInfo *evtinfo);
304static void dumpTable(Archive *fout, const TableInfo *tbinfo);
305static void dumpTableSchema(Archive *fout, const TableInfo *tbinfo);
306static void dumpTableAttach(Archive *fout, const TableAttachInfo *attachinfo);
307static void dumpAttrDef(Archive *fout, const AttrDefInfo *adinfo);
308static void collectSequences(Archive *fout);
309static void dumpSequence(Archive *fout, const TableInfo *tbinfo);
310static void dumpSequenceData(Archive *fout, const TableDataInfo *tdinfo);
311static void dumpIndex(Archive *fout, const IndxInfo *indxinfo);
312static void dumpIndexAttach(Archive *fout, const IndexAttachInfo *attachinfo);
313static void dumpStatisticsExt(Archive *fout, const StatsExtInfo *statsextinfo);
314static void dumpConstraint(Archive *fout, const ConstraintInfo *coninfo);
315static void dumpTableConstraintComment(Archive *fout, const ConstraintInfo *coninfo);
316static void dumpTSParser(Archive *fout, const TSParserInfo *prsinfo);
317static void dumpTSDictionary(Archive *fout, const TSDictInfo *dictinfo);
318static void dumpTSTemplate(Archive *fout, const TSTemplateInfo *tmplinfo);
319static void dumpTSConfig(Archive *fout, const TSConfigInfo *cfginfo);
320static void dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo);
321static void dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo);
322static void dumpUserMappings(Archive *fout,
323 const char *servername, const char *namespace,
324 const char *owner, CatalogId catalogId, DumpId dumpId);
325static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
326
327static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
328 const char *type, const char *name, const char *subname,
329 const char *nspname, const char *tag, const char *owner,
330 const DumpableAcl *dacl);
331
332static void getDependencies(Archive *fout);
333static void BuildArchiveDependencies(Archive *fout);
334static void findDumpableDependencies(ArchiveHandle *AH, const DumpableObject *dobj,
335 DumpId **dependencies, int *nDeps, int *allocDeps);
336
338static void addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
339 DumpableObject *boundaryObjs);
340
341static void addConstrChildIdxDeps(DumpableObject *dobj, const IndxInfo *refidx);
342static void getDomainConstraints(Archive *fout, TypeInfo *tyinfo);
343static void getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind);
344static void makeTableDataInfo(DumpOptions *dopt, TableInfo *tbinfo);
346static void getTableDataFKConstraints(void);
347static void determineNotNullFlags(Archive *fout, PGresult *res, int r,
348 TableInfo *tbinfo, int j,
349 int i_notnull_name, int i_notnull_noinherit,
350 int i_notnull_islocal);
351static char *format_function_arguments(const FuncInfo *finfo, const char *funcargs,
352 bool is_agg);
353static char *format_function_signature(Archive *fout,
354 const FuncInfo *finfo, bool honor_quotes);
355static char *convertRegProcReference(const char *proc);
356static char *getFormattedOperatorName(const char *oproid);
357static char *convertTSFunction(Archive *fout, Oid funcOid);
358static const char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
359static void getLOs(Archive *fout);
360static void dumpLO(Archive *fout, const LoInfo *loinfo);
361static int dumpLOs(Archive *fout, const void *arg);
362static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
363static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
364static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
365static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
366static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
367static void dumpDatabase(Archive *fout);
368static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
369 const char *dbname, Oid dboid);
370static void dumpEncoding(Archive *AH);
371static void dumpStdStrings(Archive *AH);
372static void dumpSearchPath(Archive *AH);
374 PQExpBuffer upgrade_buffer,
375 Oid pg_type_oid,
376 bool force_array_type,
377 bool include_multirange_type);
379 PQExpBuffer upgrade_buffer,
380 const TableInfo *tbinfo);
381static void collectBinaryUpgradeClassOids(Archive *fout);
383 PQExpBuffer upgrade_buffer,
384 Oid pg_class_oid);
385static void binary_upgrade_extension_member(PQExpBuffer upgrade_buffer,
386 const DumpableObject *dobj,
387 const char *objtype,
388 const char *objname,
389 const char *objnamespace);
390static const char *getAttrName(int attrnum, const TableInfo *tblInfo);
391static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
392static bool nonemptyReloptions(const char *reloptions);
393static void appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
394 const char *prefix, Archive *fout);
395static char *get_synchronized_snapshot(Archive *fout);
396static void set_restrict_relation_kind(Archive *AH, const char *value);
397static void setupDumpWorker(Archive *AH);
398static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
399static bool forcePartitionRootLoad(const TableInfo *tbinfo);
400static void read_dump_filters(const char *filename, DumpOptions *dopt);
401
402
403int
404main(int argc, char **argv)
405{
406 int c;
407 const char *filename = NULL;
408 const char *format = "p";
409 TableInfo *tblinfo;
410 int numTables;
411 DumpableObject **dobjs;
412 int numObjs;
413 DumpableObject *boundaryObjs;
414 int i;
415 int optindex;
416 RestoreOptions *ropt;
417 Archive *fout; /* the script file */
418 bool g_verbose = false;
419 const char *dumpencoding = NULL;
420 const char *dumpsnapshot = NULL;
421 char *use_role = NULL;
422 int numWorkers = 1;
423 int plainText = 0;
424 ArchiveFormat archiveFormat = archUnknown;
425 ArchiveMode archiveMode;
426 pg_compress_specification compression_spec = {0};
427 char *compression_detail = NULL;
428 char *compression_algorithm_str = "none";
429 char *error_detail = NULL;
430 bool user_compression_defined = false;
432 bool data_only = false;
433 bool schema_only = false;
434
435 static DumpOptions dopt;
436
437 static struct option long_options[] = {
438 {"data-only", no_argument, NULL, 'a'},
439 {"blobs", no_argument, NULL, 'b'},
440 {"large-objects", no_argument, NULL, 'b'},
441 {"no-blobs", no_argument, NULL, 'B'},
442 {"no-large-objects", no_argument, NULL, 'B'},
443 {"clean", no_argument, NULL, 'c'},
444 {"create", no_argument, NULL, 'C'},
445 {"dbname", required_argument, NULL, 'd'},
446 {"extension", required_argument, NULL, 'e'},
447 {"file", required_argument, NULL, 'f'},
448 {"format", required_argument, NULL, 'F'},
449 {"host", required_argument, NULL, 'h'},
450 {"jobs", 1, NULL, 'j'},
451 {"no-reconnect", no_argument, NULL, 'R'},
452 {"no-owner", no_argument, NULL, 'O'},
453 {"port", required_argument, NULL, 'p'},
454 {"schema", required_argument, NULL, 'n'},
455 {"exclude-schema", required_argument, NULL, 'N'},
456 {"schema-only", no_argument, NULL, 's'},
457 {"superuser", required_argument, NULL, 'S'},
458 {"table", required_argument, NULL, 't'},
459 {"exclude-table", required_argument, NULL, 'T'},
460 {"no-password", no_argument, NULL, 'w'},
461 {"password", no_argument, NULL, 'W'},
462 {"username", required_argument, NULL, 'U'},
463 {"verbose", no_argument, NULL, 'v'},
464 {"no-privileges", no_argument, NULL, 'x'},
465 {"no-acl", no_argument, NULL, 'x'},
466 {"compress", required_argument, NULL, 'Z'},
467 {"encoding", required_argument, NULL, 'E'},
468 {"help", no_argument, NULL, '?'},
469 {"version", no_argument, NULL, 'V'},
470
471 /*
472 * the following options don't have an equivalent short option letter
473 */
474 {"attribute-inserts", no_argument, &dopt.column_inserts, 1},
475 {"binary-upgrade", no_argument, &dopt.binary_upgrade, 1},
476 {"column-inserts", no_argument, &dopt.column_inserts, 1},
477 {"disable-dollar-quoting", no_argument, &dopt.disable_dollar_quoting, 1},
478 {"disable-triggers", no_argument, &dopt.disable_triggers, 1},
479 {"enable-row-security", no_argument, &dopt.enable_row_security, 1},
480 {"exclude-table-data", required_argument, NULL, 4},
481 {"extra-float-digits", required_argument, NULL, 8},
482 {"if-exists", no_argument, &dopt.if_exists, 1},
483 {"inserts", no_argument, NULL, 9},
484 {"lock-wait-timeout", required_argument, NULL, 2},
485 {"no-table-access-method", no_argument, &dopt.outputNoTableAm, 1},
486 {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
487 {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
488 {"load-via-partition-root", no_argument, &dopt.load_via_partition_root, 1},
489 {"role", required_argument, NULL, 3},
490 {"section", required_argument, NULL, 5},
491 {"serializable-deferrable", no_argument, &dopt.serializable_deferrable, 1},
492 {"snapshot", required_argument, NULL, 6},
493 {"strict-names", no_argument, &strict_names, 1},
494 {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
495 {"no-comments", no_argument, &dopt.no_comments, 1},
496 {"no-publications", no_argument, &dopt.no_publications, 1},
497 {"no-security-labels", no_argument, &dopt.no_security_labels, 1},
498 {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
499 {"no-toast-compression", no_argument, &dopt.no_toast_compression, 1},
500 {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
501 {"no-sync", no_argument, NULL, 7},
502 {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
503 {"rows-per-insert", required_argument, NULL, 10},
504 {"include-foreign-data", required_argument, NULL, 11},
505 {"table-and-children", required_argument, NULL, 12},
506 {"exclude-table-and-children", required_argument, NULL, 13},
507 {"exclude-table-data-and-children", required_argument, NULL, 14},
508 {"sync-method", required_argument, NULL, 15},
509 {"filter", required_argument, NULL, 16},
510 {"exclude-extension", required_argument, NULL, 17},
511
512 {NULL, 0, NULL, 0}
513 };
514
515 pg_logging_init(argv[0]);
517 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
518
519 /*
520 * Initialize what we need for parallel execution, especially for thread
521 * support on Windows.
522 */
524
525 progname = get_progname(argv[0]);
526
527 if (argc > 1)
528 {
529 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
530 {
531 help(progname);
532 exit_nicely(0);
533 }
534 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
535 {
536 puts("pg_dump (PostgreSQL) " PG_VERSION);
537 exit_nicely(0);
538 }
539 }
540
541 InitDumpOptions(&dopt);
542
543 while ((c = getopt_long(argc, argv, "abBcCd:e:E:f:F:h:j:n:N:Op:RsS:t:T:U:vwWxZ:",
544 long_options, &optindex)) != -1)
545 {
546 switch (c)
547 {
548 case 'a': /* Dump data only */
549 data_only = true;
550 break;
551
552 case 'b': /* Dump LOs */
553 dopt.outputLOs = true;
554 break;
555
556 case 'B': /* Don't dump LOs */
557 dopt.dontOutputLOs = true;
558 break;
559
560 case 'c': /* clean (i.e., drop) schema prior to create */
561 dopt.outputClean = 1;
562 break;
563
564 case 'C': /* Create DB */
565 dopt.outputCreateDB = 1;
566 break;
567
568 case 'd': /* database name */
570 break;
571
572 case 'e': /* include extension(s) */
574 dopt.include_everything = false;
575 break;
576
577 case 'E': /* Dump encoding */
578 dumpencoding = pg_strdup(optarg);
579 break;
580
581 case 'f':
583 break;
584
585 case 'F':
587 break;
588
589 case 'h': /* server host */
591 break;
592
593 case 'j': /* number of dump jobs */
594 if (!option_parse_int(optarg, "-j/--jobs", 1,
596 &numWorkers))
597 exit_nicely(1);
598 break;
599
600 case 'n': /* include schema(s) */
602 dopt.include_everything = false;
603 break;
604
605 case 'N': /* exclude schema(s) */
607 break;
608
609 case 'O': /* Don't reconnect to match owner */
610 dopt.outputNoOwner = 1;
611 break;
612
613 case 'p': /* server port */
615 break;
616
617 case 'R':
618 /* no-op, still accepted for backwards compatibility */
619 break;
620
621 case 's': /* dump schema only */
622 schema_only = true;
623 break;
624
625 case 'S': /* Username for superuser in plain text output */
627 break;
628
629 case 't': /* include table(s) */
631 dopt.include_everything = false;
632 break;
633
634 case 'T': /* exclude table(s) */
636 break;
637
638 case 'U':
640 break;
641
642 case 'v': /* verbose */
643 g_verbose = true;
645 break;
646
647 case 'w':
649 break;
650
651 case 'W':
653 break;
654
655 case 'x': /* skip ACL dump */
656 dopt.aclsSkip = true;
657 break;
658
659 case 'Z': /* Compression */
660 parse_compress_options(optarg, &compression_algorithm_str,
661 &compression_detail);
662 user_compression_defined = true;
663 break;
664
665 case 0:
666 /* This covers the long options. */
667 break;
668
669 case 2: /* lock-wait-timeout */
671 break;
672
673 case 3: /* SET ROLE */
674 use_role = pg_strdup(optarg);
675 break;
676
677 case 4: /* exclude table(s) data */
679 break;
680
681 case 5: /* section */
683 break;
684
685 case 6: /* snapshot */
686 dumpsnapshot = pg_strdup(optarg);
687 break;
688
689 case 7: /* no-sync */
690 dosync = false;
691 break;
692
693 case 8:
695 if (!option_parse_int(optarg, "--extra-float-digits", -15, 3,
697 exit_nicely(1);
698 break;
699
700 case 9: /* inserts */
701
702 /*
703 * dump_inserts also stores --rows-per-insert, careful not to
704 * overwrite that.
705 */
706 if (dopt.dump_inserts == 0)
708 break;
709
710 case 10: /* rows per insert */
711 if (!option_parse_int(optarg, "--rows-per-insert", 1, INT_MAX,
712 &dopt.dump_inserts))
713 exit_nicely(1);
714 break;
715
716 case 11: /* include foreign data */
718 optarg);
719 break;
720
721 case 12: /* include table(s) and their children */
723 optarg);
724 dopt.include_everything = false;
725 break;
726
727 case 13: /* exclude table(s) and their children */
729 optarg);
730 break;
731
732 case 14: /* exclude data of table(s) and children */
734 optarg);
735 break;
736
737 case 15:
739 exit_nicely(1);
740 break;
741
742 case 16: /* read object filters from file */
744 break;
745
746 case 17: /* exclude extension(s) */
748 optarg);
749 break;
750
751 default:
752 /* getopt_long already emitted a complaint */
753 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
754 exit_nicely(1);
755 }
756 }
757
758 /*
759 * Non-option argument specifies database name as long as it wasn't
760 * already specified with -d / --dbname
761 */
762 if (optind < argc && dopt.cparams.dbname == NULL)
763 dopt.cparams.dbname = argv[optind++];
764
765 /* Complain if any arguments remain */
766 if (optind < argc)
767 {
768 pg_log_error("too many command-line arguments (first is \"%s\")",
769 argv[optind]);
770 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
771 exit_nicely(1);
772 }
773
774 /* --column-inserts implies --inserts */
775 if (dopt.column_inserts && dopt.dump_inserts == 0)
777
778 /*
779 * Binary upgrade mode implies dumping sequence data even in schema-only
780 * mode. This is not exposed as a separate option, but kept separate
781 * internally for clarity.
782 */
783 if (dopt.binary_upgrade)
784 dopt.sequence_data = 1;
785
786 if (data_only && schema_only)
787 pg_fatal("options -s/--schema-only and -a/--data-only cannot be used together");
788
789 if (schema_only && foreign_servers_include_patterns.head != NULL)
790 pg_fatal("options -s/--schema-only and --include-foreign-data cannot be used together");
791
792 if (numWorkers > 1 && foreign_servers_include_patterns.head != NULL)
793 pg_fatal("option --include-foreign-data is not supported with parallel backup");
794
795 if (data_only && dopt.outputClean)
796 pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
797
798 if (dopt.if_exists && !dopt.outputClean)
799 pg_fatal("option --if-exists requires option -c/--clean");
800
801 /* set derivative flags */
802 dopt.dumpSchema = (!data_only);
803 dopt.dumpData = (!schema_only);
804
805 /*
806 * --inserts are already implied above if --column-inserts or
807 * --rows-per-insert were specified.
808 */
809 if (dopt.do_nothing && dopt.dump_inserts == 0)
810 pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
811
812 /* Identify archive format to emit */
813 archiveFormat = parseArchiveFormat(format, &archiveMode);
814
815 /* archiveFormat specific setup */
816 if (archiveFormat == archNull)
817 plainText = 1;
818
819 /*
820 * Custom and directory formats are compressed by default with gzip when
821 * available, not the others. If gzip is not available, no compression is
822 * done by default.
823 */
824 if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
825 !user_compression_defined)
826 {
827#ifdef HAVE_LIBZ
828 compression_algorithm_str = "gzip";
829#else
830 compression_algorithm_str = "none";
831#endif
832 }
833
834 /*
835 * Compression options
836 */
837 if (!parse_compress_algorithm(compression_algorithm_str,
839 pg_fatal("unrecognized compression algorithm: \"%s\"",
840 compression_algorithm_str);
841
843 &compression_spec);
844 error_detail = validate_compress_specification(&compression_spec);
845 if (error_detail != NULL)
846 pg_fatal("invalid compression specification: %s",
847 error_detail);
848
849 error_detail = supports_compression(compression_spec);
850 if (error_detail != NULL)
851 pg_fatal("%s", error_detail);
852
853 /*
854 * Disable support for zstd workers for now - these are based on
855 * threading, and it's unclear how it interacts with parallel dumps on
856 * platforms where that relies on threads too (e.g. Windows).
857 */
858 if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
859 pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
860 "workers");
861
862 /*
863 * If emitting an archive format, we always want to emit a DATABASE item,
864 * in case --create is specified at pg_restore time.
865 */
866 if (!plainText)
867 dopt.outputCreateDB = 1;
868
869 /* Parallel backup only in the directory archive format so far */
870 if (archiveFormat != archDirectory && numWorkers > 1)
871 pg_fatal("parallel backup only supported by the directory format");
872
873 /* Open the output file */
874 fout = CreateArchive(filename, archiveFormat, compression_spec,
875 dosync, archiveMode, setupDumpWorker, sync_method);
876
877 /* Make dump options accessible right away */
878 SetArchiveOptions(fout, &dopt, NULL);
879
880 /* Register the cleanup hook */
882
883 /* Let the archiver know how noisy to be */
884 fout->verbose = g_verbose;
885
886
887 /*
888 * We allow the server to be back to 9.2, and up to any minor release of
889 * our own major version. (See also version check in pg_dumpall.c.)
890 */
891 fout->minRemoteVersion = 90200;
892 fout->maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
893
894 fout->numWorkers = numWorkers;
895
896 /*
897 * Open the database using the Archiver, so it knows about it. Errors mean
898 * death.
899 */
900 ConnectDatabase(fout, &dopt.cparams, false);
901 setup_connection(fout, dumpencoding, dumpsnapshot, use_role);
902
903 /*
904 * On hot standbys, never try to dump unlogged table data, since it will
905 * just throw an error.
906 */
907 if (fout->isStandby)
908 dopt.no_unlogged_table_data = true;
909
910 /*
911 * Find the last built-in OID, if needed (prior to 8.1)
912 *
913 * With 8.1 and above, we can just use FirstNormalObjectId - 1.
914 */
916
917 pg_log_info("last built-in OID is %u", g_last_builtin_oid);
918
919 /* Expand schema selection patterns into OID lists */
920 if (schema_include_patterns.head != NULL)
921 {
925 if (schema_include_oids.head == NULL)
926 pg_fatal("no matching schemas were found");
927 }
930 false);
931 /* non-matching exclusion patterns aren't an error */
932
933 /* Expand table selection patterns into OID lists */
936 strict_names, false);
939 strict_names, true);
940 if ((table_include_patterns.head != NULL ||
942 table_include_oids.head == NULL)
943 pg_fatal("no matching tables were found");
944
947 false, false);
950 false, true);
951
954 false, false);
957 false, true);
958
961
962 /* non-matching exclusion patterns aren't an error */
963
964 /* Expand extension selection patterns into OID lists */
966 {
970 if (extension_include_oids.head == NULL)
971 pg_fatal("no matching extensions were found");
972 }
975 false);
976 /* non-matching exclusion patterns aren't an error */
977
978 /*
979 * Dumping LOs is the default for dumps where an inclusion switch is not
980 * used (an "include everything" dump). -B can be used to exclude LOs
981 * from those dumps. -b can be used to include LOs even when an inclusion
982 * switch is used.
983 *
984 * -s means "schema only" and LOs are data, not schema, so we never
985 * include LOs when -s is used.
986 */
987 if (dopt.include_everything && dopt.dumpData && !dopt.dontOutputLOs)
988 dopt.outputLOs = true;
989
990 /*
991 * Collect role names so we can map object owner OIDs to names.
992 */
993 collectRoleNames(fout);
994
995 /*
996 * Now scan the database and create DumpableObject structs for all the
997 * objects we intend to dump.
998 */
999 tblinfo = getSchemaData(fout, &numTables);
1000
1001 if (dopt.dumpData)
1002 {
1003 getTableData(&dopt, tblinfo, numTables, 0);
1005 if (!dopt.dumpSchema)
1007 }
1008
1009 if (!dopt.dumpData && dopt.sequence_data)
1010 getTableData(&dopt, tblinfo, numTables, RELKIND_SEQUENCE);
1011
1012 /*
1013 * In binary-upgrade mode, we do not have to worry about the actual LO
1014 * data or the associated metadata that resides in the pg_largeobject and
1015 * pg_largeobject_metadata tables, respectively.
1016 *
1017 * However, we do need to collect LO information as there may be comments
1018 * or other information on LOs that we do need to dump out.
1019 */
1020 if (dopt.outputLOs || dopt.binary_upgrade)
1021 getLOs(fout);
1022
1023 /*
1024 * Collect dependency data to assist in ordering the objects.
1025 */
1026 getDependencies(fout);
1027
1028 /*
1029 * Collect ACLs, comments, and security labels, if wanted.
1030 */
1031 if (!dopt.aclsSkip)
1032 getAdditionalACLs(fout);
1033 if (!dopt.no_comments)
1034 collectComments(fout);
1035 if (!dopt.no_security_labels)
1036 collectSecLabels(fout);
1037
1038 /* For binary upgrade mode, collect required pg_class information. */
1039 if (dopt.binary_upgrade)
1041
1042 /* Collect sequence information. */
1043 collectSequences(fout);
1044
1045 /* Lastly, create dummy objects to represent the section boundaries */
1046 boundaryObjs = createBoundaryObjects();
1047
1048 /* Get pointers to all the known DumpableObjects */
1049 getDumpableObjects(&dobjs, &numObjs);
1050
1051 /*
1052 * Add dummy dependencies to enforce the dump section ordering.
1053 */
1054 addBoundaryDependencies(dobjs, numObjs, boundaryObjs);
1055
1056 /*
1057 * Sort the objects into a safe dump order (no forward references).
1058 *
1059 * We rely on dependency information to help us determine a safe order, so
1060 * the initial sort is mostly for cosmetic purposes: we sort by name to
1061 * ensure that logically identical schemas will dump identically.
1062 */
1063 sortDumpableObjectsByTypeName(dobjs, numObjs);
1064
1065 sortDumpableObjects(dobjs, numObjs,
1066 boundaryObjs[0].dumpId, boundaryObjs[1].dumpId);
1067
1068 /*
1069 * Create archive TOC entries for all the objects to be dumped, in a safe
1070 * order.
1071 */
1072
1073 /*
1074 * First the special entries for ENCODING, STDSTRINGS, and SEARCHPATH.
1075 */
1076 dumpEncoding(fout);
1077 dumpStdStrings(fout);
1078 dumpSearchPath(fout);
1079
1080 /* The database items are always next, unless we don't want them at all */
1081 if (dopt.outputCreateDB)
1082 dumpDatabase(fout);
1083
1084 /* Now the rearrangeable objects. */
1085 for (i = 0; i < numObjs; i++)
1086 dumpDumpableObject(fout, dobjs[i]);
1087
1088 /*
1089 * Set up options info to ensure we dump what we want.
1090 */
1091 ropt = NewRestoreOptions();
1092 ropt->filename = filename;
1093
1094 /* if you change this list, see dumpOptionsFromRestoreOptions */
1095 ropt->cparams.dbname = dopt.cparams.dbname ? pg_strdup(dopt.cparams.dbname) : NULL;
1096 ropt->cparams.pgport = dopt.cparams.pgport ? pg_strdup(dopt.cparams.pgport) : NULL;
1097 ropt->cparams.pghost = dopt.cparams.pghost ? pg_strdup(dopt.cparams.pghost) : NULL;
1098 ropt->cparams.username = dopt.cparams.username ? pg_strdup(dopt.cparams.username) : NULL;
1100 ropt->dropSchema = dopt.outputClean;
1101 ropt->dumpData = dopt.dumpData;
1102 ropt->dumpSchema = dopt.dumpSchema;
1103 ropt->if_exists = dopt.if_exists;
1104 ropt->column_inserts = dopt.column_inserts;
1105 ropt->dumpSections = dopt.dumpSections;
1106 ropt->aclsSkip = dopt.aclsSkip;
1107 ropt->superuser = dopt.outputSuperuser;
1108 ropt->createDB = dopt.outputCreateDB;
1109 ropt->noOwner = dopt.outputNoOwner;
1110 ropt->noTableAm = dopt.outputNoTableAm;
1111 ropt->noTablespace = dopt.outputNoTablespaces;
1113 ropt->use_setsessauth = dopt.use_setsessauth;
1115 ropt->dump_inserts = dopt.dump_inserts;
1116 ropt->no_comments = dopt.no_comments;
1117 ropt->no_publications = dopt.no_publications;
1120 ropt->lockWaitTimeout = dopt.lockWaitTimeout;
1123 ropt->sequence_data = dopt.sequence_data;
1124 ropt->binary_upgrade = dopt.binary_upgrade;
1125
1126 ropt->compression_spec = compression_spec;
1127
1128 ropt->suppressDumpWarnings = true; /* We've already shown them */
1129
1130 SetArchiveOptions(fout, &dopt, ropt);
1131
1132 /* Mark which entries should be output */
1134
1135 /*
1136 * The archive's TOC entries are now marked as to which ones will actually
1137 * be output, so we can set up their dependency lists properly. This isn't
1138 * necessary for plain-text output, though.
1139 */
1140 if (!plainText)
1142
1143 /*
1144 * And finally we can do the actual output.
1145 *
1146 * Note: for non-plain-text output formats, the output file is written
1147 * inside CloseArchive(). This is, um, bizarre; but not worth changing
1148 * right now.
1149 */
1150 if (plainText)
1151 RestoreArchive(fout);
1152
1153 CloseArchive(fout);
1154
1155 exit_nicely(0);
1156}
1157
1158
/*
 * help: print the pg_dump command-line usage summary to stdout.
 *
 * All user-visible strings are wrapped in _() for NLS translation, so the
 * literals must stay intact for message extraction.  Options are grouped
 * into general, output-content, and connection sections.
 */
static void
help(const char *progname)
{
	printf(_("%s dumps a database as a text file or to other formats.\n\n"), progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]... [DBNAME]\n"), progname);

	printf(_("\nGeneral options:\n"));
	printf(_(" -f, --file=FILENAME output file or directory name\n"));
	printf(_(" -F, --format=c|d|t|p output file format (custom, directory, tar,\n"
			 " plain text (default))\n"));
	printf(_(" -j, --jobs=NUM use this many parallel jobs to dump\n"));
	printf(_(" -v, --verbose verbose mode\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
			 " compress as specified\n"));
	printf(_(" --lock-wait-timeout=TIMEOUT fail after waiting TIMEOUT for a table lock\n"));
	printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
	printf(_(" --sync-method=METHOD set method for syncing files to disk\n"));
	printf(_(" -?, --help show this help, then exit\n"));

	printf(_("\nOptions controlling the output content:\n"));
	printf(_(" -a, --data-only dump only the data, not the schema\n"));
	printf(_(" -b, --large-objects include large objects in dump\n"));
	printf(_(" --blobs (same as --large-objects, deprecated)\n"));
	printf(_(" -B, --no-large-objects exclude large objects in dump\n"));
	printf(_(" --no-blobs (same as --no-large-objects, deprecated)\n"));
	printf(_(" -c, --clean clean (drop) database objects before recreating\n"));
	printf(_(" -C, --create include commands to create database in dump\n"));
	printf(_(" -e, --extension=PATTERN dump the specified extension(s) only\n"));
	printf(_(" -E, --encoding=ENCODING dump the data in encoding ENCODING\n"));
	printf(_(" -n, --schema=PATTERN dump the specified schema(s) only\n"));
	printf(_(" -N, --exclude-schema=PATTERN do NOT dump the specified schema(s)\n"));
	printf(_(" -O, --no-owner skip restoration of object ownership in\n"
			 " plain-text format\n"));
	printf(_(" -s, --schema-only dump only the schema, no data\n"));
	printf(_(" -S, --superuser=NAME superuser user name to use in plain-text format\n"));
	printf(_(" -t, --table=PATTERN dump only the specified table(s)\n"));
	printf(_(" -T, --exclude-table=PATTERN do NOT dump the specified table(s)\n"));
	printf(_(" -x, --no-privileges do not dump privileges (grant/revoke)\n"));
	printf(_(" --binary-upgrade for use by upgrade utilities only\n"));
	printf(_(" --column-inserts dump data as INSERT commands with column names\n"));
	printf(_(" --disable-dollar-quoting disable dollar quoting, use SQL standard quoting\n"));
	printf(_(" --disable-triggers disable triggers during data-only restore\n"));
	printf(_(" --enable-row-security enable row security (dump only content user has\n"
			 " access to)\n"));
	printf(_(" --exclude-extension=PATTERN do NOT dump the specified extension(s)\n"));
	printf(_(" --exclude-table-and-children=PATTERN\n"
			 " do NOT dump the specified table(s), including\n"
			 " child and partition tables\n"));
	printf(_(" --exclude-table-data=PATTERN do NOT dump data for the specified table(s)\n"));
	printf(_(" --exclude-table-data-and-children=PATTERN\n"
			 " do NOT dump data for the specified table(s),\n"
			 " including child and partition tables\n"));
	printf(_(" --extra-float-digits=NUM override default setting for extra_float_digits\n"));
	printf(_(" --filter=FILENAME include or exclude objects and data from dump\n"
			 " based on expressions in FILENAME\n"));
	printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
	printf(_(" --include-foreign-data=PATTERN\n"
			 " include data of foreign tables on foreign\n"
			 " servers matching PATTERN\n"));
	printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
	printf(_(" --load-via-partition-root load partitions via the root table\n"));
	printf(_(" --no-comments do not dump comment commands\n"));
	printf(_(" --no-publications do not dump publications\n"));
	printf(_(" --no-security-labels do not dump security label assignments\n"));
	printf(_(" --no-subscriptions do not dump subscriptions\n"));
	printf(_(" --no-table-access-method do not dump table access methods\n"));
	printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
	printf(_(" --no-toast-compression do not dump TOAST compression methods\n"));
	printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
	printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
	printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
	printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
	printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
	printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
	printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
	printf(_(" --strict-names require table and/or schema include patterns to\n"
			 " match at least one entity each\n"));
	printf(_(" --table-and-children=PATTERN dump only the specified table(s), including\n"
			 " child and partition tables\n"));
	printf(_(" --use-set-session-authorization\n"
			 " use SET SESSION AUTHORIZATION commands instead of\n"
			 " ALTER OWNER commands to set ownership\n"));

	printf(_("\nConnection options:\n"));
	printf(_(" -d, --dbname=DBNAME database to dump\n"));
	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
	printf(_(" -p, --port=PORT database server port number\n"));
	printf(_(" -U, --username=NAME connect as specified database user\n"));
	printf(_(" -w, --no-password never prompt for password\n"));
	printf(_(" -W, --password force password prompt (should happen automatically)\n"));
	printf(_(" --role=ROLENAME do SET ROLE before dump\n"));

	printf(_("\nIf no database name is supplied, then the PGDATABASE environment\n"
			 "variable value is used.\n\n"));
	printf(_("Report bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
1258
/*
 * setup_connection
 *
 * Configure a newly-opened archive connection with the session settings
 * pg_dump requires: client encoding, role, DateStyle/IntervalStyle,
 * extra_float_digits, synchronized-seqscan and timeout GUCs, row security,
 * restricted relation kinds, and the transaction-snapshot-mode transaction
 * (optionally importing or exporting a synchronized snapshot).
 *
 * In a parallel dump worker, dumpencoding/dumpsnapshot/use_role may be
 * NULL; the values already saved in AH by the leader are used instead.
 */
static void
setup_connection(Archive *AH, const char *dumpencoding,
				 const char *dumpsnapshot, char *use_role)
{
	DumpOptions *dopt = AH->dopt;
	PGconn	   *conn = GetConnection(AH);
	const char *std_strings;

	/* NOTE(review): a statement is elided from this excerpt here */

	/*
	 * Set the client encoding if requested.
	 */
	if (dumpencoding)
	{
		if (PQsetClientEncoding(conn, dumpencoding) < 0)
			pg_fatal("invalid client encoding \"%s\" specified",
					 dumpencoding);
	}

	/*
	 * Get the active encoding and the standard_conforming_strings setting, so
	 * we know how to escape strings.
	 */
	/* NOTE(review): the statement(s) that fetch the active encoding are
	 * elided from this excerpt */

	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
	AH->std_strings = (std_strings && strcmp(std_strings, "on") == 0);

	/*
	 * Set the role if requested.  In a parallel dump worker, we'll be passed
	 * use_role == NULL, but AH->use_role is already set (if user specified it
	 * originally) and we should use that.
	 */
	if (!use_role && AH->use_role)
		use_role = AH->use_role;

	/* Set the role if requested */
	if (use_role)
	{
		/* NOTE(review): the query-buffer creation statement is elided here */

		appendPQExpBuffer(query, "SET ROLE %s", fmtId(use_role));
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);

		/* save it for possible later use by parallel workers */
		if (!AH->use_role)
			AH->use_role = pg_strdup(use_role);
	}

	/* Set the datestyle to ISO to ensure the dump's portability */
	ExecuteSqlStatement(AH, "SET DATESTYLE = ISO");

	/* Likewise, avoid using sql_standard intervalstyle */
	ExecuteSqlStatement(AH, "SET INTERVALSTYLE = POSTGRES");

	/*
	 * Use an explicitly specified extra_float_digits if it has been provided.
	 * Otherwise, set extra_float_digits so that we can dump float data
	 * exactly (given correctly implemented float I/O code, anyway).
	 */
	/* NOTE(review): the if-condition line (the explicit-setting check) is
	 * elided from this excerpt */
	{
		/* NOTE(review): the query-buffer creation statement is elided here */

		appendPQExpBuffer(q, "SET extra_float_digits TO %d",
		/* NOTE(review): the call's final argument line is elided here */
		ExecuteSqlStatement(AH, q->data);
		/* NOTE(review): the buffer-cleanup statement is elided here */
	}
	else
		ExecuteSqlStatement(AH, "SET extra_float_digits TO 3");

	/*
	 * Disable synchronized scanning, to prevent unpredictable changes in row
	 * ordering across a dump and reload.
	 */
	ExecuteSqlStatement(AH, "SET synchronize_seqscans TO off");

	/*
	 * Disable timeouts if supported.
	 */
	ExecuteSqlStatement(AH, "SET statement_timeout = 0");
	if (AH->remoteVersion >= 90300)
		ExecuteSqlStatement(AH, "SET lock_timeout = 0");
	if (AH->remoteVersion >= 90600)
		ExecuteSqlStatement(AH, "SET idle_in_transaction_session_timeout = 0");
	if (AH->remoteVersion >= 170000)
		ExecuteSqlStatement(AH, "SET transaction_timeout = 0");

	/*
	 * Quote all identifiers, if requested.
	 */
	/* NOTE(review): the if-condition line guarding this statement is elided
	 * from this excerpt */
		ExecuteSqlStatement(AH, "SET quote_all_identifiers = true");

	/*
	 * Adjust row-security mode, if supported.
	 */
	if (AH->remoteVersion >= 90500)
	{
		if (dopt->enable_row_security)
			ExecuteSqlStatement(AH, "SET row_security = on");
		else
			ExecuteSqlStatement(AH, "SET row_security = off");
	}

	/*
	 * For security reasons, we restrict the expansion of non-system views and
	 * access to foreign tables during the pg_dump process. This restriction
	 * is adjusted when dumping foreign table data.
	 */
	set_restrict_relation_kind(AH, "view, foreign-table");

	/*
	 * Initialize prepared-query state to "nothing prepared". We do this here
	 * so that a parallel dump worker will have its own state.
	 */
	AH->is_prepared = (bool *) pg_malloc0(NUM_PREP_QUERIES * sizeof(bool));

	/*
	 * Start transaction-snapshot mode transaction to dump consistent data.
	 */
	ExecuteSqlStatement(AH, "BEGIN");

	/*
	 * To support the combination of serializable_deferrable with the jobs
	 * option we use REPEATABLE READ for the worker connections that are
	 * passed a snapshot. As long as the snapshot is acquired in a
	 * SERIALIZABLE, READ ONLY, DEFERRABLE transaction, its use within a
	 * REPEATABLE READ transaction provides the appropriate integrity
	 * guarantees. This is a kluge, but safe for back-patching.
	 */
	if (dopt->serializable_deferrable && AH->sync_snapshot_id == NULL)
		/* NOTE(review): the ExecuteSqlStatement call's first line is elided
		 * from this excerpt */
							"SET TRANSACTION ISOLATION LEVEL "
							"SERIALIZABLE, READ ONLY, DEFERRABLE");
	else
		/* NOTE(review): the ExecuteSqlStatement call's first line is elided
		 * from this excerpt */
							"SET TRANSACTION ISOLATION LEVEL "
							"REPEATABLE READ, READ ONLY");

	/*
	 * If user specified a snapshot to use, select that. In a parallel dump
	 * worker, we'll be passed dumpsnapshot == NULL, but AH->sync_snapshot_id
	 * is already set (if the server can handle it) and we should use that.
	 */
	if (dumpsnapshot)
		AH->sync_snapshot_id = pg_strdup(dumpsnapshot);

	if (AH->sync_snapshot_id)
	{
		/* NOTE(review): the query-buffer creation statement is elided here */

		appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
		/* NOTE(review): the statement appending the snapshot id as a literal
		 * is elided here */
		ExecuteSqlStatement(AH, query->data);
		destroyPQExpBuffer(query);
	}
	else if (AH->numWorkers > 1)
	{
		if (AH->isStandby && AH->remoteVersion < 100000)
			pg_fatal("parallel dumps from standby servers are not supported by this server version");
		/* NOTE(review): the statement exporting/saving the synchronized
		 * snapshot id is elided here */
	}
}
1427
/* Set up connection for a parallel worker process */
static void
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * setupDumpWorker(Archive *AH) — confirm against upstream. */
{
	/*
	 * We want to re-select all the same values the leader connection is
	 * using. We'll have inherited directly-usable values in
	 * AH->sync_snapshot_id and AH->use_role, but we need to translate the
	 * inherited encoding value back to a string to pass to setup_connection.
	 */
	/* NOTE(review): the first line of the setup_connection(...) call is
	 * elided here; only its trailing NULL arguments remain visible. */
					 NULL,
					 NULL);
}
1443
/*
 * Export the current transaction's snapshot and return its identifier as a
 * freshly pg_strdup'd string (caller frees).  Used so parallel workers can
 * import the leader's snapshot.
 */
static char *
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * get_synchronized_snapshot(Archive *fout) — confirm against upstream. */
{
	char	   *query = "SELECT pg_catalog.pg_export_snapshot()";
	char	   *result;
	PGresult   *res;

	/* single-row, single-column result: the snapshot identifier */
	res = ExecuteSqlQueryForSingleRow(fout, query);
	result = pg_strdup(PQgetvalue(res, 0, 0));
	PQclear(res);

	return result;
}
1457
/*
 * Map the user-supplied --format string (single letter or full word,
 * case-insensitively) to an ArchiveFormat code; die on an unknown format.
 */
static ArchiveFormat
/* NOTE(review): the signature line is elided in this excerpt; upstream it
 * also returns the archive mode via an out parameter — confirm. */
{
	ArchiveFormat archiveFormat;

	/* NOTE(review): a statement (likely the default mode assignment) is
	 * elided from this excerpt here */

	if (pg_strcasecmp(format, "a") == 0 || pg_strcasecmp(format, "append") == 0)
	{
		/* This is used by pg_dumpall, and is not documented */
		archiveFormat = archNull;
		/* NOTE(review): a statement (likely the append-mode assignment) is
		 * elided here */
	}
	else if (pg_strcasecmp(format, "c") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "custom") == 0)
		archiveFormat = archCustom;
	else if (pg_strcasecmp(format, "d") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "directory") == 0)
		archiveFormat = archDirectory;
	else if (pg_strcasecmp(format, "p") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "plain") == 0)
		archiveFormat = archNull;
	else if (pg_strcasecmp(format, "t") == 0)
		archiveFormat = archTar;
	else if (pg_strcasecmp(format, "tar") == 0)
		archiveFormat = archTar;
	else
		pg_fatal("invalid output format \"%s\" specified", format);
	return archiveFormat;
}
1491
1492/*
1493 * Find the OIDs of all schemas matching the given list of patterns,
1494 * and append them to the given OID list.
1495 */
static void
/* NOTE(review): the signature's first line (function name and Archive
 * parameter) is elided in this excerpt; presumably
 * expand_schema_name_patterns(Archive *fout, ...). */
					 SimpleStringList *patterns,
					 SimpleOidList *oids,
					 bool strict_names)
{
	PQExpBuffer query;
	PGresult   *res;
	/* NOTE(review): the declaration of the list cursor 'cell' is elided
	 * from this excerpt */
	int			i;

	/* Nothing to match against: leave the OID list untouched */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list, but we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		PQExpBufferData dbbuf;
		int			dotcnt;

		/* NOTE(review): the first line of the append call that starts the
		 * SELECT is elided here */
						  "SELECT oid FROM pg_catalog.pg_namespace n\n");
		initPQExpBuffer(&dbbuf);
		/* Turn the shell-style pattern into WHERE clauses; dotcnt counts
		 * qualification levels (db.schema is the most allowed here) */
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "n.nspname", NULL, NULL, &dbbuf,
							  &dotcnt);
		if (dotcnt > 1)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);
		else if (dotcnt == 1)
			/* db-qualified: allowed only if it names the connected database */
			prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
		termPQExpBuffer(&dbbuf);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching schemas were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* NOTE(review): the statement appending the matched OID to
			 * 'oids' is elided here */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1550
1551/*
1552 * Find the OIDs of all extensions matching the given list of patterns,
1553 * and append them to the given OID list.
1554 */
static void
/* NOTE(review): the signature's first line (function name and Archive
 * parameter) is elided in this excerpt; presumably
 * expand_extension_name_patterns(Archive *fout, ...). */
						 SimpleStringList *patterns,
						 SimpleOidList *oids,
						 bool strict_names)
{
	PQExpBuffer query;
	PGresult   *res;
	/* NOTE(review): the declaration of the list cursor 'cell' is elided
	 * from this excerpt */
	int			i;

	/* Nothing to match against: leave the OID list untouched */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list, but we don't care.
	 */
	for (cell = patterns->head; cell; cell = cell->next)
	{
		int			dotcnt;

		/* NOTE(review): the first line of the append call that starts the
		 * SELECT is elided here */
						  "SELECT oid FROM pg_catalog.pg_extension e\n");
		/* Extension names cannot be qualified at all, hence dotcnt > 0 */
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "e.extname", NULL, NULL, NULL,
							  &dotcnt);
		if (dotcnt > 0)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching extensions were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* NOTE(review): the statement appending the matched OID to
			 * 'oids' is elided here */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1603
1604/*
1605 * Find the OIDs of all foreign servers matching the given list of patterns,
1606 * and append them to the given OID list.
1607 */
static void
/* NOTE(review): the signature's first line (function name and Archive
 * parameter) is elided in this excerpt; presumably
 * expand_foreign_server_name_patterns(Archive *fout, ...). */
									SimpleStringList *patterns,
									SimpleOidList *oids)
{
	PQExpBuffer query;
	PGresult   *res;
	/* NOTE(review): the declaration of the list cursor 'cell' is elided
	 * from this excerpt */
	int			i;

	/* Nothing to match against: leave the OID list untouched */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * The loop below runs multiple SELECTs, which might sometimes result in
	 * duplicate entries in the OID list, but we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		int			dotcnt;

		/* NOTE(review): the first line of the append call that starts the
		 * SELECT is elided here */
						  "SELECT oid FROM pg_catalog.pg_foreign_server s\n");
		/* Server names cannot be qualified at all, hence dotcnt > 0 */
		processSQLNamePattern(GetConnection(fout), query, cell->val, false,
							  false, NULL, "s.srvname", NULL, NULL, NULL,
							  &dotcnt);
		if (dotcnt > 0)
			pg_fatal("improper qualified name (too many dotted names): %s",
					 cell->val);

		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		/* Unlike the other expanders, a non-match here is always an error */
		if (PQntuples(res) == 0)
			pg_fatal("no matching foreign servers were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
			/* NOTE(review): the statement appending the matched OID to
			 * 'oids' is elided here */

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1654
1655/*
1656 * Find the OIDs of all tables matching the given list of patterns,
1657 * and append them to the given OID list. See also expand_dbname_patterns()
1658 * in pg_dumpall.c
1659 */
static void
/* NOTE(review): the signature's first line (function name and Archive
 * parameter) is elided in this excerpt; presumably
 * expand_table_name_patterns(Archive *fout, ...). */
						   SimpleStringList *patterns, SimpleOidList *oids,
						   bool strict_names, bool with_child_tables)
{
	PQExpBuffer query;
	PGresult   *res;
	/* NOTE(review): the declaration of the list cursor 'cell' is elided
	 * from this excerpt */
	int			i;

	/* Nothing to match against: leave the OID list untouched */
	if (patterns->head == NULL)
		return;					/* nothing to do */

	query = createPQExpBuffer();

	/*
	 * this might sometimes result in duplicate entries in the OID list, but
	 * we don't care.
	 */

	for (cell = patterns->head; cell; cell = cell->next)
	{
		PQExpBufferData dbbuf;
		int			dotcnt;

		/*
		 * Query must remain ABSOLUTELY devoid of unqualified names. This
		 * would be unnecessary given a pg_table_is_visible() variant taking a
		 * search_path argument.
		 *
		 * For with_child_tables, we start with the basic query's results and
		 * recursively search the inheritance tree to add child tables.
		 */
		if (with_child_tables)
		{
			appendPQExpBuffer(query, "WITH RECURSIVE partition_tree (relid) AS (\n");
		}

		/* Match any dumpable relkind: plain, sequence, view, matview,
		 * foreign table, partitioned table */
		appendPQExpBuffer(query,
						  "SELECT c.oid"
						  "\nFROM pg_catalog.pg_class c"
						  "\n LEFT JOIN pg_catalog.pg_namespace n"
						  "\n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace"
						  "\nWHERE c.relkind OPERATOR(pg_catalog.=) ANY"
						  "\n (array['%c', '%c', '%c', '%c', '%c', '%c'])\n",
						  RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW,
						  RELKIND_MATVIEW, RELKIND_FOREIGN_TABLE,
						  RELKIND_PARTITIONED_TABLE);
		initPQExpBuffer(&dbbuf);
		/* dotcnt counts qualification levels (db.schema.table allows two) */
		processSQLNamePattern(GetConnection(fout), query, cell->val, true,
							  false, "n.nspname", "c.relname", NULL,
							  "pg_catalog.pg_table_is_visible(c.oid)", &dbbuf,
							  &dotcnt);
		if (dotcnt > 2)
			pg_fatal("improper relation name (too many dotted names): %s",
					 cell->val);
		else if (dotcnt == 2)
			/* db-qualified: allowed only if it names the connected database */
			prohibit_crossdb_refs(GetConnection(fout), dbbuf.data, cell->val);
		termPQExpBuffer(&dbbuf);

		if (with_child_tables)
		{
			/* Recursive term: walk pg_inherits down from each match */
			appendPQExpBuffer(query, "UNION"
							  "\nSELECT i.inhrelid"
							  "\nFROM partition_tree p"
							  "\n JOIN pg_catalog.pg_inherits i"
							  "\n ON p.relid OPERATOR(pg_catalog.=) i.inhparent"
							  "\n)"
							  "\nSELECT relid FROM partition_tree");
		}

		/* Run with an empty search_path so the query resolves nothing
		 * unqualified; see comment above */
		ExecuteSqlStatement(fout, "RESET search_path");
		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
		/* NOTE(review): the statement(s) restoring the secure search_path
		 * after the query are elided from this excerpt */
		if (strict_names && PQntuples(res) == 0)
			pg_fatal("no matching tables were found for pattern \"%s\"", cell->val);

		for (i = 0; i < PQntuples(res); i++)
		{
			/* NOTE(review): the statement appending the matched OID to
			 * 'oids' is elided here */
		}

		PQclear(res);
		resetPQExpBuffer(query);
	}

	destroyPQExpBuffer(query);
}
1749
1750/*
1751 * Verifies that the connected database name matches the given database name,
1752 * and if not, dies with an error about the given pattern.
1753 *
1754 * The 'dbname' argument should be a literal name parsed from 'pattern'.
1755 */
1756static void
1757prohibit_crossdb_refs(PGconn *conn, const char *dbname, const char *pattern)
1758{
1759 const char *db;
1760
1761 db = PQdb(conn);
1762 if (db == NULL)
1763 pg_fatal("You are currently not connected to a database.");
1764
1765 if (strcmp(db, dbname) != 0)
1766 pg_fatal("cross-database references are not implemented: %s",
1767 pattern);
1768}
1769
1770/*
1771 * checkExtensionMembership
1772 * Determine whether object is an extension member, and if so,
1773 * record an appropriate dependency and set the object's dump flag.
1774 *
1775 * It's important to call this for each object that could be an extension
1776 * member. Generally, we integrate this with determining the object's
1777 * to-be-dumped-ness, since extension membership overrides other rules for that.
1778 *
1779 * Returns true if object is an extension member, else false.
1780 */
static bool
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * checkExtensionMembership(DumpableObject *dobj, Archive *fout). */
{
	/* NOTE(review): the statement initializing 'ext' (the owning extension,
	 * or NULL if the object belongs to none) is elided from this excerpt */

	/* Not an extension member: nothing to record */
	if (ext == NULL)
		return false;

	dobj->ext_member = true;

	/* Record dependency so that getDependencies needn't deal with that */
	addObjectDependency(dobj, ext->dobj.dumpId);

	/*
	 * In 9.6 and above, mark the member object to have any non-initial ACLs
	 * dumped. (Any initial ACLs will be removed later, using data from
	 * pg_init_privs, so that we'll dump only the delta from the extension's
	 * initial setup.)
	 *
	 * Prior to 9.6, we do not include any extension member components.
	 *
	 * In binary upgrades, we still dump all components of the members
	 * individually, since the idea is to exactly reproduce the database
	 * contents rather than replace the extension contents with something
	 * different.
	 *
	 * Note: it might be interesting someday to implement storage and delta
	 * dumping of extension members' RLS policies and/or security labels.
	 * However there is a pitfall for RLS policies: trying to dump them
	 * requires getting a lock on their tables, and the calling user might not
	 * have privileges for that. We need no lock to examine a table's ACLs,
	 * so the current feature doesn't have a problem of that sort.
	 */
	if (fout->dopt->binary_upgrade)
		dobj->dump = ext->dobj.dump;
	else
	{
		if (fout->remoteVersion < 90600)
			dobj->dump = DUMP_COMPONENT_NONE;
		else
			dobj->dump = ext->dobj.dump_contains & (DUMP_COMPONENT_ACL);
	}

	return true;
}
1826
1827/*
1828 * selectDumpableNamespace: policy-setting subroutine
1829 * Mark a namespace as to be dumped or not
1830 */
static void
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout). */
{
	/*
	 * DUMP_COMPONENT_DEFINITION typically implies a CREATE SCHEMA statement
	 * and (for --clean) a DROP SCHEMA statement. (In the absence of
	 * DUMP_COMPONENT_DEFINITION, this value is irrelevant.)
	 */
	nsinfo->create = true;

	/*
	 * If specific tables are being dumped, do not dump any complete
	 * namespaces. If specific namespaces are being dumped, dump just those
	 * namespaces. Otherwise, dump all non-system namespaces.
	 */
	if (table_include_oids.head != NULL)
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
	else if (schema_include_oids.head != NULL)
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump =
		/* NOTE(review): the membership test over schema_include_oids is
		 * elided from this excerpt */
			nsinfo->dobj.catId.oid) ?
		/* NOTE(review): the ternary's result values are elided here */
	else if (fout->remoteVersion >= 90600 &&
			 strcmp(nsinfo->dobj.name, "pg_catalog") == 0)
	{
		/*
		 * In 9.6 and above, we dump out any ACLs defined in pg_catalog, if
		 * they are interesting (and not the original ACLs which were set at
		 * initdb time, see pg_init_privs).
		 */
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ACL;
	}
	else if (strncmp(nsinfo->dobj.name, "pg_", 3) == 0 ||
			 strcmp(nsinfo->dobj.name, "information_schema") == 0)
	{
		/* Other system schemas don't get dumped */
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;
	}
	else if (strcmp(nsinfo->dobj.name, "public") == 0)
	{
		/*
		 * The public schema is a strange beast that sits in a sort of
		 * no-mans-land between being a system object and a user object.
		 * CREATE SCHEMA would fail, so its DUMP_COMPONENT_DEFINITION is just
		 * a comment and an indication of ownership. If the owner is the
		 * default, omit that superfluous DUMP_COMPONENT_DEFINITION. Before
		 * v15, the default owner was BOOTSTRAP_SUPERUSERID.
		 */
		nsinfo->create = false;
		nsinfo->dobj.dump = DUMP_COMPONENT_ALL;
		if (nsinfo->nspowner == ROLE_PG_DATABASE_OWNER)
			nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION;
		/* NOTE(review): a statement (likely setting dump_contains) is
		 * elided from this excerpt */

		/*
		 * Also, make like it has a comment even if it doesn't; this is so
		 * that we'll emit a command to drop the comment, if appropriate.
		 * (Without this, we'd not call dumpCommentExtended for it.)
		 */
		/* NOTE(review): the statement adding the COMMENT component is
		 * elided from this excerpt */
	}
	else
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL;

	/*
	 * In any case, a namespace can be excluded by an exclusion switch
	 */
	if (nsinfo->dobj.dump_contains &&
	/* NOTE(review): the membership test over the schema exclusion list is
	 * elided from this excerpt */
								nsinfo->dobj.catId.oid))
		nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_NONE;

	/*
	 * If the schema belongs to an extension, allow extension membership to
	 * override the dump decision for the schema itself. However, this does
	 * not change dump_contains, so this won't change what we do with objects
	 * within the schema. (If they belong to the extension, they'll get
	 * suppressed by it, otherwise not.)
	 */
	(void) checkExtensionMembership(&nsinfo->dobj, fout);
}
1912
1913/*
1914 * selectDumpableTable: policy-setting subroutine
1915 * Mark a table as to be dumped or not
1916 */
static void
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * selectDumpableTable(TableInfo *tbinfo, Archive *fout). */
{
	if (checkExtensionMembership(&tbinfo->dobj, fout))
		return;					/* extension membership overrides all else */

	/*
	 * If specific tables are being dumped, dump just those tables; else, dump
	 * according to the parent namespace's dump flag.
	 */
	if (table_include_oids.head != NULL)
	/* NOTE(review): the membership test over table_include_oids is elided
	 * from this excerpt */
								  tbinfo->dobj.catId.oid) ?
	/* NOTE(review): the ternary's result values are elided here */
	else
		tbinfo->dobj.dump = tbinfo->dobj.namespace->dobj.dump_contains;

	/*
	 * In any case, a table can be excluded by an exclusion switch
	 */
	if (tbinfo->dobj.dump &&
	/* NOTE(review): the membership test over the table exclusion list is
	 * elided from this excerpt */
							   tbinfo->dobj.catId.oid))
		tbinfo->dobj.dump = DUMP_COMPONENT_NONE;
}
1942
1943/*
1944 * selectDumpableType: policy-setting subroutine
1945 * Mark a type as to be dumped or not
1946 *
1947 * If it's a table's rowtype or an autogenerated array type, we also apply a
1948 * special type code to facilitate sorting into the desired order. (We don't
1949 * want to consider those to be ordinary types because that would bring tables
1950 * up into the datatype part of the dump order.) We still set the object's
1951 * dump flag; that's not going to cause the dummy type to be dumped, but we
1952 * need it so that casts involving such types will be dumped correctly -- see
1953 * dumpCast. This means the flag should be set the same as for the underlying
1954 * object (the table or base type).
1955 */
static void
/* NOTE(review): the signature line is elided in this excerpt; presumably
 * selectDumpableType(TypeInfo *tyinfo, Archive *fout). */
{
	/* skip complex types, except for standalone composite types */
	if (OidIsValid(tyinfo->typrelid) &&
		tyinfo->typrelkind != RELKIND_COMPOSITE_TYPE)
	{
		TableInfo  *tytable = findTableByOid(tyinfo->typrelid);

		/* Table rowtype: demote to a dummy type sorted with its table */
		tyinfo->dobj.objType = DO_DUMMY_TYPE;
		if (tytable != NULL)
			/* mirror the table's dump decision so dependent casts work */
			tyinfo->dobj.dump = tytable->dobj.dump;
		else
			tyinfo->dobj.dump = DUMP_COMPONENT_NONE;
		return;
	}

	/* skip auto-generated array and multirange types */
	if (tyinfo->isArray || tyinfo->isMultirange)
	{
		tyinfo->dobj.objType = DO_DUMMY_TYPE;

		/*
		 * Fall through to set the dump flag; we assume that the subsequent
		 * rules will do the same thing as they would for the array's base
		 * type or multirange's range type. (We cannot reliably look up the
		 * base type here, since getTypes may not have processed it yet.)
		 */
	}

	if (checkExtensionMembership(&tyinfo->dobj, fout))
		return;					/* extension membership overrides all else */

	/* Dump based on if the contents of the namespace are being dumped */
	tyinfo->dobj.dump = tyinfo->dobj.namespace->dobj.dump_contains;
}
1992
1993/*
1994 * selectDumpableDefaultACL: policy-setting subroutine
1995 *		Mark a default ACL as to be dumped or not
1996 *
1997 * For per-schema default ACLs, dump if the schema is to be dumped.
1998 * Otherwise dump if we are dumping "everything".  Note that dumpSchema
1999 * and aclsSkip are checked separately.
 *
 * NOTE(review): the parameter list and the ternary result arms are missing
 * from this extraction; confirm against upstream pg_dump.c.
2000 */
2001static void
2003{
2004	/* Default ACLs can't be extension members */
2005
2006	if (dinfo->dobj.namespace)
2007		/* default ACLs are considered part of the namespace */
2008		dinfo->dobj.dump = dinfo->dobj.namespace->dobj.dump_contains;
2009	else
2010		dinfo->dobj.dump = dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2012}
2013
2014/*
2015 * selectDumpableCast: policy-setting subroutine
2016 *		Mark a cast as to be dumped or not
2017 *
2018 * Casts do not belong to any particular namespace (since they haven't got
2019 * names), nor do they have identifiable owners.  To distinguish user-defined
2020 * casts from built-in ones, we must resort to checking whether the cast's
2021 * OID is in the range reserved for initdb.
 *
 * NOTE(review): the parameter list and two result lines are missing from
 * this extraction; confirm against upstream pg_dump.c.
2022 */
2023static void
2025{
2026	if (checkExtensionMembership(&cast->dobj, fout))
2027		return;					/* extension membership overrides all else */
2028
2029	/*
2030	 * This would be DUMP_COMPONENT_ACL for from-initdb casts, but they do not
2031	 * support ACLs currently.
2032	 */
2033	if (cast->dobj.catId.oid <= (Oid) g_last_builtin_oid)
	/* NOTE(review): built-in-cast assignment lost in extraction here */
2035	else
2036		cast->dobj.dump = fout->dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2038}
2039
2040/*
2041 * selectDumpableProcLang: policy-setting subroutine
2042 *		Mark a procedural language as to be dumped or not
2043 *
2044 * Procedural languages do not belong to any particular namespace.  To
2045 * identify built-in languages, we must resort to checking whether the
2046 * language's OID is in the range reserved for initdb.
 *
 * NOTE(review): the parameter list and one ternary-arm line are missing
 * from this extraction; confirm against upstream pg_dump.c.
2047 */
2048static void
2050{
2051	if (checkExtensionMembership(&plang->dobj, fout))
2052		return;					/* extension membership overrides all else */
2053
2054	/*
2055	 * Only include procedural languages when we are dumping everything.
2056	 *
2057	 * For from-initdb procedural languages, only include ACLs, as we do for
2058	 * the pg_catalog namespace.  We need this because procedural languages do
2059	 * not live in any namespace.
2060	 */
2061	if (!fout->dopt->include_everything)
2062		plang->dobj.dump = DUMP_COMPONENT_NONE;
2063	else
2064	{
2065		if (plang->dobj.catId.oid <= (Oid) g_last_builtin_oid)
			/* pre-9.6 servers lack ACL info for languages */
2066			plang->dobj.dump = fout->remoteVersion < 90600 ?
	/* NOTE(review): ternary result arms lost in extraction here */
2068		else
2069			plang->dobj.dump = DUMP_COMPONENT_ALL;
2070	}
2071}
2072
2073/*
2074 * selectDumpableAccessMethod: policy-setting subroutine
2075 *		Mark an access method as to be dumped or not
2076 *
2077 * Access methods do not belong to any particular namespace.  To identify
2078 * built-in access methods, we must resort to checking whether the
2079 * method's OID is in the range reserved for initdb.
 *
 * NOTE(review): the parameter list and the ternary result arms are missing
 * from this extraction; confirm against upstream pg_dump.c.
2080 */
2081static void
2083{
2084	if (checkExtensionMembership(&method->dobj, fout))
2085		return;					/* extension membership overrides all else */
2086
2087	/*
2088	 * This would be DUMP_COMPONENT_ACL for from-initdb access methods, but
2089	 * they do not support ACLs currently.
2090	 */
2091	if (method->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2092		method->dobj.dump = DUMP_COMPONENT_NONE;
2093	else
2094		method->dobj.dump = fout->dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2096}
2097
2098/*
2099 * selectDumpableExtension: policy-setting subroutine
2100 *		Mark an extension as to be dumped or not
2101 *
2102 * Built-in extensions should be skipped except for checking ACLs, since we
2103 * assume those will already be installed in the target database.  We identify
2104 * such extensions by their having OIDs in the range reserved for initdb.
2105 * We dump all user-added extensions by default.  No extensions are dumped
2106 * if include_everything is false (i.e., a --schema or --table switch was
2107 * given), except if --extension specifies a list of extensions to dump.
 *
 * NOTE(review): the parameter list and several membership-test / ternary
 * lines are missing from this extraction; confirm against upstream
 * pg_dump.c.
2108 */
2109static void
2111{
2112	/*
2113	 * Use DUMP_COMPONENT_ACL for built-in extensions, to allow users to
2114	 * change permissions on their member objects, if they wish to, and have
2115	 * those changes preserved.
2116	 */
2117	if (extinfo->dobj.catId.oid <= (Oid) g_last_builtin_oid)
2118		extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_ACL;
2119	else
2120	{
2121		/* check if there is a list of extensions to dump */
2122		if (extension_include_oids.head != NULL)
2123			extinfo->dobj.dump = extinfo->dobj.dump_contains =
	/* NOTE(review): include-list membership call lost in extraction here */
2125								 extinfo->dobj.catId.oid) ?
	/* NOTE(review): ternary result arms lost in extraction here */
2127		else
2128			extinfo->dobj.dump = extinfo->dobj.dump_contains =
2129				dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2131
2132		/* check that the extension is not explicitly excluded */
2133		if (extinfo->dobj.dump &&
	/* NOTE(review): exclude-list membership call lost in extraction here */
2135							   extinfo->dobj.catId.oid))
2136			extinfo->dobj.dump = extinfo->dobj.dump_contains = DUMP_COMPONENT_NONE;
2137	}
2138}
2139
2140/*
2141 * selectDumpablePublicationObject: policy-setting subroutine
2142 *		Mark a publication object as to be dumped or not
2143 *
2144 * A publication can have schemas and tables which have schemas, but those are
2145 * ignored in decision making, because publications are only dumped when we are
2146 * dumping everything.
 *
 * NOTE(review): the parameter list and the ternary result arms are missing
 * from this extraction; confirm against upstream pg_dump.c.
2147 */
2148static void
2150{
2151	if (checkExtensionMembership(dobj, fout))
2152		return;					/* extension membership overrides all else */
2153
2154	dobj->dump = fout->dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2156}
2157
2158/*
2159 * selectDumpableStatisticsObject: policy-setting subroutine
2160 *		Mark an extended statistics object as to be dumped or not
2161 *
2162 * We dump an extended statistics object if the schema it's in and the table
2163 * it's for are being dumped.  (This'll need more thought if statistics
2164 * objects ever support cross-table stats.)
 *
 * NOTE(review): the parameter list and the lines implementing the
 * "table must also be dumped" check are missing from this extraction;
 * confirm against upstream pg_dump.c.
2165 */
2166static void
2168{
2169	if (checkExtensionMembership(&sobj->dobj, fout))
2170		return;					/* extension membership overrides all else */
2171
2172	sobj->dobj.dump = sobj->dobj.namespace->dobj.dump_contains;
2173	if (sobj->stattable == NULL ||
	/* NOTE(review): condition tail and assignment lost in extraction here */
2176}
2177
2178/*
2179 * selectDumpableObject: policy-setting subroutine
2180 *		Mark a generic dumpable object as to be dumped or not
2181 *
2182 * Use this only for object types without a special-case routine above.
 *
 * NOTE(review): the parameter list and the ternary result arms are missing
 * from this extraction; confirm against upstream pg_dump.c.
2183 */
2184static void
2186{
2187	if (checkExtensionMembership(dobj, fout))
2188		return;					/* extension membership overrides all else */
2189
2190	/*
2191	 * Default policy is to dump if parent namespace is dumpable, or for
2192	 * non-namespace-associated items, dump if we're dumping "everything".
2193	 */
2194	if (dobj->namespace)
2195		dobj->dump = dobj->namespace->dobj.dump_contains;
2196	else
2197		dobj->dump = fout->dopt->include_everything ?
	/* NOTE(review): ternary result arms lost in extraction here */
2199}
2200
2201/*
2202 * Dump a table's contents for loading using the COPY command
2203 * - this routine is called by the Archiver when it wants the table
2204 * to be dumped.
 *
 * dcontext is really a TableDataInfo*.  Builds a COPY ... TO stdout command
 * (COPY (SELECT ...) TO for foreign tables or filtered dumps), streams the
 * rows into the archive via WriteData, then drains libpq back to idle.
 * Always returns 1; errors exit via exit_nicely.
 *
 * NOTE(review): several lines are missing from this extraction (the query
 * buffer declaration, the statement that executes the COPY, the
 * PQfreemem() of each copy buffer, the result-status test, and the final
 * destroy of the query buffer); confirm against upstream pg_dump.c.
2205 */
2206static int
2207dumpTableData_copy(Archive *fout, const void *dcontext)
2208{
2209	TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2210	TableInfo  *tbinfo = tdinfo->tdtable;
2211	const char *classname = tbinfo->dobj.name;
	/* NOTE(review): query-buffer declaration lost in extraction here */
2213
2214	/*
2215	 * Note: can't use getThreadLocalPQExpBuffer() here, we're calling fmtId
2216	 * which uses it already.
2217	 */
2218	PQExpBuffer clistBuf = createPQExpBuffer();
2219	PGconn	   *conn = GetConnection(fout);
2220	PGresult   *res;
2221	int			ret;
2222	char	   *copybuf;
2223	const char *column_list;
2224
2225	pg_log_info("dumping contents of table \"%s.%s\"",
2226				tbinfo->dobj.namespace->dobj.name, classname);
2227
2228	/*
2229	 * Specify the column list explicitly so that we have no possibility of
2230	 * retrieving data in the wrong column order.  (The default column
2231	 * ordering of COPY will not be what we want in certain corner cases
2232	 * involving ADD COLUMN and inheritance.)
2233	 */
2234	column_list = fmtCopyColumnList(tbinfo, clistBuf);
2235
2236	/*
2237	 * Use COPY (SELECT ...) TO when dumping a foreign table's data, and when
2238	 * a filter condition was specified.  For other cases a simple COPY
2239	 * suffices.
2240	 */
2241	if (tdinfo->filtercond || tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2242	{
2243		/* Temporary allows to access to foreign tables to dump data */
2244		if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2245			set_restrict_relation_kind(fout, "view");
2246
2247		appendPQExpBufferStr(q, "COPY (SELECT ");
2248		/* klugery to get rid of parens in column list */
2249		if (strlen(column_list) > 2)
2250		{
2251			appendPQExpBufferStr(q, column_list + 1);
2252			q->data[q->len - 1] = ' ';
2253		}
2254		else
2255			appendPQExpBufferStr(q, "* ");
2256
2257		appendPQExpBuffer(q, "FROM %s %s) TO stdout;",
2258						  fmtQualifiedDumpable(tbinfo),
2259						  tdinfo->filtercond ? tdinfo->filtercond : "");
2260	}
2261	else
2262	{
2263		appendPQExpBuffer(q, "COPY %s %s TO stdout;",
2264						  fmtQualifiedDumpable(tbinfo),
2265						  column_list);
2266	}
	/* NOTE(review): COPY-executing statement lost in extraction here */
2268	PQclear(res);
2269	destroyPQExpBuffer(clistBuf);
2270
2271	for (;;)
2272	{
		/* 0 = blocking mode; ret is row length, -1 done, -2 error */
2273		ret = PQgetCopyData(conn, &copybuf, 0);
2274
2275		if (ret < 0)
2276			break;				/* done or error */
2277
2278		if (copybuf)
2279		{
2280			WriteData(fout, copybuf, ret);
			/* NOTE(review): PQfreemem(copybuf) lost in extraction here */
2282		}
2283
2284		/* ----------
2285		 * THROTTLE:
2286		 *
2287		 * There was considerable discussion in late July, 2000 regarding
2288		 * slowing down pg_dump when backing up large tables. Users with both
2289		 * slow & fast (multi-processor) machines experienced performance
2290		 * degradation when doing a backup.
2291		 *
2292		 * Initial attempts based on sleeping for a number of ms for each ms
2293		 * of work were deemed too complex, then a simple 'sleep in each loop'
2294		 * implementation was suggested. The latter failed because the loop
2295		 * was too tight. Finally, the following was implemented:
2296		 *
2297		 * If throttle is non-zero, then
2298		 *		See how long since the last sleep.
2299		 *		Work out how long to sleep (based on ratio).
2300		 *		If sleep is more than 100ms, then
2301		 *			sleep
2302		 *			reset timer
2303		 *		EndIf
2304		 * EndIf
2305		 *
2306		 * where the throttle value was the number of ms to sleep per ms of
2307		 * work. The calculation was done in each loop.
2308		 *
2309		 * Most of the hard work is done in the backend, and this solution
2310		 * still did not work particularly well: on slow machines, the ratio
2311		 * was 50:1, and on medium paced machines, 1:1, and on fast
2312		 * multi-processor machines, it had little or no effect, for reasons
2313		 * that were unclear.
2314		 *
2315		 * Further discussion ensued, and the proposal was dropped.
2316		 *
2317		 * For those people who want this feature, it can be implemented using
2318		 * gettimeofday in each loop, calculating the time since last sleep,
2319		 * multiplying that by the sleep ratio, then if the result is more
2320		 * than a preset 'minimum sleep time' (say 100ms), call the 'select'
2321		 * function to sleep for a subsecond period ie.
2322		 *
2323		 * select(0, NULL, NULL, NULL, &tvi);
2324		 *
2325		 * This will return after the interval specified in the structure tvi.
2326		 * Finally, call gettimeofday again to save the 'last sleep time'.
2327		 * ----------
2328		 */
2329	}
2330	archprintf(fout, "\\.\n\n\n");
2331
2332	if (ret == -2)
2333	{
2334		/* copy data transfer failed */
2335		pg_log_error("Dumping the contents of table \"%s\" failed: PQgetCopyData() failed.", classname);
2336		pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2337		pg_log_error_detail("Command was: %s", q->data);
2338		exit_nicely(1);
2339	}
2340
2341	/* Check command status and return to normal libpq state */
2342	res = PQgetResult(conn);
	/* NOTE(review): result-status test lost in extraction here */
2344	{
2345		pg_log_error("Dumping the contents of table \"%s\" failed: PQgetResult() failed.", classname);
2346		pg_log_error_detail("Error message from server: %s", PQerrorMessage(conn));
2347		pg_log_error_detail("Command was: %s", q->data);
2348		exit_nicely(1);
2349	}
2350	PQclear(res);
2351
2352	/* Do this to ensure we've pumped libpq back to idle state */
2353	if (PQgetResult(conn) != NULL)
2354		pg_log_warning("unexpected extra results during COPY of table \"%s\"",
2355					   classname);
2356
	/* NOTE(review): destroy of the query buffer lost in extraction here */
2358
2359	/* Revert back the setting */
2360	if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2361		set_restrict_relation_kind(fout, "view, foreign-table");
2362
2363	return 1;
2364}
2365
2366/*
2367 * Dump table data using INSERT commands.
2368 *
2369 * Caution: when we restore from an archive file direct to database, the
2370 * INSERT commands emitted by this function have to be parsed by
2371 * pg_backup_db.c's ExecuteSimpleCommands(), which will not handle comments,
2372 * E'' strings, or dollar-quoted strings.  So don't emit anything like that.
 *
 * dcontext is really a TableDataInfo*.  Declares a cursor over the table
 * (excluding dropped columns, and generated columns when --column-inserts),
 * fetches 100 rows at a time, and emits INSERT statements honoring
 * --rows-per-insert, --column-inserts and --on-conflict-do-nothing.
 * Always returns 1.
 *
 * NOTE(review): a few lines are missing from this extraction (the query
 * buffer declaration, the FETCH result-status argument, the string-literal
 * emission in the default switch case, and the final destroy of the query
 * buffer); confirm against upstream pg_dump.c.
2373 */
2374static int
2375dumpTableData_insert(Archive *fout, const void *dcontext)
2376{
2377	TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
2378	TableInfo  *tbinfo = tdinfo->tdtable;
2379	DumpOptions *dopt = fout->dopt;
	/* NOTE(review): query-buffer declaration lost in extraction here */
2381	PQExpBuffer insertStmt = NULL;
2382	char	   *attgenerated;
2383	PGresult   *res;
2384	int			nfields,
2385				i;
2386	int			rows_per_statement = dopt->dump_inserts;
2387	int			rows_this_statement = 0;
2388
2389	/* Temporary allows to access to foreign tables to dump data */
2390	if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2391		set_restrict_relation_kind(fout, "view");
2392
2393	/*
2394	 * If we're going to emit INSERTs with column names, the most efficient
2395	 * way to deal with generated columns is to exclude them entirely.  For
2396	 * INSERTs without column names, we have to emit DEFAULT rather than the
2397	 * actual column value --- but we can save a few cycles by fetching nulls
2398	 * rather than the uninteresting-to-us value.
2399	 */
2400	attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
2401	appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
2402	nfields = 0;
2403	for (i = 0; i < tbinfo->numatts; i++)
2404	{
2405		if (tbinfo->attisdropped[i])
2406			continue;
2407		if (tbinfo->attgenerated[i] && dopt->column_inserts)
2408			continue;
2409		if (nfields > 0)
2410			appendPQExpBufferStr(q, ", ");
2411		if (tbinfo->attgenerated[i])
2412			appendPQExpBufferStr(q, "NULL");
2413		else
2414			appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
2415		attgenerated[nfields] = tbinfo->attgenerated[i];
2416		nfields++;
2417	}
2418	/* Servers before 9.4 will complain about zero-column SELECT */
2419	if (nfields == 0)
2420		appendPQExpBufferStr(q, "NULL");
2421	appendPQExpBuffer(q, " FROM ONLY %s",
2422					  fmtQualifiedDumpable(tbinfo));
2423	if (tdinfo->filtercond)
2424		appendPQExpBuffer(q, " %s", tdinfo->filtercond);
2425
2426	ExecuteSqlStatement(fout, q->data);
2427
2428	while (1)
2429	{
2430		res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
		/* NOTE(review): expected-status argument lost in extraction here */
2432
2433		/* cross-check field count, allowing for dummy NULL if any */
2434		if (nfields != PQnfields(res) &&
2435			!(nfields == 0 && PQnfields(res) == 1))
2436			pg_fatal("wrong number of fields retrieved from table \"%s\"",
2437					 tbinfo->dobj.name);
2438
2439		/*
2440		 * First time through, we build as much of the INSERT statement as
2441		 * possible in "insertStmt", which we can then just print for each
2442		 * statement. If the table happens to have zero dumpable columns then
2443		 * this will be a complete statement, otherwise it will end in
2444		 * "VALUES" and be ready to have the row's column values printed.
2445		 */
2446		if (insertStmt == NULL)
2447		{
2448			TableInfo  *targettab;
2449
2450			insertStmt = createPQExpBuffer();
2451
2452			/*
2453			 * When load-via-partition-root is set or forced, get the root
2454			 * table name for the partition table, so that we can reload data
2455			 * through the root table.
2456			 */
2457			if (tbinfo->ispartition &&
2458				(dopt->load_via_partition_root ||
2459				 forcePartitionRootLoad(tbinfo)))
2460				targettab = getRootTableInfo(tbinfo);
2461			else
2462				targettab = tbinfo;
2463
2464			appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
2465							  fmtQualifiedDumpable(targettab));
2466
2467			/* corner case for zero-column table */
2468			if (nfields == 0)
2469			{
2470				appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
2471			}
2472			else
2473			{
2474				/* append the list of column names if required */
2475				if (dopt->column_inserts)
2476				{
2477					appendPQExpBufferChar(insertStmt, '(');
2478					for (int field = 0; field < nfields; field++)
2479					{
2480						if (field > 0)
2481							appendPQExpBufferStr(insertStmt, ", ");
2482						appendPQExpBufferStr(insertStmt,
2483											 fmtId(PQfname(res, field)));
2484					}
2485					appendPQExpBufferStr(insertStmt, ") ");
2486				}
2487
2488				if (tbinfo->needs_override)
2489					appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
2490
2491				appendPQExpBufferStr(insertStmt, "VALUES");
2492			}
2493		}
2494
2495		for (int tuple = 0; tuple < PQntuples(res); tuple++)
2496		{
2497			/* Write the INSERT if not in the middle of a multi-row INSERT. */
2498			if (rows_this_statement == 0)
2499				archputs(insertStmt->data, fout);
2500
2501			/*
2502			 * If it is zero-column table then we've already written the
2503			 * complete statement, which will mean we've disobeyed
2504			 * --rows-per-insert when it's set greater than 1.  We do support
2505			 * a way to make this multi-row with: SELECT UNION ALL SELECT
2506			 * UNION ALL ... but that's non-standard so we should avoid it
2507			 * given that using INSERTs is mostly only ever needed for
2508			 * cross-database exports.
2509			 */
2510			if (nfields == 0)
2511				continue;
2512
2513			/* Emit a row heading */
2514			if (rows_per_statement == 1)
2515				archputs(" (", fout);
2516			else if (rows_this_statement > 0)
2517				archputs(",\n\t(", fout);
2518			else
2519				archputs("\n\t(", fout);
2520
2521			for (int field = 0; field < nfields; field++)
2522			{
2523				if (field > 0)
2524					archputs(", ", fout);
2525				if (attgenerated[field])
2526				{
2527					archputs("DEFAULT", fout);
2528					continue;
2529				}
2530				if (PQgetisnull(res, tuple, field))
2531				{
2532					archputs("NULL", fout);
2533					continue;
2534				}
2535
2536				/* XXX This code is partially duplicated in ruleutils.c */
2537				switch (PQftype(res, field))
2538				{
2539					case INT2OID:
2540					case INT4OID:
2541					case INT8OID:
2542					case OIDOID:
2543					case FLOAT4OID:
2544					case FLOAT8OID:
2545					case NUMERICOID:
2546						{
2547							/*
2548							 * These types are printed without quotes unless
2549							 * they contain values that aren't accepted by the
2550							 * scanner unquoted (e.g., 'NaN').  Note that
2551							 * strtod() and friends might accept NaN, so we
2552							 * can't use that to test.
2553							 *
2554							 * In reality we only need to defend against
2555							 * infinity and NaN, so we need not get too crazy
2556							 * about pattern matching here.
2557							 */
2558							const char *s = PQgetvalue(res, tuple, field);
2559
2560							if (strspn(s, "0123456789 +-eE.") == strlen(s))
2561								archputs(s, fout);
2562							else
2563								archprintf(fout, "'%s'", s);
2564						}
2565						break;
2566
2567					case BITOID:
2568					case VARBITOID:
2569						archprintf(fout, "B'%s'",
2570								   PQgetvalue(res, tuple, field));
2571						break;
2572
2573					case BOOLOID:
2574						if (strcmp(PQgetvalue(res, tuple, field), "t") == 0)
2575							archputs("true", fout);
2576						else
2577							archputs("false", fout);
2578						break;
2579
2580					default:
2581						/* All other types are printed as string literals. */
						/* NOTE(review): literal-quoting call lost in extraction here */
2584										  PQgetvalue(res, tuple, field),
2585										  fout);
2586						archputs(q->data, fout);
2587						break;
2588				}
2589			}
2590
2591			/* Terminate the row ... */
2592			archputs(")", fout);
2593
2594			/* ... and the statement, if the target no. of rows is reached */
2595			if (++rows_this_statement >= rows_per_statement)
2596			{
2597				if (dopt->do_nothing)
2598					archputs(" ON CONFLICT DO NOTHING;\n", fout);
2599				else
2600					archputs(";\n", fout);
2601				/* Reset the row counter */
2602				rows_this_statement = 0;
2603			}
2604		}
2605
2606		if (PQntuples(res) <= 0)
2607		{
2608			PQclear(res);
2609			break;
2610		}
2611		PQclear(res);
2612	}
2613
2614	/* Terminate any statements that didn't make the row count. */
2615	if (rows_this_statement > 0)
2616	{
2617		if (dopt->do_nothing)
2618			archputs(" ON CONFLICT DO NOTHING;\n", fout);
2619		else
2620			archputs(";\n", fout);
2621	}
2622
2623	archputs("\n\n", fout);
2624
2625	ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
2626
	/* NOTE(review): destroy of the query buffer lost in extraction here */
2628	if (insertStmt != NULL)
2629		destroyPQExpBuffer(insertStmt);
2630	free(attgenerated);
2631
2632	/* Revert back the setting */
2633	if (tbinfo->relkind == RELKIND_FOREIGN_TABLE)
2634		set_restrict_relation_kind(fout, "view, foreign-table");
2635
2636	return 1;
2637}
2638
2639/*
2640 * getRootTableInfo:
2641 *		get the root TableInfo for the given partition table.
 *
 * Walks up the (single-parent) partition chain until reaching a table that
 * is not itself a partition.  Caller must pass a partition (asserted).
 *
 * NOTE(review): the parameter-list line is missing from this extraction
 * (presumably "(const TableInfo *tbinfo)" -- confirm against upstream
 * pg_dump.c).
2642 */
2643static TableInfo *
2645{
2646	TableInfo  *parentTbinfo;
2647
2648	Assert(tbinfo->ispartition);
2649	Assert(tbinfo->numParents == 1);
2650
2651	parentTbinfo = tbinfo->parents[0];
2652	while (parentTbinfo->ispartition)
2653	{
2654		Assert(parentTbinfo->numParents == 1);
2655		parentTbinfo = parentTbinfo->parents[0];
2656	}
2657
2658	return parentTbinfo;
2659}
2660
2661/*
2662 * forcePartitionRootLoad
2663 *		Check if we must force load_via_partition_root for this partition.
2664 *
2665 * This is required if any level of ancestral partitioned table has an
2666 * unsafe partitioning scheme.
 *
 * Returns true as soon as any ancestor is flagged unsafe_partitions.
 *
 * NOTE(review): the parameter-list line is missing from this extraction
 * (presumably "(const TableInfo *tbinfo)" -- confirm against upstream
 * pg_dump.c).
2667 */
2668static bool
2670{
2671	TableInfo  *parentTbinfo;
2672
2673	Assert(tbinfo->ispartition);
2674	Assert(tbinfo->numParents == 1);
2675
2676	parentTbinfo = tbinfo->parents[0];
2677	if (parentTbinfo->unsafe_partitions)
2678		return true;
	/* Walk the rest of the ancestry; each level has exactly one parent */
2679	while (parentTbinfo->ispartition)
2680	{
2681		Assert(parentTbinfo->numParents == 1);
2682		parentTbinfo = parentTbinfo->parents[0];
2683		if (parentTbinfo->unsafe_partitions)
2684			return true;
2685	}
2686
2687	return false;
2688}
2689
2690/*
2691 * dumpTableData -
2692 *	  dump the contents of a single table
2693 *
2694 * Actually, this just makes an ArchiveEntry for the table contents.
 *
 * Chooses COPY vs INSERT dumper per --inserts, records the partition-root
 * redirection (if any) in the TOC defn, and sets dataLength so parallel
 * dumps can schedule large tables first.
 *
 * NOTE(review): the parameter-list line is missing from this extraction
 * (presumably "(Archive *fout, const TableDataInfo *tdinfo)" -- confirm
 * against upstream pg_dump.c).
2695 */
2696static void
2698{
2699	DumpOptions *dopt = fout->dopt;
2700	TableInfo  *tbinfo = tdinfo->tdtable;
2701	PQExpBuffer copyBuf = createPQExpBuffer();
2702	PQExpBuffer clistBuf = createPQExpBuffer();
2703	DataDumperPtr dumpFn;
2704	char	   *tdDefn = NULL;
2705	char	   *copyStmt;
2706	const char *copyFrom;
2707
2708	/* We had better have loaded per-column details about this table */
2709	Assert(tbinfo->interesting);
2710
2711	/*
2712	 * When load-via-partition-root is set or forced, get the root table name
2713	 * for the partition table, so that we can reload data through the root
2714	 * table.  Then construct a comment to be inserted into the TOC entry's
2715	 * defn field, so that such cases can be identified reliably.
2716	 */
2717	if (tbinfo->ispartition &&
2718		(dopt->load_via_partition_root ||
2719		 forcePartitionRootLoad(tbinfo)))
2720	{
2721		TableInfo  *parentTbinfo;
2722
2723		parentTbinfo = getRootTableInfo(tbinfo);
2724		copyFrom = fmtQualifiedDumpable(parentTbinfo);
2725		printfPQExpBuffer(copyBuf, "-- load via partition root %s",
2726						  copyFrom);
2727		tdDefn = pg_strdup(copyBuf->data);
2728	}
2729	else
2730		copyFrom = fmtQualifiedDumpable(tbinfo);
2731
2732	if (dopt->dump_inserts == 0)
2733	{
2734		/* Dump/restore using COPY */
2735		dumpFn = dumpTableData_copy;
2736		/* must use 2 steps here 'cause fmtId is nonreentrant */
2737		printfPQExpBuffer(copyBuf, "COPY %s ",
2738						  copyFrom);
2739		appendPQExpBuffer(copyBuf, "%s FROM stdin;\n",
2740						  fmtCopyColumnList(tbinfo, clistBuf));
2741		copyStmt = copyBuf->data;
2742	}
2743	else
2744	{
2745		/* Restore using INSERT */
2746		dumpFn = dumpTableData_insert;
2747		copyStmt = NULL;
2748	}
2749
2750	/*
2751	 * Note: although the TableDataInfo is a full DumpableObject, we treat its
2752	 * dependency on its table as "special" and pass it to ArchiveEntry now.
2753	 * See comments for BuildArchiveDependencies.
2754	 */
2755	if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2756	{
2757		TocEntry   *te;
2758
2759		te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId,
2760						  ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2761									   .namespace = tbinfo->dobj.namespace->dobj.name,
2762									   .owner = tbinfo->rolname,
2763									   .description = "TABLE DATA",
2764									   .section = SECTION_DATA,
2765									   .createStmt = tdDefn,
2766									   .copyStmt = copyStmt,
2767									   .deps = &(tbinfo->dobj.dumpId),
2768									   .nDeps = 1,
2769									   .dumpFn = dumpFn,
2770									   .dumpArg = tdinfo));
2771
2772		/*
2773		 * Set the TocEntry's dataLength in case we are doing a parallel dump
2774		 * and want to order dump jobs by table size.  We choose to measure
2775		 * dataLength in table pages (including TOAST pages) during dump, so
2776		 * no scaling is needed.
2777		 *
2778		 * However, relpages is declared as "integer" in pg_class, and hence
2779		 * also in TableInfo, but it's really BlockNumber a/k/a unsigned int.
2780		 * Cast so that we get the right interpretation of table sizes
2781		 * exceeding INT_MAX pages.
2782		 */
2783		te->dataLength = (BlockNumber) tbinfo->relpages;
2784		te->dataLength += (BlockNumber) tbinfo->toastpages;
2785
2786		/*
2787		 * If pgoff_t is only 32 bits wide, the above refinement is useless,
2788		 * and instead we'd better worry about integer overflow.  Clamp to
2789		 * INT_MAX if the correct result exceeds that.
2790		 */
2791		if (sizeof(te->dataLength) == 4 &&
2792			(tbinfo->relpages < 0 || tbinfo->toastpages < 0 ||
2793			 te->dataLength < 0))
2794			te->dataLength = INT_MAX;
2795	}
2796
2797	destroyPQExpBuffer(copyBuf);
2798	destroyPQExpBuffer(clistBuf);
2799}
2800
2801/*
2802 * refreshMatViewData -
2803 *	  load or refresh the contents of a single materialized view
2804 *
2805 * Actually, this just makes an ArchiveEntry for the REFRESH MATERIALIZED VIEW
2806 * statement.
 *
 * Skipped entirely for matviews not flagged as populated (WITH NO DATA).
 *
 * NOTE(review): the parameter-list line and the final destroy of the query
 * buffer are missing from this extraction; confirm against upstream
 * pg_dump.c.
2807 */
2808static void
2810{
2811	TableInfo  *tbinfo = tdinfo->tdtable;
2812	PQExpBuffer q;
2813
2814	/* If the materialized view is not flagged as populated, skip this. */
2815	if (!tbinfo->relispopulated)
2816		return;
2817
2818	q = createPQExpBuffer();
2819
2820	appendPQExpBuffer(q, "REFRESH MATERIALIZED VIEW %s;\n",
2821					  fmtQualifiedDumpable(tbinfo));
2822
2823	if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA)
2824		ArchiveEntry(fout,
2825					 tdinfo->dobj.catId,	/* catalog ID */
2826					 tdinfo->dobj.dumpId,	/* dump ID */
2827					 ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
2828								  .namespace = tbinfo->dobj.namespace->dobj.name,
2829								  .owner = tbinfo->rolname,
2830								  .description = "MATERIALIZED VIEW DATA",
2831								  .section = SECTION_POST_DATA,
2832								  .createStmt = q->data,
2833								  .deps = tdinfo->dobj.dependencies,
2834								  .nDeps = tdinfo->dobj.nDeps));
2835
	/* NOTE(review): destroyPQExpBuffer(q) lost in extraction here */
2837}
2838
2839/*
2840 * getTableData -
2841 * set up dumpable objects representing the contents of tables
2842 */
2843static void
2844getTableData(DumpOptions *dopt, TableInfo *tblinfo, int numTables, char relkind)
2845{
2846 int i;
2847
2848 for (i = 0; i < numTables; i++)
2849 {
2850 if (tblinfo[i].dobj.dump & DUMP_COMPONENT_DATA &&
2851 (!relkind || tblinfo[i].relkind == relkind))
2852 makeTableDataInfo(dopt, &(tblinfo[i]));
2853 }
2854}
2855
2856/*
2857 * Make a dumpable object for the data of this specific table
2858 *
2859 * Note: we make a TableDataInfo if and only if we are going to dump the
2860 * table data; the "dump" field in such objects isn't very interesting.
 *
 * Skips views, partitioned tables (their partitions carry the data), and --
 * unless explicitly requested -- foreign tables and unlogged tables; also
 * honors the data exclusion list.  On success, links the new TableDataInfo
 * to the table and marks the table "interesting" so per-column info is
 * collected.
 *
 * NOTE(review): the parameter-list line and several condition/assignment
 * lines are missing from this extraction; confirm against upstream
 * pg_dump.c.
2861 */
2862static void
2864{
2865	TableDataInfo *tdinfo;
2866
2867	/*
2868	 * Nothing to do if we already decided to dump the table.  This will
2869	 * happen for "config" tables.
2870	 */
2871	if (tbinfo->dataObj != NULL)
2872		return;
2873
2874	/* Skip VIEWs (no data to dump) */
2875	if (tbinfo->relkind == RELKIND_VIEW)
2876		return;
2877	/* Skip FOREIGN TABLEs (no data to dump) unless requested explicitly */
2878	if (tbinfo->relkind == RELKIND_FOREIGN_TABLE &&
	/* NOTE(review): include-foreign-data test lines lost in extraction here */
2881		  tbinfo->foreign_server)))
2882		return;
2883	/* Skip partitioned tables (data in partitions) */
2884	if (tbinfo->relkind == RELKIND_PARTITIONED_TABLE)
2885		return;
2886
2887	/* Don't dump data in unlogged tables, if so requested */
2888	if (tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED &&
	/* NOTE(review): no-unlogged-table-data test line lost in extraction here */
2890		return;
2891
2892	/* Check that the data is not explicitly excluded */
	/* NOTE(review): exclusion-list membership call lost in extraction here */
2894							   tbinfo->dobj.catId.oid))
2895		return;
2896
2897	/* OK, let's dump it */
2898	tdinfo = (TableDataInfo *) pg_malloc(sizeof(TableDataInfo));
2899
2900	if (tbinfo->relkind == RELKIND_MATVIEW)
	/* NOTE(review): matview objType assignment lost in extraction here */
2902	else if (tbinfo->relkind == RELKIND_SEQUENCE)
2903		tdinfo->dobj.objType = DO_SEQUENCE_SET;
2904	else
2905		tdinfo->dobj.objType = DO_TABLE_DATA;
2906
2907	/*
2908	 * Note: use tableoid 0 so that this object won't be mistaken for
2909	 * something that pg_depend entries apply to.
2910	 */
2911	tdinfo->dobj.catId.tableoid = 0;
2912	tdinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
2913	AssignDumpId(&tdinfo->dobj);
2914	tdinfo->dobj.name = tbinfo->dobj.name;
2915	tdinfo->dobj.namespace = tbinfo->dobj.namespace;
2916	tdinfo->tdtable = tbinfo;
2917	tdinfo->filtercond = NULL;	/* might get set later */
2918	addObjectDependency(&tdinfo->dobj, tbinfo->dobj.dumpId);
2919
2920	/* A TableDataInfo contains data, of course */
	/* NOTE(review): components-flag assignment lost in extraction here */
2922
2923	tbinfo->dataObj = tdinfo;
2924
2925	/* Make sure that we'll collect per-column info for this table. */
2926	tbinfo->interesting = true;
2927}
2928
2929/*
2930 * The refresh for a materialized view must be dependent on the refresh for
2931 * any materialized view that this one is dependent on.
2932 *
2933 * This must be called after all the objects are created, but before they are
2934 * sorted.
 *
 * Uses a recursive query over pg_depend/pg_rewrite to find, for each
 * matview, every matview it (transitively, through views) reads from, then
 * adds a dump-order dependency between the corresponding REFRESH objects.
 * Also propagates "not populated" status downstream.
 *
 * NOTE(review): the parameter-list line (presumably "(Archive *fout)") and
 * one Assert line are missing from this extraction; confirm against
 * upstream pg_dump.c.
2935 */
2936static void
2938{
2939	PQExpBuffer query;
2940	PGresult   *res;
2941	int			ntups,
2942				i;
2943	int			i_classid,
2944				i_objid,
2945				i_refobjid;
2946
2947	/* No Mat Views before 9.3. */
2948	if (fout->remoteVersion < 90300)
2949		return;
2950
2951	query = createPQExpBuffer();
2952
2953	appendPQExpBufferStr(query, "WITH RECURSIVE w AS "
2954						 "( "
2955						 "SELECT d1.objid, d2.refobjid, c2.relkind AS refrelkind "
2956						 "FROM pg_depend d1 "
2957						 "JOIN pg_class c1 ON c1.oid = d1.objid "
2958						 "AND c1.relkind = " CppAsString2(RELKIND_MATVIEW)
2959						 " JOIN pg_rewrite r1 ON r1.ev_class = d1.objid "
2960						 "JOIN pg_depend d2 ON d2.classid = 'pg_rewrite'::regclass "
2961						 "AND d2.objid = r1.oid "
2962						 "AND d2.refobjid <> d1.objid "
2963						 "JOIN pg_class c2 ON c2.oid = d2.refobjid "
2964						 "AND c2.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2965						 CppAsString2(RELKIND_VIEW) ") "
2966						 "WHERE d1.classid = 'pg_class'::regclass "
2967						 "UNION "
2968						 "SELECT w.objid, d3.refobjid, c3.relkind "
2969						 "FROM w "
2970						 "JOIN pg_rewrite r3 ON r3.ev_class = w.refobjid "
2971						 "JOIN pg_depend d3 ON d3.classid = 'pg_rewrite'::regclass "
2972						 "AND d3.objid = r3.oid "
2973						 "AND d3.refobjid <> w.refobjid "
2974						 "JOIN pg_class c3 ON c3.oid = d3.refobjid "
2975						 "AND c3.relkind IN (" CppAsString2(RELKIND_MATVIEW) ","
2976						 CppAsString2(RELKIND_VIEW) ") "
2977						 ") "
2978						 "SELECT 'pg_class'::regclass::oid AS classid, objid, refobjid "
2979						 "FROM w "
2980						 "WHERE refrelkind = " CppAsString2(RELKIND_MATVIEW));
2981
2982	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
2983
2984	ntups = PQntuples(res);
2985
2986	i_classid = PQfnumber(res, "classid");
2987	i_objid = PQfnumber(res, "objid");
2988	i_refobjid = PQfnumber(res, "refobjid");
2989
2990	for (i = 0; i < ntups; i++)
2991	{
2992		CatalogId	objId;
2993		CatalogId	refobjId;
2994		DumpableObject *dobj;
2995		DumpableObject *refdobj;
2996		TableInfo  *tbinfo;
2997		TableInfo  *reftbinfo;
2998
2999		objId.tableoid = atooid(PQgetvalue(res, i, i_classid));
3000		objId.oid = atooid(PQgetvalue(res, i, i_objid));
3001		refobjId.tableoid = objId.tableoid;
3002		refobjId.oid = atooid(PQgetvalue(res, i, i_refobjid));
3003
3004		dobj = findObjectByCatalogId(objId);
3005		if (dobj == NULL)
3006			continue;
3007
3008		Assert(dobj->objType == DO_TABLE);
3009		tbinfo = (TableInfo *) dobj;
3010		Assert(tbinfo->relkind == RELKIND_MATVIEW);
		/* Switch to the matview's REFRESH (data) object, if it has one */
3011		dobj = (DumpableObject *) tbinfo->dataObj;
3012		if (dobj == NULL)
3013			continue;
		/* NOTE(review): Assert on the refresh objType lost in extraction here */
3015
3016		refdobj = findObjectByCatalogId(refobjId);
3017		if (refdobj == NULL)
3018			continue;
3019
3020		Assert(refdobj->objType == DO_TABLE);
3021		reftbinfo = (TableInfo *) refdobj;
3022		Assert(reftbinfo->relkind == RELKIND_MATVIEW);
3023		refdobj = (DumpableObject *) reftbinfo->dataObj;
3024		if (refdobj == NULL)
3025			continue;
3026		Assert(refdobj->objType == DO_REFRESH_MATVIEW);
3027
3028		addObjectDependency(dobj, refdobj->dumpId);
3029
		/* An unpopulated upstream matview means this one can't refresh */
3030		if (!reftbinfo->relispopulated)
3031			tbinfo->relispopulated = false;
3032	}
3033
3034	PQclear(res);
3035
3036	destroyPQExpBuffer(query);
3037}
3038
3039/*
3040 * getTableDataFKConstraints -
3041 * add dump-order dependencies reflecting foreign key constraints
3042 *
3043 * This code is executed only in a data-only dump --- in schema+data dumps
3044 * we handle foreign key issues by not creating the FK constraints until
3045 * after the data is loaded. In a data-only dump, however, we want to
3046 * order the table data objects in such a way that a table's referenced
3047 * tables are restored first. (In the presence of circular references or
3048 * self-references this may be impossible; we'll detect and complain about
3049 * that during the dependency sorting step.)
3050 */
3051static void
3053{
3054 DumpableObject **dobjs;
3055 int numObjs;
3056 int i;
3057
3058 /* Search through all the dumpable objects for FK constraints */
3059 getDumpableObjects(&dobjs, &numObjs);
3060 for (i = 0; i < numObjs; i++)
3061 {
3062 if (dobjs[i]->objType == DO_FK_CONSTRAINT)
3063 {
3064 ConstraintInfo *cinfo = (ConstraintInfo *) dobjs[i];
3065 TableInfo *ftable;
3066
3067 /* Not interesting unless both tables are to be dumped */
3068 if (cinfo->contable == NULL ||
3069 cinfo->contable->dataObj == NULL)
3070 continue;
3071 ftable = findTableByOid(cinfo->confrelid);
3072 if (ftable == NULL ||
3073 ftable->dataObj == NULL)
3074 continue;
3075
3076 /*
3077 * Okay, make referencing table's TABLE_DATA object depend on the
3078 * referenced table's TABLE_DATA object.
3079 */
3081 ftable->dataObj->dobj.dumpId);
3082 }
3083 }
3084 free(dobjs);
3085}
3086
3087
3088/*
3089 * dumpDatabase:
3090 * dump the database definition
3091 */
3092static void
3094{
3095 DumpOptions *dopt = fout->dopt;
3097 PQExpBuffer delQry = createPQExpBuffer();
3098 PQExpBuffer creaQry = createPQExpBuffer();
3099 PQExpBuffer labelq = createPQExpBuffer();
3100 PGconn *conn = GetConnection(fout);
3101 PGresult *res;
3102 int i_tableoid,
3103 i_oid,
3104 i_datname,
3105 i_datdba,
3106 i_encoding,
3107 i_datlocprovider,
3108 i_collate,
3109 i_ctype,
3110 i_datlocale,
3111 i_daticurules,
3112 i_frozenxid,
3113 i_minmxid,
3114 i_datacl,
3115 i_acldefault,
3116 i_datistemplate,
3117 i_datconnlimit,
3118 i_datcollversion,
3119 i_tablespace;
3120 CatalogId dbCatId;
3121 DumpId dbDumpId;
3122 DumpableAcl dbdacl;
3123 const char *datname,
3124 *dba,
3125 *encoding,
3127 *collate,
3128 *ctype,
3129 *locale,
3130 *icurules,
3132 *datconnlimit,
3133 *tablespace;
3134 uint32 frozenxid,
3135 minmxid;
3136 char *qdatname;
3137
3138 pg_log_info("saving database definition");
3139
3140 /*
3141 * Fetch the database-level properties for this database.
3142 */
3143 appendPQExpBufferStr(dbQry, "SELECT tableoid, oid, datname, "
3144 "datdba, "
3145 "pg_encoding_to_char(encoding) AS encoding, "
3146 "datcollate, datctype, datfrozenxid, "
3147 "datacl, acldefault('d', datdba) AS acldefault, "
3148 "datistemplate, datconnlimit, ");
3149 if (fout->remoteVersion >= 90300)
3150 appendPQExpBufferStr(dbQry, "datminmxid, ");
3151 else
3152 appendPQExpBufferStr(dbQry, "0 AS datminmxid, ");
3153 if (fout->remoteVersion >= 170000)
3154 appendPQExpBufferStr(dbQry, "datlocprovider, datlocale, datcollversion, ");
3155 else if (fout->remoteVersion >= 150000)
3156 appendPQExpBufferStr(dbQry, "datlocprovider, daticulocale AS datlocale, datcollversion, ");
3157 else
3158 appendPQExpBufferStr(dbQry, "'c' AS datlocprovider, NULL AS datlocale, NULL AS datcollversion, ");
3159 if (fout->remoteVersion >= 160000)
3160 appendPQExpBufferStr(dbQry, "daticurules, ");
3161 else
3162 appendPQExpBufferStr(dbQry, "NULL AS daticurules, ");
3164 "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
3165 "shobj_description(oid, 'pg_database') AS description "
3166 "FROM pg_database "
3167 "WHERE datname = current_database()");
3168
3169 res = ExecuteSqlQueryForSingleRow(fout, dbQry->data);
3170
3171 i_tableoid = PQfnumber(res, "tableoid");
3172 i_oid = PQfnumber(res, "oid");
3173 i_datname = PQfnumber(res, "datname");
3174 i_datdba = PQfnumber(res, "datdba");
3175 i_encoding = PQfnumber(res, "encoding");
3176 i_datlocprovider = PQfnumber(res, "datlocprovider");
3177 i_collate = PQfnumber(res, "datcollate");
3178 i_ctype = PQfnumber(res, "datctype");
3179 i_datlocale = PQfnumber(res, "datlocale");
3180 i_daticurules = PQfnumber(res, "daticurules");
3181 i_frozenxid = PQfnumber(res, "datfrozenxid");
3182 i_minmxid = PQfnumber(res, "datminmxid");
3183 i_datacl = PQfnumber(res, "datacl");
3184 i_acldefault = PQfnumber(res, "acldefault");
3185 i_datistemplate = PQfnumber(res, "datistemplate");
3186 i_datconnlimit = PQfnumber(res, "datconnlimit");
3187 i_datcollversion = PQfnumber(res, "datcollversion");
3188 i_tablespace = PQfnumber(res, "tablespace");
3189
3190 dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
3191 dbCatId.oid = atooid(PQgetvalue(res, 0, i_oid));
3192 datname = PQgetvalue(res, 0, i_datname);
3193 dba = getRoleName(PQgetvalue(res, 0, i_datdba));
3194 encoding = PQgetvalue(res, 0, i_encoding);
3195 datlocprovider = PQgetvalue(res, 0, i_datlocprovider);
3196 collate = PQgetvalue(res, 0, i_collate);
3197 ctype = PQgetvalue(res, 0, i_ctype);
3198 if (!PQgetisnull(res, 0, i_datlocale))
3199 locale = PQgetvalue(res, 0, i_datlocale);
3200 else
3201 locale = NULL;
3202 if (!PQgetisnull(res, 0, i_daticurules))
3203 icurules = PQgetvalue(res, 0, i_daticurules);
3204 else
3205 icurules = NULL;
3206 frozenxid = atooid(PQgetvalue(res, 0, i_frozenxid));
3207 minmxid = atooid(PQgetvalue(res, 0, i_minmxid));
3208 dbdacl.acl = PQgetvalue(res, 0, i_datacl);
3209 dbdacl.acldefault = PQgetvalue(res, 0, i_acldefault);
3210 datistemplate = PQgetvalue(res, 0, i_datistemplate);
3211 datconnlimit = PQgetvalue(res, 0, i_datconnlimit);
3212 tablespace = PQgetvalue(res, 0, i_tablespace);
3213
3214 qdatname = pg_strdup(fmtId(datname));
3215
3216 /*
3217 * Prepare the CREATE DATABASE command. We must specify OID (if we want
3218 * to preserve that), as well as the encoding, locale, and tablespace
3219 * since those can't be altered later. Other DB properties are left to
3220 * the DATABASE PROPERTIES entry, so that they can be applied after
3221 * reconnecting to the target DB.
3222 *
3223 * For binary upgrade, we use the FILE_COPY strategy because testing has
3224 * shown it to be faster. When the server is in binary upgrade mode, it
3225 * will also skip the checkpoints this strategy ordinarily performs.
3226 */
3227 if (dopt->binary_upgrade)
3228 {
3229 appendPQExpBuffer(creaQry,
3230 "CREATE DATABASE %s WITH TEMPLATE = template0 "
3231 "OID = %u STRATEGY = FILE_COPY",
3232 qdatname, dbCatId.oid);
3233 }
3234 else
3235 {
3236 appendPQExpBuffer(creaQry, "CREATE DATABASE %s WITH TEMPLATE = template0",
3237 qdatname);
3238 }
3239 if (strlen(encoding) > 0)
3240 {
3241 appendPQExpBufferStr(creaQry, " ENCODING = ");
3242 appendStringLiteralAH(creaQry, encoding, fout);
3243 }
3244
3245 appendPQExpBufferStr(creaQry, " LOCALE_PROVIDER = ");
3246 if (datlocprovider[0] == 'b')
3247 appendPQExpBufferStr(creaQry, "builtin");
3248 else if (datlocprovider[0] == 'c')
3249 appendPQExpBufferStr(creaQry, "libc");
3250 else if (datlocprovider[0] == 'i')
3251 appendPQExpBufferStr(creaQry, "icu");
3252 else
3253 pg_fatal("unrecognized locale provider: %s",
3255
3256 if (strlen(collate) > 0 && strcmp(collate, ctype) == 0)
3257 {
3258 appendPQExpBufferStr(creaQry, " LOCALE = ");
3259 appendStringLiteralAH(creaQry, collate, fout);
3260 }
3261 else
3262 {
3263 if (strlen(collate) > 0)
3264 {
3265 appendPQExpBufferStr(creaQry, " LC_COLLATE = ");
3266 appendStringLiteralAH(creaQry, collate, fout);
3267 }
3268 if (strlen(ctype) > 0)
3269 {
3270 appendPQExpBufferStr(creaQry, " LC_CTYPE = ");
3271 appendStringLiteralAH(creaQry, ctype, fout);
3272 }
3273 }
3274 if (locale)
3275 {
3276 if (datlocprovider[0] == 'b')
3277 appendPQExpBufferStr(creaQry, " BUILTIN_LOCALE = ");
3278 else
3279 appendPQExpBufferStr(creaQry, " ICU_LOCALE = ");
3280
3281 appendStringLiteralAH(creaQry, locale, fout);
3282 }
3283
3284 if (icurules)
3285 {
3286 appendPQExpBufferStr(creaQry, " ICU_RULES = ");
3287 appendStringLiteralAH(creaQry, icurules, fout);
3288 }
3289
3290 /*
3291 * For binary upgrade, carry over the collation version. For normal
3292 * dump/restore, omit the version, so that it is computed upon restore.
3293 */
3294 if (dopt->binary_upgrade)
3295 {
3296 if (!PQgetisnull(res, 0, i_datcollversion))
3297 {
3298 appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
3299 appendStringLiteralAH(creaQry,
3300 PQgetvalue(res, 0, i_datcollversion),
3301 fout);
3302 }
3303 }
3304
3305 /*
3306 * Note: looking at dopt->outputNoTablespaces here is completely the wrong
3307 * thing; the decision whether to specify a tablespace should be left till
3308 * pg_restore, so that pg_restore --no-tablespaces applies. Ideally we'd
3309 * label the DATABASE entry with the tablespace and let the normal
3310 * tablespace selection logic work ... but CREATE DATABASE doesn't pay
3311 * attention to default_tablespace, so that won't work.
3312 */
3313 if (strlen(tablespace) > 0 && strcmp(tablespace, "pg_default") != 0 &&
3314 !dopt->outputNoTablespaces)
3315 appendPQExpBuffer(creaQry, " TABLESPACE = %s",
3316 fmtId(tablespace));
3317 appendPQExpBufferStr(creaQry, ";\n");
3318
3319 appendPQExpBuffer(delQry, "DROP DATABASE %s;\n",
3320 qdatname);
3321
3322 dbDumpId = createDumpId();
3323
3324 ArchiveEntry(fout,
3325 dbCatId, /* catalog ID */
3326 dbDumpId, /* dump ID */
3327 ARCHIVE_OPTS(.tag = datname,
3328 .owner = dba,
3329 .description = "DATABASE",
3330 .section = SECTION_PRE_DATA,
3331 .createStmt = creaQry->data,
3332 .dropStmt = delQry->data));
3333
3334 /* Compute correct tag for archive entry */
3335 appendPQExpBuffer(labelq, "DATABASE %s", qdatname);
3336
3337 /* Dump DB comment if any */
3338 {
3339 /*
3340 * 8.2 and up keep comments on shared objects in a shared table, so we
3341 * cannot use the dumpComment() code used for other database objects.
3342 * Be careful that the ArchiveEntry parameters match that function.
3343 */
3344 char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
3345
3346 if (comment && *comment && !dopt->no_comments)
3347 {
3348 resetPQExpBuffer(dbQry);
3349
3350 /*
3351 * Generates warning when loaded into a differently-named
3352 * database.
3353 */
3354 appendPQExpBuffer(dbQry, "COMMENT ON DATABASE %s IS ", qdatname);
3355 appendStringLiteralAH(dbQry, comment, fout);
3356 appendPQExpBufferStr(dbQry, ";\n");
3357
3359 ARCHIVE_OPTS(.tag = labelq->data,
3360 .owner = dba,
3361 .description = "COMMENT",
3362 .section = SECTION_NONE,
3363 .createStmt = dbQry->data,
3364 .deps = &dbDumpId,
3365 .nDeps = 1));
3366 }
3367 }
3368
3369 /* Dump DB security label, if enabled */
3370 if (!dopt->no_security_labels)
3371 {
3372 PGresult *shres;
3373 PQExpBuffer seclabelQry;
3374
3375 seclabelQry = createPQExpBuffer();
3376
3377 buildShSecLabelQuery("pg_database", dbCatId.oid, seclabelQry);
3378 shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
3379 resetPQExpBuffer(seclabelQry);
3380 emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
3381 if (seclabelQry->len > 0)
3383 ARCHIVE_OPTS(.tag = labelq->data,
3384 .owner = dba,
3385 .description = "SECURITY LABEL",
3386 .section = SECTION_NONE,
3387 .createStmt = seclabelQry->data,
3388 .deps = &dbDumpId,
3389 .nDeps = 1));
3390 destroyPQExpBuffer(seclabelQry);
3391 PQclear(shres);
3392 }
3393
3394 /*
3395 * Dump ACL if any. Note that we do not support initial privileges
3396 * (pg_init_privs) on databases.
3397 */
3398 dbdacl.privtype = 0;
3399 dbdacl.initprivs = NULL;
3400
3401 dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
3402 qdatname, NULL, NULL,
3403 NULL, dba, &dbdacl);
3404
3405 /*
3406 * Now construct a DATABASE PROPERTIES archive entry to restore any
3407 * non-default database-level properties. (The reason this must be
3408 * separate is that we cannot put any additional commands into the TOC
3409 * entry that has CREATE DATABASE. pg_restore would execute such a group
3410 * in an implicit transaction block, and the backend won't allow CREATE
3411 * DATABASE in that context.)
3412 */
3413 resetPQExpBuffer(creaQry);
3414 resetPQExpBuffer(delQry);
3415
3416 if (strlen(datconnlimit) > 0 && strcmp(datconnlimit, "-1") != 0)
3417 appendPQExpBuffer(creaQry, "ALTER DATABASE %s CONNECTION LIMIT = %s;\n",
3418 qdatname, datconnlimit);
3419
3420 if (strcmp(datistemplate, "t") == 0)
3421 {
3422 appendPQExpBuffer(creaQry, "ALTER DATABASE %s IS_TEMPLATE = true;\n",
3423 qdatname);
3424
3425 /*
3426 * The backend won't accept DROP DATABASE on a template database. We
3427 * can deal with that by removing the template marking before the DROP
3428 * gets issued. We'd prefer to use ALTER DATABASE IF EXISTS here, but
3429 * since no such command is currently supported, fake it with a direct
3430 * UPDATE on pg_database.
3431 */
3432 appendPQExpBufferStr(delQry, "UPDATE pg_catalog.pg_database "
3433 "SET datistemplate = false WHERE datname = ");
3434 appendStringLiteralAH(delQry, datname, fout);
3435 appendPQExpBufferStr(delQry, ";\n");
3436 }
3437
3438 /*
3439 * We do not restore pg_database.dathasloginevt because it is set
3440 * automatically on login event trigger creation.
3441 */
3442
3443 /* Add database-specific SET options */
3444 dumpDatabaseConfig(fout, creaQry, datname, dbCatId.oid);
3445
3446 /*
3447 * We stick this binary-upgrade query into the DATABASE PROPERTIES archive
3448 * entry, too, for lack of a better place.
3449 */
3450 if (dopt->binary_upgrade)
3451 {
3452 appendPQExpBufferStr(creaQry, "\n-- For binary upgrade, set datfrozenxid and datminmxid.\n");
3453 appendPQExpBuffer(creaQry, "UPDATE pg_catalog.pg_database\n"
3454 "SET datfrozenxid = '%u', datminmxid = '%u'\n"
3455 "WHERE datname = ",
3456 frozenxid, minmxid);
3457 appendStringLiteralAH(creaQry, datname, fout);
3458 appendPQExpBufferStr(creaQry, ";\n");
3459 }
3460
3461 if (creaQry->len > 0)
3463 ARCHIVE_OPTS(.tag = datname,
3464 .owner = dba,
3465 .description = "DATABASE PROPERTIES",
3466 .section = SECTION_PRE_DATA,
3467 .createStmt = creaQry->data,
3468 .dropStmt = delQry->data,
3469 .deps = &dbDumpId));
3470
3471 /*
3472 * pg_largeobject comes from the old system intact, so set its
3473 * relfrozenxids, relminmxids and relfilenode.
3474 */
3475 if (dopt->binary_upgrade)
3476 {
3477 PGresult *lo_res;
3478 PQExpBuffer loFrozenQry = createPQExpBuffer();
3479 PQExpBuffer loOutQry = createPQExpBuffer();
3480 PQExpBuffer loHorizonQry = createPQExpBuffer();
3481 int ii_relfrozenxid,
3482 ii_relfilenode,
3483 ii_oid,
3484 ii_relminmxid;
3485
3486 /*
3487 * pg_largeobject
3488 */
3489 if (fout->remoteVersion >= 90300)
3490 appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, relminmxid, relfilenode, oid\n"
3491 "FROM pg_catalog.pg_class\n"
3492 "WHERE oid IN (%u, %u);\n",
3493 LargeObjectRelationId, LargeObjectLOidPNIndexId);
3494 else
3495 appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid, 0 AS relminmxid, relfilenode, oid\n"
3496 "FROM pg_catalog.pg_class\n"
3497 "WHERE oid IN (%u, %u);\n",
3498 LargeObjectRelationId, LargeObjectLOidPNIndexId);
3499
3500 lo_res = ExecuteSqlQuery(fout, loFrozenQry->data, PGRES_TUPLES_OK);
3501
3502 ii_relfrozenxid = PQfnumber(lo_res, "relfrozenxid");
3503 ii_relminmxid = PQfnumber(lo_res, "relminmxid");
3504 ii_relfilenode = PQfnumber(lo_res, "relfilenode");
3505 ii_oid = PQfnumber(lo_res, "oid");
3506
3507 appendPQExpBufferStr(loHorizonQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid and relminmxid\n");
3508 appendPQExpBufferStr(loOutQry, "\n-- For binary upgrade, preserve pg_largeobject and index relfilenodes\n");
3509 for (int i = 0; i < PQntuples(lo_res); ++i)
3510 {
3511 Oid oid;
3512 RelFileNumber relfilenumber;
3513
3514 appendPQExpBuffer(loHorizonQry, "UPDATE pg_catalog.pg_class\n"
3515 "SET relfrozenxid = '%u', relminmxid = '%u'\n"
3516 "WHERE oid = %u;\n",
3517 atooid(PQgetvalue(lo_res, i, ii_relfrozenxid)),
3518 atooid(PQgetvalue(lo_res, i, ii_relminmxid)),
3519 atooid(PQgetvalue(lo_res, i, ii_oid)));
3520
3521 oid = atooid(PQgetvalue(lo_res, i, ii_oid));
3522 relfilenumber = atooid(PQgetvalue(lo_res, i, ii_relfilenode));
3523
3524 if (oid == LargeObjectRelationId)
3525 appendPQExpBuffer(loOutQry,
3526 "SELECT pg_catalog.binary_upgrade_set_next_heap_relfilenode('%u'::pg_catalog.oid);\n",
3527 relfilenumber);
3528 else if (oid == LargeObjectLOidPNIndexId)
3529 appendPQExpBuffer(loOutQry,
3530 "SELECT pg_catalog.binary_upgrade_set_next_index_relfilenode('%u'::pg_catalog.oid);\n",
3531 relfilenumber);
3532 }
3533
3534 appendPQExpBufferStr(loOutQry,
3535 "TRUNCATE pg_catalog.pg_largeobject;\n");
3536 appendPQExpBufferStr(loOutQry, loHorizonQry->data);
3537
3539 ARCHIVE_OPTS(.tag = "pg_largeobject",
3540 .description = "pg_largeobject",
3541 .section = SECTION_PRE_DATA,
3542 .createStmt = loOutQry->data));
3543
3544 PQclear(lo_res);
3545
3546 destroyPQExpBuffer(loFrozenQry);
3547 destroyPQExpBuffer(loHorizonQry);
3548 destroyPQExpBuffer(loOutQry);
3549 }
3550
3551 PQclear(res);
3552
3553 free(qdatname);
3554 destroyPQExpBuffer(dbQry);
3555 destroyPQExpBuffer(delQry);
3556 destroyPQExpBuffer(creaQry);
3557 destroyPQExpBuffer(labelq);
3558}
3559
3560/*
3561 * Collect any database-specific or role-and-database-specific SET options
3562 * for this database, and append them to outbuf.
3563 */
3564static void
3566 const char *dbname, Oid dboid)
3567{
3568 PGconn *conn = GetConnection(AH);
3570 PGresult *res;
3571
3572 /* First collect database-specific options */
3573 printfPQExpBuffer(buf, "SELECT unnest(setconfig) FROM pg_db_role_setting "
3574 "WHERE setrole = 0 AND setdatabase = '%u'::oid",
3575 dboid);
3576
3577 res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3578
3579 for (int i = 0; i < PQntuples(res); i++)
3581 "DATABASE", dbname, NULL, NULL,
3582 outbuf);
3583
3584 PQclear(res);
3585
3586 /* Now look for role-and-database-specific options */
3587 printfPQExpBuffer(buf, "SELECT rolname, unnest(setconfig) "
3588 "FROM pg_db_role_setting s, pg_roles r "
3589 "WHERE setrole = r.oid AND setdatabase = '%u'::oid",
3590 dboid);
3591
3592 res = ExecuteSqlQuery(AH, buf->data, PGRES_TUPLES_OK);
3593
3594 for (int i = 0; i < PQntuples(res); i++)
3596 "ROLE", PQgetvalue(res, i, 0),
3597 "DATABASE", dbname,
3598 outbuf);
3599
3600 PQclear(res);
3601
3603}
3604
3605/*
3606 * dumpEncoding: put the correct encoding into the archive
3607 */
3608static void
3610{
3611 const char *encname = pg_encoding_to_char(AH->encoding);
3613
3614 pg_log_info("saving encoding = %s", encname);
3615
3616 appendPQExpBufferStr(qry, "SET client_encoding = ");
3617 appendStringLiteralAH(qry, encname, AH);
3618 appendPQExpBufferStr(qry, ";\n");
3619
3621 ARCHIVE_OPTS(.tag = "ENCODING",
3622 .description = "ENCODING",
3623 .section = SECTION_PRE_DATA,
3624 .createStmt = qry->data));
3625
3626 destroyPQExpBuffer(qry);
3627}
3628
3629
3630/*
3631 * dumpStdStrings: put the correct escape string behavior into the archive
3632 */
3633static void
3635{
3636 const char *stdstrings = AH->std_strings ? "on" : "off";
3638
3639 pg_log_info("saving \"standard_conforming_strings = %s\"",
3640 stdstrings);
3641
3642 appendPQExpBuffer(qry, "SET standard_conforming_strings = '%s';\n",
3643 stdstrings);
3644
3646 ARCHIVE_OPTS(.tag = "STDSTRINGS",
3647 .description = "STDSTRINGS",
3648 .section = SECTION_PRE_DATA,
3649 .createStmt = qry->data));
3650
3651 destroyPQExpBuffer(qry);
3652}
3653
3654/*
3655 * dumpSearchPath: record the active search_path in the archive
3656 */
3657static void
3659{
3662 PGresult *res;
3663 char **schemanames = NULL;
3664 int nschemanames = 0;
3665 int i;
3666
3667 /*
3668 * We use the result of current_schemas(), not the search_path GUC,
3669 * because that might contain wildcards such as "$user", which won't
3670 * necessarily have the same value during restore. Also, this way avoids
3671 * listing schemas that may appear in search_path but not actually exist,
3672 * which seems like a prudent exclusion.
3673 */
3675 "SELECT pg_catalog.current_schemas(false)");
3676
3677 if (!parsePGArray(PQgetvalue(res, 0, 0), &schemanames, &nschemanames))
3678 pg_fatal("could not parse result of current_schemas()");
3679
3680 /*
3681 * We use set_config(), not a simple "SET search_path" command, because
3682 * the latter has less-clean behavior if the search path is empty. While
3683 * that's likely to get fixed at some point, it seems like a good idea to
3684 * be as backwards-compatible as possible in what we put into archives.
3685 */
3686 for (i = 0; i < nschemanames; i++)
3687 {
3688 if (i > 0)
3689 appendPQExpBufferStr(path, ", ");
3690 appendPQExpBufferStr(path, fmtId(schemanames[i]));
3691 }
3692
3693 appendPQExpBufferStr(qry, "SELECT pg_catalog.set_config('search_path', ");
3694 appendStringLiteralAH(qry, path->data, AH);
3695 appendPQExpBufferStr(qry, ", false);\n");
3696
3697 pg_log_info("saving \"search_path = %s\"", path->data);
3698
3700 ARCHIVE_OPTS(.tag = "SEARCHPATH",
3701 .description = "SEARCHPATH",
3702 .section = SECTION_PRE_DATA,
3703 .createStmt = qry->data));
3704
3705 /* Also save it in AH->searchpath, in case we're doing plain text dump */
3706 AH->searchpath = pg_strdup(qry->data);
3707
3708 free(schemanames);
3709 PQclear(res);
3710 destroyPQExpBuffer(qry);
3711 destroyPQExpBuffer(path);
3712}
3713
3714
3715/*
3716 * getLOs:
3717 * Collect schema-level data about large objects
3718 */
3719static void
3721{
3722 DumpOptions *dopt = fout->dopt;
3724 PGresult *res;
3725 int ntups;
3726 int i;
3727 int n;
3728 int i_oid;
3729 int i_lomowner;
3730 int i_lomacl;
3731 int i_acldefault;
3732
3733 pg_log_info("reading large objects");
3734
3735 /*
3736 * Fetch LO OIDs and owner/ACL data. Order the data so that all the blobs
3737 * with the same owner/ACL appear together.
3738 */
3740 "SELECT oid, lomowner, lomacl, "
3741 "acldefault('L', lomowner) AS acldefault "
3742 "FROM pg_largeobject_metadata "
3743 "ORDER BY lomowner, lomacl::pg_catalog.text, oid");
3744
3745 res = ExecuteSqlQuery(fout, loQry->data, PGRES_TUPLES_OK);
3746
3747 i_oid = PQfnumber(res, "oid");
3748 i_lomowner = PQfnumber(res, "lomowner");
3749 i_lomacl = PQfnumber(res, "lomacl");
3750 i_acldefault = PQfnumber(res, "acldefault");
3751
3752 ntups = PQntuples(res);
3753
3754 /*
3755 * Group the blobs into suitably-sized groups that have the same owner and
3756 * ACL setting, and build a metadata and a data DumpableObject for each
3757 * group. (If we supported initprivs for blobs, we'd have to insist that
3758 * groups also share initprivs settings, since the DumpableObject only has
3759 * room for one.) i is the index of the first tuple in the current group,
3760 * and n is the number of tuples we include in the group.
3761 */
3762 for (i = 0; i < ntups; i += n)
3763 {
3764 Oid thisoid = atooid(PQgetvalue(res, i, i_oid));
3765 char *thisowner = PQgetvalue(res, i, i_lomowner);
3766 char *thisacl = PQgetvalue(res, i, i_lomacl);
3767 LoInfo *loinfo;
3768 DumpableObject *lodata;
3769 char namebuf[64];
3770
3771 /* Scan to find first tuple not to be included in group */
3772 n = 1;
3773 while (n < MAX_BLOBS_PER_ARCHIVE_ENTRY && i + n < ntups)
3774 {
3775 if (strcmp(thisowner, PQgetvalue(res, i + n, i_lomowner)) != 0 ||
3776 strcmp(thisacl, PQgetvalue(res, i + n, i_lomacl)) != 0)
3777 break;
3778 n++;
3779 }
3780
3781 /* Build the metadata DumpableObject */
3782 loinfo = (LoInfo *) pg_malloc(offsetof(LoInfo, looids) + n * sizeof(Oid));
3783
3784 loinfo->dobj.objType = DO_LARGE_OBJECT;
3785 loinfo->dobj.catId.tableoid = LargeObjectRelationId;
3786 loinfo->dobj.catId.oid = thisoid;
3787 AssignDumpId(&loinfo->dobj);
3788
3789 if (n > 1)
3790 snprintf(namebuf, sizeof(namebuf), "%u..%u", thisoid,
3791 atooid(PQgetvalue(res, i + n - 1, i_oid)));
3792 else
3793 snprintf(namebuf, sizeof(namebuf), "%u", thisoid);
3794 loinfo->dobj.name = pg_strdup(namebuf);
3795 loinfo->dacl.acl = pg_strdup(thisacl);
3796 loinfo->dacl.acldefault = pg_strdup(PQgetvalue(res, i, i_acldefault));
3797 loinfo->dacl.privtype = 0;
3798 loinfo->dacl.initprivs = NULL;
3799 loinfo->rolname = getRoleName(thisowner);
3800 loinfo->numlos = n;
3801 loinfo->looids[0] = thisoid;
3802 /* Collect OIDs of the remaining blobs in this group */
3803 for (int k = 1; k < n; k++)
3804 {
3805 CatalogId extraID;
3806
3807 loinfo->looids[k] = atooid(PQgetvalue(res, i + k, i_oid));
3808
3809 /* Make sure we can look up loinfo by any of the blobs' OIDs */
3810 extraID.tableoid = LargeObjectRelationId;
3811 extraID.oid = loinfo->looids[k];
3812 recordAdditionalCatalogID(extraID, &loinfo->dobj);
3813 }
3814
3815 /* LOs have data */
3817
3818 /* Mark whether LO group has a non-empty ACL */
3819 if (!PQgetisnull(res, i, i_lomacl))
3821
3822 /*
3823 * In binary-upgrade mode for LOs, we do *not* dump out the LO data,
3824 * as it will be copied by pg_upgrade, which simply copies the
3825 * pg_largeobject table. We *do* however dump out anything but the
3826 * data, as pg_upgrade copies just pg_largeobject, but not
3827 * pg_largeobject_metadata, after the dump is restored.
3828 */
3829 if (dopt->binary_upgrade)
3830 loinfo->dobj.dump &= ~DUMP_COMPONENT_DATA;
3831
3832 /*
3833 * Create a "BLOBS" data item for the group, too. This is just a
3834 * placeholder for sorting; it carries no data now.
3835 */
3836 lodata = (DumpableObject *) pg_malloc(sizeof(DumpableObject));
3837 lodata->objType = DO_LARGE_OBJECT_DATA;
3838 lodata->catId = nilCatalogId;
3839 AssignDumpId(lodata);
3840 lodata->name = pg_strdup(namebuf);
3842 /* Set up explicit dependency from data to metadata */
3843 lodata->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
3844 lodata->dependencies[0] = loinfo->dobj.dumpId;
3845 lodata->nDeps = lodata->allocDeps = 1;
3846 }
3847
3848 PQclear(res);
3849 destroyPQExpBuffer(loQry);
3850}
3851
3852/*
3853 * dumpLO
3854 *
3855 * dump the definition (metadata) of the given large object group
3856 */
3857static void
3858dumpLO(Archive *fout, const LoInfo *loinfo)
3859{
3860 PQExpBuffer cquery = createPQExpBuffer();
3861
3862 /*
3863 * The "definition" is just a newline-separated list of OIDs. We need to
3864 * put something into the dropStmt too, but it can just be a comment.
3865 */
3866 for (int i = 0; i < loinfo->numlos; i++)
3867 appendPQExpBuffer(cquery, "%u\n", loinfo->looids[i]);
3868
3869 if (loinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
3870 ArchiveEntry(fout, loinfo->dobj.catId, loinfo->dobj.dumpId,
3871 ARCHIVE_OPTS(.tag = loinfo->dobj.name,
3872 .owner = loinfo->rolname,
3873 .description = "BLOB METADATA",
3874 .section = SECTION_DATA,
3875 .createStmt = cquery->data,
3876 .dropStmt = "-- dummy"));
3877
3878 /*
3879 * Dump per-blob comments and seclabels if any. We assume these are rare
3880 * enough that it's okay to generate retail TOC entries for them.
3881 */
3882 if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
3884 {
3885 for (int i = 0; i < loinfo->numlos; i++)
3886 {
3887 CatalogId catId;
3888 char namebuf[32];
3889
3890 /* Build identifying info for this blob */
3891 catId.tableoid = loinfo->dobj.catId.tableoid;
3892 catId.oid = loinfo->looids[i];
3893 snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[i]);
3894
3895 if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
3896 dumpComment(fout, "LARGE OBJECT", namebuf,
3897 NULL, loinfo->rolname,
3898 catId, 0, loinfo->dobj.dumpId);
3899
3900 if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
3901 dumpSecLabel(fout, "LARGE OBJECT", namebuf,
3902 NULL, loinfo->rolname,
3903 catId, 0, loinfo->dobj.dumpId);
3904 }
3905 }
3906
3907 /*
3908 * Dump the ACLs if any (remember that all blobs in the group will have
3909 * the same ACL). If there's just one blob, dump a simple ACL entry; if
3910 * there's more, make a "LARGE OBJECTS" entry that really contains only
3911 * the ACL for the first blob. _printTocEntry() will be cued by the tag
3912 * string to emit a mutated version for each blob.
3913 */
3914 if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
3915 {
3916 char namebuf[32];
3917
3918 /* Build identifying info for the first blob */
3919 snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[0]);
3920
3921 if (loinfo->numlos > 1)
3922 {
3923 char tagbuf[64];
3924
3925 snprintf(tagbuf, sizeof(tagbuf), "LARGE OBJECTS %u..%u",
3926 loinfo->looids[0], loinfo->looids[loinfo->numlos - 1]);
3927
3928 dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3929 "LARGE OBJECT", namebuf, NULL, NULL,
3930 tagbuf, loinfo->rolname, &loinfo->dacl);
3931 }
3932 else
3933 {
3934 dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
3935 "LARGE OBJECT", namebuf, NULL, NULL,
3936 NULL, loinfo->rolname, &loinfo->dacl);
3937 }
3938 }
3939
3940 destroyPQExpBuffer(cquery);
3941}
3942
3943/*
3944 * dumpLOs:
3945 * dump the data contents of the large objects in the given group
3946 */
3947static int
3948dumpLOs(Archive *fout, const void *arg)
3949{
3950 const LoInfo *loinfo = (const LoInfo *) arg;
3951 PGconn *conn = GetConnection(fout);
3952 char buf[LOBBUFSIZE];
3953
3954 pg_log_info("saving large objects \"%s\"", loinfo->dobj.name);
3955
3956 for (int i = 0; i < loinfo->numlos; i++)
3957 {
3958 Oid loOid = loinfo->looids[i];
3959 int loFd;
3960 int cnt;
3961
3962 /* Open the LO */
3963 loFd = lo_open(conn, loOid, INV_READ);
3964 if (loFd == -1)
3965 pg_fatal("could not open large object %u: %s",
3966 loOid, PQerrorMessage(conn));
3967
3968 StartLO(fout, loOid);
3969
3970 /* Now read it in chunks, sending data to archive */
3971 do
3972 {
3973 cnt = lo_read(conn, loFd, buf, LOBBUFSIZE);
3974 if (cnt < 0)
3975 pg_fatal("error reading large object %u: %s",
3976 loOid, PQerrorMessage(conn));
3977
3978 WriteData(fout, buf, cnt);
3979 } while (cnt > 0);
3980
3981 lo_close(conn, loFd);
3982
3983 EndLO(fout, loOid);
3984 }
3985
3986 return 1;
3987}
3988
3989/*
3990 * getPolicies
3991 * get information about all RLS policies on dumpable tables.
3992 */
3993void
3994getPolicies(Archive *fout, TableInfo tblinfo[], int numTables)
3995{
3996 PQExpBuffer query;
3997 PQExpBuffer tbloids;
3998 PGresult *res;
3999 PolicyInfo *polinfo;
4000 int i_oid;
4001 int i_tableoid;
4002 int i_polrelid;
4003 int i_polname;
4004 int i_polcmd;
4005 int i_polpermissive;
4006 int i_polroles;
4007 int i_polqual;
4008 int i_polwithcheck;
4009 int i,
4010 j,
4011 ntups;
4012
4013 /* No policies before 9.5 */
4014 if (fout->remoteVersion < 90500)
4015 return;
4016
4017 query = createPQExpBuffer();
4018 tbloids = createPQExpBuffer();
4019
4020 /*
4021 * Identify tables of interest, and check which ones have RLS enabled.
4022 */
4023 appendPQExpBufferChar(tbloids, '{');
4024 for (i = 0; i < numTables; i++)
4025 {
4026 TableInfo *tbinfo = &tblinfo[i];
4027
4028 /* Ignore row security on tables not to be dumped */
4029 if (!(tbinfo->dobj.dump & DUMP_COMPONENT_POLICY))
4030 continue;
4031
4032 /* It can't have RLS or policies if it's not a table */
4033 if (tbinfo->relkind != RELKIND_RELATION &&
4034 tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
4035 continue;
4036
4037 /* Add it to the list of table OIDs to be probed below */
4038 if (tbloids->len > 1) /* do we have more than the '{'? */
4039 appendPQExpBufferChar(tbloids, ',');
4040 appendPQExpBuffer(tbloids, "%u", tbinfo->dobj.catId.oid);
4041
4042 /* Is RLS enabled? (That's separate from whether it has policies) */
4043 if (tbinfo->rowsec)
4044 {
4046
4047 /*
4048 * We represent RLS being enabled on a table by creating a
4049 * PolicyInfo object with null polname.
4050 *
4051 * Note: use tableoid 0 so that this object won't be mistaken for
4052 * something that pg_depend entries apply to.
4053 */
4054 polinfo = pg_malloc(sizeof(PolicyInfo));
4055 polinfo->dobj.objType = DO_POLICY;
4056 polinfo->dobj.catId.tableoid = 0;
4057 polinfo->dobj.catId.oid = tbinfo->dobj.catId.oid;
4058 AssignDumpId(&polinfo->dobj);
4059 polinfo->dobj.namespace = tbinfo->dobj.namespace;
4060 polinfo->dobj.name = pg_strdup(tbinfo->dobj.name);
4061 polinfo->poltable = tbinfo;
4062 polinfo->polname = NULL;
4063 polinfo->polcmd = '\0';
4064 polinfo->polpermissive = 0;
4065 polinfo->polroles = NULL;
4066 polinfo->polqual = NULL;
4067 polinfo->polwithcheck = NULL;
4068 }
4069 }
4070 appendPQExpBufferChar(tbloids, '}');
4071
4072 /*
4073 * Now, read all RLS policies belonging to the tables of interest, and
4074 * create PolicyInfo objects for them. (Note that we must filter the
4075 * results server-side not locally, because we dare not apply pg_get_expr
4076 * to tables we don't have lock on.)
4077 */
4078 pg_log_info("reading row-level security policies");
4079
4080 printfPQExpBuffer(query,
4081 "SELECT pol.oid, pol.tableoid, pol.polrelid, pol.polname, pol.polcmd, ");
4082 if (fout->remoteVersion >= 100000)
4083 appendPQExpBufferStr(query, "pol.polpermissive, ");
4084 else
4085 appendPQExpBufferStr(query, "'t' as polpermissive, ");
4086 appendPQExpBuffer(query,
4087 "CASE WHEN pol.polroles = '{0}' THEN NULL ELSE "
4088 " pg_catalog.array_to_string(ARRAY(SELECT pg_catalog.quote_ident(rolname) from pg_catalog.pg_roles WHERE oid = ANY(pol.polroles)), ', ') END AS polroles, "
4089 "pg_catalog.pg_get_expr(pol.polqual, pol.polrelid) AS polqual, "
4090 "pg_catalog.pg_get_expr(pol.polwithcheck, pol.polrelid) AS polwithcheck "
4091 "FROM unnest('%s'::pg_catalog.oid[]) AS src(tbloid)\n"
4092 "JOIN pg_catalog.pg_policy pol ON (src.tbloid = pol.polrelid)",
4093 tbloids->data);
4094
4095 res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4096
4097 ntups = PQntuples(res);
4098 if (ntups > 0)
4099 {
4100 i_oid = PQfnumber(res, "oid");
4101 i_tableoid = PQfnumber(res, "tableoid");
4102 i_polrelid = PQfnumber(res, "polrelid");
4103 i_polname = PQfnumber(res, "polname");
4104 i_polcmd = PQfnumber(res, "polcmd");
4105 i_polpermissive = PQfnumber(res, "polpermissive");
4106 i_polroles = PQfnumber(res, "polroles");
4107 i_polqual = PQfnumber(res, "polqual");
4108 i_polwithcheck = PQfnumber(res, "polwithcheck");
4109
4110 polinfo = pg_malloc(ntups * sizeof(PolicyInfo));
4111
4112 for (j = 0; j < ntups; j++)
4113 {
4114 Oid polrelid = atooid(PQgetvalue(res, j, i_polrelid));
4115 TableInfo *tbinfo = findTableByOid(polrelid);
4116
4118
4119 polinfo[j].dobj.objType = DO_POLICY;
4120 polinfo[j].dobj.catId.tableoid =
4121 atooid(PQgetvalue(res, j, i_tableoid));
4122 polinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
4123 AssignDumpId(&polinfo[j].dobj);
4124 polinfo[j].dobj.namespace = tbinfo->dobj.namespace;
4125 polinfo[j].poltable = tbinfo;
4126 polinfo[j].polname = pg_strdup(PQgetvalue(res, j, i_polname));
4127 polinfo[j].dobj.name = pg_strdup(polinfo[j].polname);
4128
4129 polinfo[j].polcmd = *(PQgetvalue(res, j, i_polcmd));
4130 polinfo[j].polpermissive = *(PQgetvalue(res, j, i_polpermissive)) == 't';
4131
4132 if (PQgetisnull(res, j, i_polroles))
4133 polinfo[j].polroles = NULL;
4134 else
4135 polinfo[j].polroles = pg_strdup(PQgetvalue(res, j, i_polroles));
4136
4137 if (PQgetisnull(res, j, i_polqual))
4138 polinfo[j].polqual = NULL;
4139 else
4140 polinfo[j].polqual = pg_strdup(PQgetvalue(res, j, i_polqual));
4141
4142 if (PQgetisnull(res, j, i_polwithcheck))
4143 polinfo[j].polwithcheck = NULL;
4144 else
4145 polinfo[j].polwithcheck
4146 = pg_strdup(PQgetvalue(res, j, i_polwithcheck));
4147 }
4148 }
4149
4150 PQclear(res);
4151
4152 destroyPQExpBuffer(query);
4153 destroyPQExpBuffer(tbloids);
4154}
4155
4156/*
4157 * dumpPolicy
4158 * dump the definition of the given policy
4159 */
4160static void
4161dumpPolicy(Archive *fout, const PolicyInfo *polinfo)
4162{
4163 DumpOptions *dopt = fout->dopt;
4164 TableInfo *tbinfo = polinfo->poltable;
4165 PQExpBuffer query;
4166 PQExpBuffer delqry;
4167 PQExpBuffer polprefix;
4168 char *qtabname;
4169 const char *cmd;
4170 char *tag;
4171
4172 /* Do nothing if not dumping schema */
4173 if (!dopt->dumpSchema)
4174 return;
4175
4176 /*
4177 * If polname is NULL, then this record is just indicating that ROW LEVEL
4178 * SECURITY is enabled for the table. Dump as ALTER TABLE <table> ENABLE
4179 * ROW LEVEL SECURITY.
4180 */
4181 if (polinfo->polname == NULL)
4182 {
4183 query = createPQExpBuffer();
4184
4185 appendPQExpBuffer(query, "ALTER TABLE %s ENABLE ROW LEVEL SECURITY;",
4186 fmtQualifiedDumpable(tbinfo));
4187
4188 /*
4189 * We must emit the ROW SECURITY object's dependency on its table
4190 * explicitly, because it will not match anything in pg_depend (unlike
4191 * the case for other PolicyInfo objects).
4192 */
4193 if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4194 ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
4195 ARCHIVE_OPTS(.tag = polinfo->dobj.name,
4196 .namespace = polinfo->dobj.namespace->dobj.name,
4197 .owner = tbinfo->rolname,
4198 .description = "ROW SECURITY",
4199 .section = SECTION_POST_DATA,
4200 .createStmt = query->data,
4201 .deps = &(tbinfo->dobj.dumpId),
4202 .nDeps = 1));
4203
4204 destroyPQExpBuffer(query);
4205 return;
4206 }
4207
4208 if (polinfo->polcmd == '*')
4209 cmd = "";
4210 else if (polinfo->polcmd == 'r')
4211 cmd = " FOR SELECT";
4212 else if (polinfo->polcmd == 'a')
4213 cmd = " FOR INSERT";
4214 else if (polinfo->polcmd == 'w')
4215 cmd = " FOR UPDATE";
4216 else if (polinfo->polcmd == 'd')
4217 cmd = " FOR DELETE";
4218 else
4219 pg_fatal("unexpected policy command type: %c",
4220 polinfo->polcmd);
4221
4222 query = createPQExpBuffer();
4223 delqry = createPQExpBuffer();
4224 polprefix = createPQExpBuffer();
4225
4226 qtabname = pg_strdup(fmtId(tbinfo->dobj.name));
4227
4228 appendPQExpBuffer(query, "CREATE POLICY %s", fmtId(polinfo->polname));
4229
4230 appendPQExpBuffer(query, " ON %s%s%s", fmtQualifiedDumpable(tbinfo),
4231 !polinfo->polpermissive ? " AS RESTRICTIVE" : "", cmd);
4232
4233 if (polinfo->polroles != NULL)
4234 appendPQExpBuffer(query, " TO %s", polinfo->polroles);
4235
4236 if (polinfo->polqual != NULL)
4237 appendPQExpBuffer(query, " USING (%s)", polinfo->polqual);
4238
4239 if (polinfo->polwithcheck != NULL)
4240 appendPQExpBuffer(query, " WITH CHECK (%s)", polinfo->polwithcheck);
4241
4242 appendPQExpBufferStr(query, ";\n");
4243
4244 appendPQExpBuffer(delqry, "DROP POLICY %s", fmtId(polinfo->polname));
4245 appendPQExpBuffer(delqry, " ON %s;\n", fmtQualifiedDumpable(tbinfo));
4246
4247 appendPQExpBuffer(polprefix, "POLICY %s ON",
4248 fmtId(polinfo->polname));
4249
4250 tag = psprintf("%s %s", tbinfo->dobj.name, polinfo->dobj.name);
4251
4252 if (polinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4253 ArchiveEntry(fout, polinfo->dobj.catId, polinfo->dobj.dumpId,
4254 ARCHIVE_OPTS(.tag = tag,
4255 .namespace = polinfo->dobj.namespace->dobj.name,
4256 .owner = tbinfo->rolname,
4257 .description = "POLICY",
4258 .section = SECTION_POST_DATA,
4259 .createStmt = query->data,
4260 .dropStmt = delqry->data));
4261
4262 if (polinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4263 dumpComment(fout, polprefix->data, qtabname,
4264 tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
4265 polinfo->dobj.catId, 0, polinfo->dobj.dumpId);
4266
4267 free(tag);
4268 destroyPQExpBuffer(query);
4269 destroyPQExpBuffer(delqry);
4270 destroyPQExpBuffer(polprefix);
4271 free(qtabname);
4272}
4273
4274/*
4275 * getPublications
4276 * get information about publications
4277 */
4278void
4280{
4281 DumpOptions *dopt = fout->dopt;
4282 PQExpBuffer query;
4283 PGresult *res;
4284 PublicationInfo *pubinfo;
4285 int i_tableoid;
4286 int i_oid;
4287 int i_pubname;
4288 int i_pubowner;
4289 int i_puballtables;
4290 int i_pubinsert;
4291 int i_pubupdate;
4292 int i_pubdelete;
4293 int i_pubtruncate;
4294 int i_pubviaroot;
4295 int i_pubgencols;
4296 int i,
4297 ntups;
4298
4299 if (dopt->no_publications || fout->remoteVersion < 100000)
4300 return;
4301
4302 query = createPQExpBuffer();
4303
4304 /* Get the publications. */
4305 appendPQExpBufferStr(query, "SELECT p.tableoid, p.oid, p.pubname, "
4306 "p.pubowner, p.puballtables, p.pubinsert, "
4307 "p.pubupdate, p.pubdelete, ");
4308
4309 if (fout->remoteVersion >= 110000)
4310 appendPQExpBufferStr(query, "p.pubtruncate, ");
4311 else
4312 appendPQExpBufferStr(query, "false AS pubtruncate, ");
4313
4314 if (fout->remoteVersion >= 130000)
4315 appendPQExpBufferStr(query, "p.pubviaroot, ");
4316 else
4317 appendPQExpBufferStr(query, "false AS pubviaroot, ");
4318
4319 if (fout->remoteVersion >= 180000)
4320 appendPQExpBufferStr(query, "p.pubgencols ");
4321 else
4322 appendPQExpBuffer(query, "'%c' AS pubgencols ", PUBLISH_GENCOLS_NONE);
4323
4324 appendPQExpBufferStr(query, "FROM pg_publication p");
4325
4326 res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4327
4328 ntups = PQntuples(res);
4329
4330 if (ntups == 0)
4331 goto cleanup;
4332
4333 i_tableoid = PQfnumber(res, "tableoid");
4334 i_oid = PQfnumber(res, "oid");
4335 i_pubname = PQfnumber(res, "pubname");
4336 i_pubowner = PQfnumber(res, "pubowner");
4337 i_puballtables = PQfnumber(res, "puballtables");
4338 i_pubinsert = PQfnumber(res, "pubinsert");
4339 i_pubupdate = PQfnumber(res, "pubupdate");
4340 i_pubdelete = PQfnumber(res, "pubdelete");
4341 i_pubtruncate = PQfnumber(res, "pubtruncate");
4342 i_pubviaroot = PQfnumber(res, "pubviaroot");
4343 i_pubgencols = PQfnumber(res, "pubgencols");
4344
4345 pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
4346
4347 for (i = 0; i < ntups; i++)
4348 {
4349 pubinfo[i].dobj.objType = DO_PUBLICATION;
4350 pubinfo[i].dobj.catId.tableoid =
4351 atooid(PQgetvalue(res, i, i_tableoid));
4352 pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4353 AssignDumpId(&pubinfo[i].dobj);
4354 pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
4355 pubinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_pubowner));
4356 pubinfo[i].puballtables =
4357 (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
4358 pubinfo[i].pubinsert =
4359 (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
4360 pubinfo[i].pubupdate =
4361 (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
4362 pubinfo[i].pubdelete =
4363 (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
4364 pubinfo[i].pubtruncate =
4365 (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
4366 pubinfo[i].pubviaroot =
4367 (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
4368 pubinfo[i].pubgencols_type =
4369 *(PQgetvalue(res, i, i_pubgencols));
4370
4371 /* Decide whether we want to dump it */
4372 selectDumpableObject(&(pubinfo[i].dobj), fout);
4373 }
4374
4375cleanup:
4376 PQclear(res);
4377
4378 destroyPQExpBuffer(query);
4379}
4380
4381/*
4382 * dumpPublication
4383 * dump the definition of the given publication
4384 */
4385static void
4387{
4388 DumpOptions *dopt = fout->dopt;
4389 PQExpBuffer delq;
4390 PQExpBuffer query;
4391 char *qpubname;
4392 bool first = true;
4393
4394 /* Do nothing if not dumping schema */
4395 if (!dopt->dumpSchema)
4396 return;
4397
4398 delq = createPQExpBuffer();
4399 query = createPQExpBuffer();
4400
4401 qpubname = pg_strdup(fmtId(pubinfo->dobj.name));
4402
4403 appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
4404 qpubname);
4405
4406 appendPQExpBuffer(query, "CREATE PUBLICATION %s",
4407 qpubname);
4408
4409 if (pubinfo->puballtables)
4410 appendPQExpBufferStr(query, " FOR ALL TABLES");
4411
4412 appendPQExpBufferStr(query, " WITH (publish = '");
4413 if (pubinfo->pubinsert)
4414 {
4415 appendPQExpBufferStr(query, "insert");
4416 first = false;
4417 }
4418
4419 if (pubinfo->pubupdate)
4420 {
4421 if (!first)
4422 appendPQExpBufferStr(query, ", ");
4423
4424 appendPQExpBufferStr(query, "update");
4425 first = false;
4426 }
4427
4428 if (pubinfo->pubdelete)
4429 {
4430 if (!first)
4431 appendPQExpBufferStr(query, ", ");
4432
4433 appendPQExpBufferStr(query, "delete");
4434 first = false;
4435 }
4436
4437 if (pubinfo->pubtruncate)
4438 {
4439 if (!first)
4440 appendPQExpBufferStr(query, ", ");
4441
4442 appendPQExpBufferStr(query, "truncate");
4443 first = false;
4444 }
4445
4446 appendPQExpBufferChar(query, '\'');
4447
4448 if (pubinfo->pubviaroot)
4449 appendPQExpBufferStr(query, ", publish_via_partition_root = true");
4450
4451 if (pubinfo->pubgencols_type == PUBLISH_GENCOLS_STORED)
4452 appendPQExpBufferStr(query, ", publish_generated_columns = stored");
4453
4454 appendPQExpBufferStr(query, ");\n");
4455
4456 if (pubinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
4457 ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
4458 ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
4459 .owner = pubinfo->rolname,
4460 .description = "PUBLICATION",
4461 .section = SECTION_POST_DATA,
4462 .createStmt = query->data,
4463 .dropStmt = delq->data));
4464
4465 if (pubinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
4466 dumpComment(fout, "PUBLICATION", qpubname,
4467 NULL, pubinfo->rolname,
4468 pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4469
4470 if (pubinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
4471 dumpSecLabel(fout, "PUBLICATION", qpubname,
4472 NULL, pubinfo->rolname,
4473 pubinfo->dobj.catId, 0, pubinfo->dobj.dumpId);
4474
4475 destroyPQExpBuffer(delq);
4476 destroyPQExpBuffer(query);
4477 free(qpubname);
4478}
4479
4480/*
4481 * getPublicationNamespaces
4482 * get information about publication membership for dumpable schemas.
4483 */
4484void
4486{
4487 PQExpBuffer query;
4488 PGresult *res;
4489 PublicationSchemaInfo *pubsinfo;
4490 DumpOptions *dopt = fout->dopt;
4491 int i_tableoid;
4492 int i_oid;
4493 int i_pnpubid;
4494 int i_pnnspid;
4495 int i,
4496 j,
4497 ntups;
4498
4499 if (dopt->no_publications || fout->remoteVersion < 150000)
4500 return;
4501
4502 query = createPQExpBuffer();
4503
4504 /* Collect all publication membership info. */
4506 "SELECT tableoid, oid, pnpubid, pnnspid "
4507 "FROM pg_catalog.pg_publication_namespace");
4508 res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4509
4510 ntups = PQntuples(res);
4511
4512 i_tableoid = PQfnumber(res, "tableoid");
4513 i_oid = PQfnumber(res, "oid");
4514 i_pnpubid = PQfnumber(res, "pnpubid");
4515 i_pnnspid = PQfnumber(res, "pnnspid");
4516
4517 /* this allocation may be more than we need */
4518 pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
4519 j = 0;
4520
4521 for (i = 0; i < ntups; i++)
4522 {
4523 Oid pnpubid = atooid(PQgetvalue(res, i, i_pnpubid));
4524 Oid pnnspid = atooid(PQgetvalue(res, i, i_pnnspid));
4525 PublicationInfo *pubinfo;
4526 NamespaceInfo *nspinfo;
4527
4528 /*
4529 * Ignore any entries for which we aren't interested in either the
4530 * publication or the rel.
4531 */
4532 pubinfo = findPublicationByOid(pnpubid);
4533 if (pubinfo == NULL)
4534 continue;
4535 nspinfo = findNamespaceByOid(pnnspid);
4536 if (nspinfo == NULL)
4537 continue;
4538
4539 /*
4540 * We always dump publication namespaces unless the corresponding
4541 * namespace is excluded from the dump.
4542 */
4543 if (nspinfo->dobj.dump == DUMP_COMPONENT_NONE)
4544 continue;
4545
4546 /* OK, make a DumpableObject for this relationship */
4548 pubsinfo[j].dobj.catId.tableoid =
4549 atooid(PQgetvalue(res, i, i_tableoid));
4550 pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
4551 AssignDumpId(&pubsinfo[j].dobj);
4552 pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace;
4553 pubsinfo[j].dobj.name = nspinfo->dobj.name;
4554 pubsinfo[j].publication = pubinfo;
4555 pubsinfo[j].pubschema = nspinfo;
4556
4557 /* Decide whether we want to dump it */
4558 selectDumpablePublicationObject(&(pubsinfo[j].dobj), fout);
4559
4560 j++;
4561 }
4562
4563 PQclear(res);
4564 destroyPQExpBuffer(query);
4565}
4566
4567/*
4568 * getPublicationTables
4569 * get information about publication membership for dumpable tables.
4570 */
4571void
4572getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
4573{
4574 PQExpBuffer query;
4575 PGresult *res;
4576 PublicationRelInfo *pubrinfo;
4577 DumpOptions *dopt = fout->dopt;
4578 int i_tableoid;
4579 int i_oid;
4580 int i_prpubid;
4581 int i_prrelid;
4582 int i_prrelqual;
4583 int i_prattrs;
4584 int i,
4585 j,
4586 ntups;
4587
4588 if (dopt->no_publications || fout->remoteVersion < 100000)
4589 return;
4590
4591 query = createPQExpBuffer();
4592
4593 /* Collect all publication membership info. */
4594 if (fout->remoteVersion >= 150000)
4596 "SELECT tableoid, oid, prpubid, prrelid, "
4597 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, "
4598 "(CASE\n"
4599 " WHEN pr.prattrs IS NOT NULL THEN\n"
4600 " (SELECT array_agg(attname)\n"
4601 " FROM\n"
4602 " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
4603 " pg_catalog.pg_attribute\n"
4604 " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n"
4605 " ELSE NULL END) prattrs "
4606 "FROM pg_catalog.pg_publication_rel pr");
4607 else
4609 "SELECT tableoid, oid, prpubid, prrelid, "
4610 "NULL AS prrelqual, NULL AS prattrs "
4611 "FROM pg_catalog.pg_publication_rel");
4612 res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
4613
4614 ntups = PQntuples(res);
4615
4616 i_tableoid = PQfnumber(res, "tableoid");
4617 i_oid = PQfnumber(res, "oid");
4618 i_prpubid = PQfnumber(res, "prpubid");
4619 i_prrelid = PQfnumber(res, "prrelid");
4620 i_prrelqual = PQfnumber(res, "prrelqual");
4621 i_prattrs = PQfnumber(res, "prattrs");
4622
4623 /* this allocation may be more than we need */
4624 pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
4625 j = 0;
4626
4627 for (