PostgreSQL Source Code  git master
dbcommands.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  * Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each others' toes. Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  * src/backend/commands/dbcommands.c
17  *
18  *-------------------------------------------------------------------------
19  */
#include "postgres.h"

#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_database.h"
#include "catalog/pg_tablespace.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "commands/dbcommands_xlog.h"
#include "commands/defrem.h"
#include "commands/seclabel.h"
#include "commands/tablespace.h"
#include "common/file_perm.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "replication/slot.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/md.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_locale.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
71 
/*
 * Create database strategy.
 *
 * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
 * copied block.
 *
 * CREATEDB_FILE_COPY will simply perform a file system level copy of the
 * database and log a single record for each tablespace copied. To make this
 * safe, it also triggers checkpoints before and after the operation.
 */
typedef enum CreateDBStrategy
{
	CREATEDB_WAL_LOG,
	CREATEDB_FILE_COPY,
} CreateDBStrategy;
87 
88 typedef struct
89 {
90  Oid src_dboid; /* source (template) DB */
91  Oid dest_dboid; /* DB we are trying to create */
92  CreateDBStrategy strategy; /* create db strategy */
94 
95 typedef struct
96 {
97  Oid dest_dboid; /* DB we are trying to move */
98  Oid dest_tsoid; /* tablespace we are trying to move to */
100 
101 /*
102  * Information about a relation to be copied when creating a database.
103  */
104 typedef struct CreateDBRelInfo
105 {
106  RelFileLocator rlocator; /* physical relation identifier */
107  Oid reloid; /* relation oid */
108  bool permanent; /* relation is permanent or unlogged */
110 
111 
112 /* non-export function prototypes */
113 static void createdb_failure_callback(int code, Datum arg);
114 static void movedb(const char *dbname, const char *tblspcname);
115 static void movedb_failure_callback(int code, Datum arg);
116 static bool get_db_info(const char *name, LOCKMODE lockmode,
117  Oid *dbIdP, Oid *ownerIdP,
118  int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
119  TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
120  Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
121  char **dbIcurules,
122  char *dbLocProvider,
123  char **dbCollversion);
124 static void remove_dbtablespaces(Oid db_id);
125 static bool check_db_file_conflict(Oid db_id);
126 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
127 static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
128  Oid dst_tsid);
129 static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
131  Oid dbid, char *srcpath,
132  List *rlocatorlist, Snapshot snapshot);
134  Oid tbid, Oid dbid,
135  char *srcpath);
136 static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
137  bool isRedo);
138 static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
139  Oid src_tsid, Oid dst_tsid);
140 static void recovery_create_dbdir(char *path, bool only_tblspc);
141 
142 /*
143  * Create a new database using the WAL_LOG strategy.
144  *
145  * Each copied block is separately written to the write-ahead log.
146  */
147 static void
148 CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
149  Oid src_tsid, Oid dst_tsid)
150 {
151  char *srcpath;
152  char *dstpath;
153  List *rlocatorlist = NULL;
154  ListCell *cell;
155  LockRelId srcrelid;
156  LockRelId dstrelid;
157  RelFileLocator srcrlocator;
158  RelFileLocator dstrlocator;
159  CreateDBRelInfo *relinfo;
160 
161  /* Get source and destination database paths. */
162  srcpath = GetDatabasePath(src_dboid, src_tsid);
163  dstpath = GetDatabasePath(dst_dboid, dst_tsid);
164 
165  /* Create database directory and write PG_VERSION file. */
166  CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
167 
168  /* Copy relmap file from source database to the destination database. */
169  RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
170 
171  /* Get list of relfilelocators to copy from the source database. */
172  rlocatorlist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
173  Assert(rlocatorlist != NIL);
174 
175  /*
176  * Database IDs will be the same for all relations so set them before
177  * entering the loop.
178  */
179  srcrelid.dbId = src_dboid;
180  dstrelid.dbId = dst_dboid;
181 
182  /* Loop over our list of relfilelocators and copy each one. */
183  foreach(cell, rlocatorlist)
184  {
185  relinfo = lfirst(cell);
186  srcrlocator = relinfo->rlocator;
187 
188  /*
189  * If the relation is from the source db's default tablespace then we
190  * need to create it in the destination db's default tablespace.
191  * Otherwise, we need to create in the same tablespace as it is in the
192  * source database.
193  */
194  if (srcrlocator.spcOid == src_tsid)
195  dstrlocator.spcOid = dst_tsid;
196  else
197  dstrlocator.spcOid = srcrlocator.spcOid;
198 
199  dstrlocator.dbOid = dst_dboid;
200  dstrlocator.relNumber = srcrlocator.relNumber;
201 
202  /*
203  * Acquire locks on source and target relations before copying.
204  *
205  * We typically do not read relation data into shared_buffers without
206  * holding a relation lock. It's unclear what could go wrong if we
207  * skipped it in this case, because nobody can be modifying either the
208  * source or destination database at this point, and we have locks on
209  * both databases, too, but let's take the conservative route.
210  */
211  dstrelid.relId = srcrelid.relId = relinfo->reloid;
212  LockRelationId(&srcrelid, AccessShareLock);
213  LockRelationId(&dstrelid, AccessShareLock);
214 
215  /* Copy relation storage from source to the destination. */
216  CreateAndCopyRelationData(srcrlocator, dstrlocator, relinfo->permanent);
217 
218  /* Release the relation locks. */
219  UnlockRelationId(&srcrelid, AccessShareLock);
220  UnlockRelationId(&dstrelid, AccessShareLock);
221  }
222 
223  pfree(srcpath);
224  pfree(dstpath);
225  list_free_deep(rlocatorlist);
226 }
227 
228 /*
229  * Scan the pg_class table in the source database to identify the relations
230  * that need to be copied to the destination database.
231  *
232  * This is an exception to the usual rule that cross-database access is
233  * not possible. We can make it work here because we know that there are no
234  * connections to the source database and (since there can't be prepared
235  * transactions touching that database) no in-doubt tuples either. This
236  * means that we don't need to worry about pruning removing anything from
237  * under us, and we don't need to be too picky about our snapshot either.
238  * As long as it sees all previously-committed XIDs as committed and all
239  * aborted XIDs as aborted, we should be fine: nothing else is possible
240  * here.
241  *
242  * We can't rely on the relcache for anything here, because that only knows
243  * about the database to which we are connected, and can't handle access to
244  * other databases. That also means we can't rely on the heap scan
245  * infrastructure, which would be a bad idea anyway since it might try
246  * to do things like HOT pruning which we definitely can't do safely in
247  * a database to which we're not even connected.
248  */
249 static List *
250 ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
251 {
252  RelFileLocator rlocator;
253  BlockNumber nblocks;
254  BlockNumber blkno;
255  Buffer buf;
256  RelFileNumber relfilenumber;
257  Page page;
258  List *rlocatorlist = NIL;
259  LockRelId relid;
260  Snapshot snapshot;
261  SMgrRelation smgr;
262  BufferAccessStrategy bstrategy;
263 
264  /* Get pg_class relfilenumber. */
265  relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
266  RelationRelationId);
267 
268  /* Don't read data into shared_buffers without holding a relation lock. */
269  relid.dbId = dbid;
270  relid.relId = RelationRelationId;
272 
273  /* Prepare a RelFileLocator for the pg_class relation. */
274  rlocator.spcOid = tbid;
275  rlocator.dbOid = dbid;
276  rlocator.relNumber = relfilenumber;
277 
278  smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
279  nblocks = smgrnblocks(smgr, MAIN_FORKNUM);
280  smgrclose(smgr);
281 
282  /* Use a buffer access strategy since this is a bulk read operation. */
283  bstrategy = GetAccessStrategy(BAS_BULKREAD);
284 
285  /*
286  * As explained in the function header comments, we need a snapshot that
287  * will see all committed transactions as committed, and our transaction
288  * snapshot - or the active snapshot - might not be new enough for that,
289  * but the return value of GetLatestSnapshot() should work fine.
290  */
291  snapshot = GetLatestSnapshot();
292 
293  /* Process the relation block by block. */
294  for (blkno = 0; blkno < nblocks; blkno++)
295  {
297 
298  buf = ReadBufferWithoutRelcache(rlocator, MAIN_FORKNUM, blkno,
299  RBM_NORMAL, bstrategy, true);
300 
302  page = BufferGetPage(buf);
303  if (PageIsNew(page) || PageIsEmpty(page))
304  {
306  continue;
307  }
308 
309  /* Append relevant pg_class tuples for current page to rlocatorlist. */
310  rlocatorlist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
311  srcpath, rlocatorlist,
312  snapshot);
313 
315  }
316 
317  /* Release relation lock. */
319 
320  return rlocatorlist;
321 }
322 
323 /*
324  * Scan one page of the source database's pg_class relation and add relevant
325  * entries to rlocatorlist. The return value is the updated list.
326  */
327 static List *
329  char *srcpath, List *rlocatorlist,
330  Snapshot snapshot)
331 {
333  OffsetNumber offnum;
334  OffsetNumber maxoff;
335  HeapTupleData tuple;
336 
337  maxoff = PageGetMaxOffsetNumber(page);
338 
339  /* Loop over offsets. */
340  for (offnum = FirstOffsetNumber;
341  offnum <= maxoff;
342  offnum = OffsetNumberNext(offnum))
343  {
344  ItemId itemid;
345 
346  itemid = PageGetItemId(page, offnum);
347 
348  /* Nothing to do if slot is empty or already dead. */
349  if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
350  ItemIdIsRedirected(itemid))
351  continue;
352 
353  Assert(ItemIdIsNormal(itemid));
354  ItemPointerSet(&(tuple.t_self), blkno, offnum);
355 
356  /* Initialize a HeapTupleData structure. */
357  tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
358  tuple.t_len = ItemIdGetLength(itemid);
359  tuple.t_tableOid = RelationRelationId;
360 
361  /* Skip tuples that are not visible to this snapshot. */
362  if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
363  {
364  CreateDBRelInfo *relinfo;
365 
366  /*
367  * ScanSourceDatabasePgClassTuple is in charge of constructing a
368  * CreateDBRelInfo object for this tuple, but can also decide that
369  * this tuple isn't something we need to copy. If we do need to
370  * copy the relation, add it to the list.
371  */
372  relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
373  srcpath);
374  if (relinfo != NULL)
375  rlocatorlist = lappend(rlocatorlist, relinfo);
376  }
377  }
378 
379  return rlocatorlist;
380 }
381 
382 /*
383  * Decide whether a certain pg_class tuple represents something that
384  * needs to be copied from the source database to the destination database,
385  * and if so, construct a CreateDBRelInfo for it.
386  *
387  * Visibility checks are handled by the caller, so our job here is just
388  * to assess the data stored in the tuple.
389  */
392  char *srcpath)
393 {
394  CreateDBRelInfo *relinfo;
395  Form_pg_class classForm;
396  RelFileNumber relfilenumber = InvalidRelFileNumber;
397 
398  classForm = (Form_pg_class) GETSTRUCT(tuple);
399 
400  /*
401  * Return NULL if this object does not need to be copied.
402  *
403  * Shared objects don't need to be copied, because they are shared.
404  * Objects without storage can't be copied, because there's nothing to
405  * copy. Temporary relations don't need to be copied either, because they
406  * are inaccessible outside of the session that created them, which must
407  * be gone already, and couldn't connect to a different database if it
408  * still existed. autovacuum will eventually remove the pg_class entries
409  * as well.
410  */
411  if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
412  !RELKIND_HAS_STORAGE(classForm->relkind) ||
413  classForm->relpersistence == RELPERSISTENCE_TEMP)
414  return NULL;
415 
416  /*
417  * If relfilenumber is valid then directly use it. Otherwise, consult the
418  * relmap.
419  */
420  if (RelFileNumberIsValid(classForm->relfilenode))
421  relfilenumber = classForm->relfilenode;
422  else
423  relfilenumber = RelationMapOidToFilenumberForDatabase(srcpath,
424  classForm->oid);
425 
426  /* We must have a valid relfilenumber. */
427  if (!RelFileNumberIsValid(relfilenumber))
428  elog(ERROR, "relation with OID %u does not have a valid relfilenumber",
429  classForm->oid);
430 
431  /* Prepare a rel info element and add it to the list. */
432  relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
433  if (OidIsValid(classForm->reltablespace))
434  relinfo->rlocator.spcOid = classForm->reltablespace;
435  else
436  relinfo->rlocator.spcOid = tbid;
437 
438  relinfo->rlocator.dbOid = dbid;
439  relinfo->rlocator.relNumber = relfilenumber;
440  relinfo->reloid = classForm->oid;
441 
442  /* Temporary relations were rejected above. */
443  Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
444  relinfo->permanent =
445  (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false;
446 
447  return relinfo;
448 }
449 
450 /*
451  * Create database directory and write out the PG_VERSION file in the database
452  * path. If isRedo is true, it's okay for the database directory to exist
453  * already.
454  */
455 static void
456 CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
457 {
458  int fd;
459  int nbytes;
460  char versionfile[MAXPGPATH];
461  char buf[16];
462 
463  /*
464  * Note that we don't have to copy version data from the source database;
465  * there's only one legal value.
466  */
467  sprintf(buf, "%s\n", PG_MAJORVERSION);
468  nbytes = strlen(PG_MAJORVERSION) + 1;
469 
470  /* Create database directory. */
471  if (MakePGDirectory(dbpath) < 0)
472  {
473  /* Failure other than already exists or not in WAL replay? */
474  if (errno != EEXIST || !isRedo)
475  ereport(ERROR,
477  errmsg("could not create directory \"%s\": %m", dbpath)));
478  }
479 
480  /*
481  * Create PG_VERSION file in the database path. If the file already
482  * exists and we are in WAL replay then try again to open it in write
483  * mode.
484  */
485  snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
486 
487  fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
488  if (fd < 0 && errno == EEXIST && isRedo)
489  fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
490 
491  if (fd < 0)
492  ereport(ERROR,
494  errmsg("could not create file \"%s\": %m", versionfile)));
495 
496  /* Write PG_MAJORVERSION in the PG_VERSION file. */
497  pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
498  errno = 0;
499  if ((int) write(fd, buf, nbytes) != nbytes)
500  {
501  /* If write didn't set errno, assume problem is no disk space. */
502  if (errno == 0)
503  errno = ENOSPC;
504  ereport(ERROR,
506  errmsg("could not write to file \"%s\": %m", versionfile)));
507  }
509 
510  pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_SYNC);
511  if (pg_fsync(fd) != 0)
514  errmsg("could not fsync file \"%s\": %m", versionfile)));
515  fsync_fname(dbpath, true);
517 
518  /* Close the version file. */
520 
521  /* If we are not in WAL replay then write the WAL. */
522  if (!isRedo)
523  {
525 
527 
528  xlrec.db_id = dbid;
529  xlrec.tablespace_id = tsid;
530 
531  XLogBeginInsert();
532  XLogRegisterData((char *) (&xlrec),
534 
535  (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
536 
538  }
539 }
540 
541 /*
542  * Create a new database using the FILE_COPY strategy.
543  *
544  * Copy each tablespace at the filesystem level, and log a single WAL record
545  * for each tablespace copied. This requires a checkpoint before and after the
546  * copy, which may be expensive, but it does greatly reduce WAL generation
547  * if the copied database is large.
548  */
549 static void
550 CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
551  Oid dst_tsid)
552 {
553  TableScanDesc scan;
554  Relation rel;
555  HeapTuple tuple;
556 
557  /*
558  * Force a checkpoint before starting the copy. This will force all dirty
559  * buffers, including those of unlogged tables, out to disk, to ensure
560  * source database is up-to-date on disk for the copy.
561  * FlushDatabaseBuffers() would suffice for that, but we also want to
562  * process any pending unlink requests. Otherwise, if a checkpoint
563  * happened while we're copying files, a file might be deleted just when
564  * we're about to copy it, causing the lstat() call in copydir() to fail
565  * with ENOENT.
566  *
567  * In binary upgrade mode, we can skip this checkpoint because pg_upgrade
568  * is careful to ensure that template0 is fully written to disk prior to
569  * any CREATE DATABASE commands.
570  */
571  if (!IsBinaryUpgrade)
574 
575  /*
576  * Iterate through all tablespaces of the template database, and copy each
577  * one to the new database.
578  */
579  rel = table_open(TableSpaceRelationId, AccessShareLock);
580  scan = table_beginscan_catalog(rel, 0, NULL);
581  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
582  {
583  Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
584  Oid srctablespace = spaceform->oid;
585  Oid dsttablespace;
586  char *srcpath;
587  char *dstpath;
588  struct stat st;
589 
590  /* No need to copy global tablespace */
591  if (srctablespace == GLOBALTABLESPACE_OID)
592  continue;
593 
594  srcpath = GetDatabasePath(src_dboid, srctablespace);
595 
596  if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
597  directory_is_empty(srcpath))
598  {
599  /* Assume we can ignore it */
600  pfree(srcpath);
601  continue;
602  }
603 
604  if (srctablespace == src_tsid)
605  dsttablespace = dst_tsid;
606  else
607  dsttablespace = srctablespace;
608 
609  dstpath = GetDatabasePath(dst_dboid, dsttablespace);
610 
611  /*
612  * Copy this subdirectory to the new location
613  *
614  * We don't need to copy subdirectories
615  */
616  copydir(srcpath, dstpath, false);
617 
618  /* Record the filesystem change in XLOG */
619  {
621 
622  xlrec.db_id = dst_dboid;
623  xlrec.tablespace_id = dsttablespace;
624  xlrec.src_db_id = src_dboid;
625  xlrec.src_tablespace_id = srctablespace;
626 
627  XLogBeginInsert();
628  XLogRegisterData((char *) &xlrec,
630 
631  (void) XLogInsert(RM_DBASE_ID,
633  }
634  pfree(srcpath);
635  pfree(dstpath);
636  }
637  table_endscan(scan);
639 
640  /*
641  * We force a checkpoint before committing. This effectively means that
642  * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
643  * replayed (at least not in ordinary crash recovery; we still have to
644  * make the XLOG entry for the benefit of PITR operations). This avoids
645  * two nasty scenarios:
646  *
647  * #1: At wal_level=minimal, we don't XLOG the contents of newly created
648  * relfilenodes; therefore the drop-and-recreate-whole-directory behavior
649  * of DBASE_CREATE replay would lose such files created in the new
650  * database between our commit and the next checkpoint.
651  *
652  * #2: Since we have to recopy the source database during DBASE_CREATE
653  * replay, we run the risk of copying changes in it that were committed
654  * after the original CREATE DATABASE command but before the system crash
655  * that led to the replay. This is at least unexpected and at worst could
656  * lead to inconsistencies, eg duplicate table names.
657  *
658  * (Both of these were real bugs in releases 8.0 through 8.0.3.)
659  *
660  * In PITR replay, the first of these isn't an issue, and the second is
661  * only a risk if the CREATE DATABASE and subsequent template database
662  * change both occur while a base backup is being taken. There doesn't
663  * seem to be much we can do about that except document it as a
664  * limitation.
665  *
666  * In binary upgrade mode, we can skip this checkpoint because neither of
667  * these problems applies: we don't ever replay the WAL generated during
668  * pg_upgrade, and we don't support taking base backups during pg_upgrade
669  * (not to mention that we don't concurrently modify template0, either).
670  *
671  * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
672  * strategy that avoids these problems.
673  */
674  if (!IsBinaryUpgrade)
677 }
678 
679 /*
680  * CREATE DATABASE
681  */
682 Oid
684 {
685  Oid src_dboid;
686  Oid src_owner;
687  int src_encoding = -1;
688  char *src_collate = NULL;
689  char *src_ctype = NULL;
690  char *src_locale = NULL;
691  char *src_icurules = NULL;
692  char src_locprovider = '\0';
693  char *src_collversion = NULL;
694  bool src_istemplate;
695  bool src_hasloginevt = false;
696  bool src_allowconn;
697  TransactionId src_frozenxid = InvalidTransactionId;
698  MultiXactId src_minmxid = InvalidMultiXactId;
699  Oid src_deftablespace;
700  volatile Oid dst_deftablespace;
701  Relation pg_database_rel;
702  HeapTuple tuple;
703  Datum new_record[Natts_pg_database] = {0};
704  bool new_record_nulls[Natts_pg_database] = {0};
705  Oid dboid = InvalidOid;
706  Oid datdba;
707  ListCell *option;
708  DefElem *tablespacenameEl = NULL;
709  DefElem *ownerEl = NULL;
710  DefElem *templateEl = NULL;
711  DefElem *encodingEl = NULL;
712  DefElem *localeEl = NULL;
713  DefElem *builtinlocaleEl = NULL;
714  DefElem *collateEl = NULL;
715  DefElem *ctypeEl = NULL;
716  DefElem *iculocaleEl = NULL;
717  DefElem *icurulesEl = NULL;
718  DefElem *locproviderEl = NULL;
719  DefElem *istemplateEl = NULL;
720  DefElem *allowconnectionsEl = NULL;
721  DefElem *connlimitEl = NULL;
722  DefElem *collversionEl = NULL;
723  DefElem *strategyEl = NULL;
724  char *dbname = stmt->dbname;
725  char *dbowner = NULL;
726  const char *dbtemplate = NULL;
727  char *dbcollate = NULL;
728  char *dbctype = NULL;
729  const char *dblocale = NULL;
730  char *dbicurules = NULL;
731  char dblocprovider = '\0';
732  char *canonname;
733  int encoding = -1;
734  bool dbistemplate = false;
735  bool dballowconnections = true;
736  int dbconnlimit = DATCONNLIMIT_UNLIMITED;
737  char *dbcollversion = NULL;
738  int notherbackends;
739  int npreparedxacts;
740  CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
742 
743  /* Extract options from the statement node tree */
744  foreach(option, stmt->options)
745  {
746  DefElem *defel = (DefElem *) lfirst(option);
747 
748  if (strcmp(defel->defname, "tablespace") == 0)
749  {
750  if (tablespacenameEl)
751  errorConflictingDefElem(defel, pstate);
752  tablespacenameEl = defel;
753  }
754  else if (strcmp(defel->defname, "owner") == 0)
755  {
756  if (ownerEl)
757  errorConflictingDefElem(defel, pstate);
758  ownerEl = defel;
759  }
760  else if (strcmp(defel->defname, "template") == 0)
761  {
762  if (templateEl)
763  errorConflictingDefElem(defel, pstate);
764  templateEl = defel;
765  }
766  else if (strcmp(defel->defname, "encoding") == 0)
767  {
768  if (encodingEl)
769  errorConflictingDefElem(defel, pstate);
770  encodingEl = defel;
771  }
772  else if (strcmp(defel->defname, "locale") == 0)
773  {
774  if (localeEl)
775  errorConflictingDefElem(defel, pstate);
776  localeEl = defel;
777  }
778  else if (strcmp(defel->defname, "builtin_locale") == 0)
779  {
780  if (builtinlocaleEl)
781  errorConflictingDefElem(defel, pstate);
782  builtinlocaleEl = defel;
783  }
784  else if (strcmp(defel->defname, "lc_collate") == 0)
785  {
786  if (collateEl)
787  errorConflictingDefElem(defel, pstate);
788  collateEl = defel;
789  }
790  else if (strcmp(defel->defname, "lc_ctype") == 0)
791  {
792  if (ctypeEl)
793  errorConflictingDefElem(defel, pstate);
794  ctypeEl = defel;
795  }
796  else if (strcmp(defel->defname, "icu_locale") == 0)
797  {
798  if (iculocaleEl)
799  errorConflictingDefElem(defel, pstate);
800  iculocaleEl = defel;
801  }
802  else if (strcmp(defel->defname, "icu_rules") == 0)
803  {
804  if (icurulesEl)
805  errorConflictingDefElem(defel, pstate);
806  icurulesEl = defel;
807  }
808  else if (strcmp(defel->defname, "locale_provider") == 0)
809  {
810  if (locproviderEl)
811  errorConflictingDefElem(defel, pstate);
812  locproviderEl = defel;
813  }
814  else if (strcmp(defel->defname, "is_template") == 0)
815  {
816  if (istemplateEl)
817  errorConflictingDefElem(defel, pstate);
818  istemplateEl = defel;
819  }
820  else if (strcmp(defel->defname, "allow_connections") == 0)
821  {
822  if (allowconnectionsEl)
823  errorConflictingDefElem(defel, pstate);
824  allowconnectionsEl = defel;
825  }
826  else if (strcmp(defel->defname, "connection_limit") == 0)
827  {
828  if (connlimitEl)
829  errorConflictingDefElem(defel, pstate);
830  connlimitEl = defel;
831  }
832  else if (strcmp(defel->defname, "collation_version") == 0)
833  {
834  if (collversionEl)
835  errorConflictingDefElem(defel, pstate);
836  collversionEl = defel;
837  }
838  else if (strcmp(defel->defname, "location") == 0)
839  {
841  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842  errmsg("LOCATION is not supported anymore"),
843  errhint("Consider using tablespaces instead."),
844  parser_errposition(pstate, defel->location)));
845  }
846  else if (strcmp(defel->defname, "oid") == 0)
847  {
848  dboid = defGetObjectId(defel);
849 
850  /*
851  * We don't normally permit new databases to be created with
852  * system-assigned OIDs. pg_upgrade tries to preserve database
853  * OIDs, so we can't allow any database to be created with an OID
854  * that might be in use in a freshly-initialized cluster created
855  * by some future version. We assume all such OIDs will be from
856  * the system-managed OID range.
857  *
858  * As an exception, however, we permit any OID to be assigned when
859  * allow_system_table_mods=on (so that initdb can assign system
860  * OIDs to template0 and postgres) or when performing a binary
861  * upgrade (so that pg_upgrade can preserve whatever OIDs it finds
862  * in the source cluster).
863  */
864  if (dboid < FirstNormalObjectId &&
866  ereport(ERROR,
867  (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
868  errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
869  }
870  else if (strcmp(defel->defname, "strategy") == 0)
871  {
872  if (strategyEl)
873  errorConflictingDefElem(defel, pstate);
874  strategyEl = defel;
875  }
876  else
877  ereport(ERROR,
878  (errcode(ERRCODE_SYNTAX_ERROR),
879  errmsg("option \"%s\" not recognized", defel->defname),
880  parser_errposition(pstate, defel->location)));
881  }
882 
883  if (ownerEl && ownerEl->arg)
884  dbowner = defGetString(ownerEl);
885  if (templateEl && templateEl->arg)
886  dbtemplate = defGetString(templateEl);
887  if (encodingEl && encodingEl->arg)
888  {
889  const char *encoding_name;
890 
891  if (IsA(encodingEl->arg, Integer))
892  {
893  encoding = defGetInt32(encodingEl);
894  encoding_name = pg_encoding_to_char(encoding);
895  if (strcmp(encoding_name, "") == 0 ||
896  pg_valid_server_encoding(encoding_name) < 0)
897  ereport(ERROR,
898  (errcode(ERRCODE_UNDEFINED_OBJECT),
899  errmsg("%d is not a valid encoding code",
900  encoding),
901  parser_errposition(pstate, encodingEl->location)));
902  }
903  else
904  {
905  encoding_name = defGetString(encodingEl);
906  encoding = pg_valid_server_encoding(encoding_name);
907  if (encoding < 0)
908  ereport(ERROR,
909  (errcode(ERRCODE_UNDEFINED_OBJECT),
910  errmsg("%s is not a valid encoding name",
911  encoding_name),
912  parser_errposition(pstate, encodingEl->location)));
913  }
914  }
915  if (localeEl && localeEl->arg)
916  {
917  dbcollate = defGetString(localeEl);
918  dbctype = defGetString(localeEl);
919  dblocale = defGetString(localeEl);
920  }
921  if (builtinlocaleEl && builtinlocaleEl->arg)
922  dblocale = defGetString(builtinlocaleEl);
923  if (collateEl && collateEl->arg)
924  dbcollate = defGetString(collateEl);
925  if (ctypeEl && ctypeEl->arg)
926  dbctype = defGetString(ctypeEl);
927  if (iculocaleEl && iculocaleEl->arg)
928  dblocale = defGetString(iculocaleEl);
929  if (icurulesEl && icurulesEl->arg)
930  dbicurules = defGetString(icurulesEl);
931  if (locproviderEl && locproviderEl->arg)
932  {
933  char *locproviderstr = defGetString(locproviderEl);
934 
935  if (pg_strcasecmp(locproviderstr, "builtin") == 0)
936  dblocprovider = COLLPROVIDER_BUILTIN;
937  else if (pg_strcasecmp(locproviderstr, "icu") == 0)
938  dblocprovider = COLLPROVIDER_ICU;
939  else if (pg_strcasecmp(locproviderstr, "libc") == 0)
940  dblocprovider = COLLPROVIDER_LIBC;
941  else
942  ereport(ERROR,
943  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
944  errmsg("unrecognized locale provider: %s",
945  locproviderstr)));
946  }
947  if (istemplateEl && istemplateEl->arg)
948  dbistemplate = defGetBoolean(istemplateEl);
949  if (allowconnectionsEl && allowconnectionsEl->arg)
950  dballowconnections = defGetBoolean(allowconnectionsEl);
951  if (connlimitEl && connlimitEl->arg)
952  {
953  dbconnlimit = defGetInt32(connlimitEl);
954  if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
955  ereport(ERROR,
956  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
957  errmsg("invalid connection limit: %d", dbconnlimit)));
958  }
959  if (collversionEl)
960  dbcollversion = defGetString(collversionEl);
961 
962  /* obtain OID of proposed owner */
963  if (dbowner)
964  datdba = get_role_oid(dbowner, false);
965  else
966  datdba = GetUserId();
967 
968  /*
969  * To create a database, must have createdb privilege and must be able to
970  * become the target role (this does not imply that the target role itself
971  * must have createdb privilege). The latter provision guards against
972  * "giveaway" attacks. Note that a superuser will always have both of
973  * these privileges a fortiori.
974  */
976  ereport(ERROR,
977  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
978  errmsg("permission denied to create database")));
979 
980  check_can_set_role(GetUserId(), datdba);
981 
982  /*
983  * Lookup database (template) to be cloned, and obtain share lock on it.
984  * ShareLock allows two CREATE DATABASEs to work from the same template
985  * concurrently, while ensuring no one is busy dropping it in parallel
986  * (which would be Very Bad since we'd likely get an incomplete copy
987  * without knowing it). This also prevents any new connections from being
988  * made to the source until we finish copying it, so we can be sure it
989  * won't change underneath us.
990  */
991  if (!dbtemplate)
992  dbtemplate = "template1"; /* Default template database name */
993 
994  if (!get_db_info(dbtemplate, ShareLock,
995  &src_dboid, &src_owner, &src_encoding,
996  &src_istemplate, &src_allowconn, &src_hasloginevt,
997  &src_frozenxid, &src_minmxid, &src_deftablespace,
998  &src_collate, &src_ctype, &src_locale, &src_icurules, &src_locprovider,
999  &src_collversion))
1000  ereport(ERROR,
1001  (errcode(ERRCODE_UNDEFINED_DATABASE),
1002  errmsg("template database \"%s\" does not exist",
1003  dbtemplate)));
1004 
1005  /*
1006  * If the source database was in the process of being dropped, we can't
1007  * use it as a template.
1008  */
1009  if (database_is_invalid_oid(src_dboid))
1010  ereport(ERROR,
1011  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1012  errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
1013  errhint("Use DROP DATABASE to drop invalid databases."));
1014 
1015  /*
1016  * Permission check: to copy a DB that's not marked datistemplate, you
1017  * must be superuser or the owner thereof.
1018  */
1019  if (!src_istemplate)
1020  {
1021  if (!object_ownercheck(DatabaseRelationId, src_dboid, GetUserId()))
1022  ereport(ERROR,
1023  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1024  errmsg("permission denied to copy database \"%s\"",
1025  dbtemplate)));
1026  }
1027 
1028  /* Validate the database creation strategy. */
1029  if (strategyEl && strategyEl->arg)
1030  {
1031  char *strategy;
1032 
1033  strategy = defGetString(strategyEl);
1034  if (pg_strcasecmp(strategy, "wal_log") == 0)
1035  dbstrategy = CREATEDB_WAL_LOG;
1036  else if (pg_strcasecmp(strategy, "file_copy") == 0)
1037  dbstrategy = CREATEDB_FILE_COPY;
1038  else
1039  ereport(ERROR,
1040  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1041  errmsg("invalid create database strategy \"%s\"", strategy),
1042  errhint("Valid strategies are \"wal_log\" and \"file_copy\".")));
1043  }
1044 
1045  /* If encoding or locales are defaulted, use source's setting */
1046  if (encoding < 0)
1047  encoding = src_encoding;
1048  if (dbcollate == NULL)
1049  dbcollate = src_collate;
1050  if (dbctype == NULL)
1051  dbctype = src_ctype;
1052  if (dblocprovider == '\0')
1053  dblocprovider = src_locprovider;
1054  if (dblocale == NULL)
1055  dblocale = src_locale;
1056  if (dbicurules == NULL)
1057  dbicurules = src_icurules;
1058 
1059  /* Some encodings are client only */
1061  ereport(ERROR,
1062  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1063  errmsg("invalid server encoding %d", encoding)));
1064 
1065  /* Check that the chosen locales are valid, and get canonical spellings */
1066  if (!check_locale(LC_COLLATE, dbcollate, &canonname))
1067  ereport(ERROR,
1068  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1069  errmsg("invalid LC_COLLATE locale name: \"%s\"", dbcollate),
1070  errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1071  dbcollate = canonname;
1072  if (!check_locale(LC_CTYPE, dbctype, &canonname))
1073  ereport(ERROR,
1074  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1075  errmsg("invalid LC_CTYPE locale name: \"%s\"", dbctype),
1076  errhint("If the locale name is specific to ICU, use ICU_LOCALE.")));
1077  dbctype = canonname;
1078 
1079  check_encoding_locale_matches(encoding, dbcollate, dbctype);
1080 
1081  /* validate provider-specific parameters */
1082  if (dblocprovider != COLLPROVIDER_BUILTIN)
1083  {
1084  if (builtinlocaleEl)
1085  ereport(ERROR,
1086  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1087  errmsg("BUILTIN_LOCALE cannot be specified unless locale provider is builtin")));
1088  }
1089 
1090  if (dblocprovider != COLLPROVIDER_ICU)
1091  {
1092  if (iculocaleEl)
1093  ereport(ERROR,
1094  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1095  errmsg("ICU locale cannot be specified unless locale provider is ICU")));
1096 
1097  if (dbicurules)
1098  ereport(ERROR,
1099  (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1100  errmsg("ICU rules cannot be specified unless locale provider is ICU")));
1101  }
1102 
1103  /* validate and canonicalize locale for the provider */
1104  if (dblocprovider == COLLPROVIDER_BUILTIN)
1105  {
1106  /*
1107  * This would happen if template0 uses the libc provider but the new
1108  * database uses builtin.
1109  */
1110  if (!dblocale)
1111  ereport(ERROR,
1112  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1113  errmsg("LOCALE or BUILTIN_LOCALE must be specified")));
1114 
1115  dblocale = builtin_validate_locale(encoding, dblocale);
1116  }
1117  else if (dblocprovider == COLLPROVIDER_ICU)
1118  {
1120  ereport(ERROR,
1121  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1122  errmsg("encoding \"%s\" is not supported with ICU provider",
1124 
1125  /*
1126  * This would happen if template0 uses the libc provider but the new
1127  * database uses icu.
1128  */
1129  if (!dblocale)
1130  ereport(ERROR,
1131  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1132  errmsg("LOCALE or ICU_LOCALE must be specified")));
1133 
1134  /*
1135  * During binary upgrade, or when the locale came from the template
1136  * database, preserve locale string. Otherwise, canonicalize to a
1137  * language tag.
1138  */
1139  if (!IsBinaryUpgrade && dblocale != src_locale)
1140  {
1141  char *langtag = icu_language_tag(dblocale,
1143 
1144  if (langtag && strcmp(dblocale, langtag) != 0)
1145  {
1146  ereport(NOTICE,
1147  (errmsg("using standard form \"%s\" for ICU locale \"%s\"",
1148  langtag, dblocale)));
1149 
1150  dblocale = langtag;
1151  }
1152  }
1153 
1154  icu_validate_locale(dblocale);
1155  }
1156 
1157  /* for libc, locale comes from datcollate and datctype */
1158  if (dblocprovider == COLLPROVIDER_LIBC)
1159  dblocale = NULL;
1160 
1161  /*
1162  * Check that the new encoding and locale settings match the source
1163  * database. We insist on this because we simply copy the source data ---
1164  * any non-ASCII data would be wrongly encoded, and any indexes sorted
1165  * according to the source locale would be wrong.
1166  *
1167  * However, we assume that template0 doesn't contain any non-ASCII data
1168  * nor any indexes that depend on collation or ctype, so template0 can be
1169  * used as template for creating a database with any encoding or locale.
1170  */
1171  if (strcmp(dbtemplate, "template0") != 0)
1172  {
1173  if (encoding != src_encoding)
1174  ereport(ERROR,
1175  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1176  errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
1178  pg_encoding_to_char(src_encoding)),
1179  errhint("Use the same encoding as in the template database, or use template0 as template.")));
1180 
1181  if (strcmp(dbcollate, src_collate) != 0)
1182  ereport(ERROR,
1183  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1184  errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
1185  dbcollate, src_collate),
1186  errhint("Use the same collation as in the template database, or use template0 as template.")));
1187 
1188  if (strcmp(dbctype, src_ctype) != 0)
1189  ereport(ERROR,
1190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191  errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
1192  dbctype, src_ctype),
1193  errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
1194 
1195  if (dblocprovider != src_locprovider)
1196  ereport(ERROR,
1197  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1198  errmsg("new locale provider (%s) does not match locale provider of the template database (%s)",
1199  collprovider_name(dblocprovider), collprovider_name(src_locprovider)),
1200  errhint("Use the same locale provider as in the template database, or use template0 as template.")));
1201 
1202  if (dblocprovider == COLLPROVIDER_ICU)
1203  {
1204  char *val1;
1205  char *val2;
1206 
1207  Assert(dblocale);
1208  Assert(src_locale);
1209  if (strcmp(dblocale, src_locale) != 0)
1210  ereport(ERROR,
1211  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1212  errmsg("new ICU locale (%s) is incompatible with the ICU locale of the template database (%s)",
1213  dblocale, src_locale),
1214  errhint("Use the same ICU locale as in the template database, or use template0 as template.")));
1215 
1216  val1 = dbicurules;
1217  if (!val1)
1218  val1 = "";
1219  val2 = src_icurules;
1220  if (!val2)
1221  val2 = "";
1222  if (strcmp(val1, val2) != 0)
1223  ereport(ERROR,
1224  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1225  errmsg("new ICU collation rules (%s) are incompatible with the ICU collation rules of the template database (%s)",
1226  val1, val2),
1227  errhint("Use the same ICU collation rules as in the template database, or use template0 as template.")));
1228  }
1229  }
1230 
1231  /*
1232  * If we got a collation version for the template database, check that it
1233  * matches the actual OS collation version. Otherwise error; the user
1234  * needs to fix the template database first. Don't complain if a
1235  * collation version was specified explicitly as a statement option; that
1236  * is used by pg_upgrade to reproduce the old state exactly.
1237  *
1238  * (If the template database has no collation version, then either the
1239  * platform/provider does not support collation versioning, or it's
1240  * template0, for which we stipulate that it does not contain
1241  * collation-using objects.)
1242  */
1243  if (src_collversion && !collversionEl)
1244  {
1245  char *actual_versionstr;
1246  const char *locale;
1247 
1248  if (dblocprovider == COLLPROVIDER_LIBC)
1249  locale = dbcollate;
1250  else
1251  locale = dblocale;
1252 
1253  actual_versionstr = get_collation_actual_version(dblocprovider, locale);
1254  if (!actual_versionstr)
1255  ereport(ERROR,
1256  (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
1257  dbtemplate)));
1258 
1259  if (strcmp(actual_versionstr, src_collversion) != 0)
1260  ereport(ERROR,
1261  (errmsg("template database \"%s\" has a collation version mismatch",
1262  dbtemplate),
1263  errdetail("The template database was created using collation version %s, "
1264  "but the operating system provides version %s.",
1265  src_collversion, actual_versionstr),
1266  errhint("Rebuild all objects in the template database that use the default collation and run "
1267  "ALTER DATABASE %s REFRESH COLLATION VERSION, "
1268  "or build PostgreSQL with the right library version.",
1269  quote_identifier(dbtemplate))));
1270  }
1271 
1272  if (dbcollversion == NULL)
1273  dbcollversion = src_collversion;
1274 
1275  /*
1276  * Normally, we copy the collation version from the template database.
1277  * This last resort only applies if the template database does not have a
1278  * collation version, which is normally only the case for template0.
1279  */
1280  if (dbcollversion == NULL)
1281  {
1282  const char *locale;
1283 
1284  if (dblocprovider == COLLPROVIDER_LIBC)
1285  locale = dbcollate;
1286  else
1287  locale = dblocale;
1288 
1289  dbcollversion = get_collation_actual_version(dblocprovider, locale);
1290  }
1291 
1292  /* Resolve default tablespace for new database */
1293  if (tablespacenameEl && tablespacenameEl->arg)
1294  {
1295  char *tablespacename;
1296  AclResult aclresult;
1297 
1298  tablespacename = defGetString(tablespacenameEl);
1299  dst_deftablespace = get_tablespace_oid(tablespacename, false);
1300  /* check permissions */
1301  aclresult = object_aclcheck(TableSpaceRelationId, dst_deftablespace, GetUserId(),
1302  ACL_CREATE);
1303  if (aclresult != ACLCHECK_OK)
1304  aclcheck_error(aclresult, OBJECT_TABLESPACE,
1305  tablespacename);
1306 
1307  /* pg_global must never be the default tablespace */
1308  if (dst_deftablespace == GLOBALTABLESPACE_OID)
1309  ereport(ERROR,
1310  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1311  errmsg("pg_global cannot be used as default tablespace")));
1312 
1313  /*
1314  * If we are trying to change the default tablespace of the template,
1315  * we require that the template not have any files in the new default
1316  * tablespace. This is necessary because otherwise the copied
1317  * database would contain pg_class rows that refer to its default
1318  * tablespace both explicitly (by OID) and implicitly (as zero), which
1319  * would cause problems. For example another CREATE DATABASE using
1320  * the copied database as template, and trying to change its default
1321  * tablespace again, would yield outright incorrect results (it would
1322  * improperly move tables to the new default tablespace that should
1323  * stay in the same tablespace).
1324  */
1325  if (dst_deftablespace != src_deftablespace)
1326  {
1327  char *srcpath;
1328  struct stat st;
1329 
1330  srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
1331 
1332  if (stat(srcpath, &st) == 0 &&
1333  S_ISDIR(st.st_mode) &&
1334  !directory_is_empty(srcpath))
1335  ereport(ERROR,
1336  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337  errmsg("cannot assign new default tablespace \"%s\"",
1338  tablespacename),
1339  errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
1340  dbtemplate)));
1341  pfree(srcpath);
1342  }
1343  }
1344  else
1345  {
1346  /* Use template database's default tablespace */
1347  dst_deftablespace = src_deftablespace;
1348  /* Note there is no additional permission check in this path */
1349  }
1350 
1351  /*
1352  * If built with appropriate switch, whine when regression-testing
1353  * conventions for database names are violated. But don't complain during
1354  * initdb.
1355  */
1356 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1357  if (IsUnderPostmaster && strstr(dbname, "regression") == NULL)
1358  elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1359 #endif
1360 
1361  /*
1362  * Check for db name conflict. This is just to give a more friendly error
1363  * message than "unique index violation". There's a race condition but
1364  * we're willing to accept the less friendly message in that case.
1365  */
1366  if (OidIsValid(get_database_oid(dbname, true)))
1367  ereport(ERROR,
1368  (errcode(ERRCODE_DUPLICATE_DATABASE),
1369  errmsg("database \"%s\" already exists", dbname)));
1370 
1371  /*
1372  * The source DB can't have any active backends, except this one
1373  * (exception is to allow CREATE DB while connected to template1).
1374  * Otherwise we might copy inconsistent data.
1375  *
1376  * This should be last among the basic error checks, because it involves
1377  * potential waiting; we may as well throw an error first if we're gonna
1378  * throw one.
1379  */
1380  if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
1381  ereport(ERROR,
1382  (errcode(ERRCODE_OBJECT_IN_USE),
1383  errmsg("source database \"%s\" is being accessed by other users",
1384  dbtemplate),
1385  errdetail_busy_db(notherbackends, npreparedxacts)));
1386 
1387  /*
1388  * Select an OID for the new database, checking that it doesn't have a
1389  * filename conflict with anything already existing in the tablespace
1390  * directories.
1391  */
1392  pg_database_rel = table_open(DatabaseRelationId, RowExclusiveLock);
1393 
1394  /*
1395  * If database OID is configured, check if the OID is already in use or
1396  * data directory already exists.
1397  */
1398  if (OidIsValid(dboid))
1399  {
1400  char *existing_dbname = get_database_name(dboid);
1401 
1402  if (existing_dbname != NULL)
1403  ereport(ERROR,
1404  (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1405  errmsg("database OID %u is already in use by database \"%s\"",
1406  dboid, existing_dbname));
1407 
1408  if (check_db_file_conflict(dboid))
1409  ereport(ERROR,
1410  (errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
1411  errmsg("data directory with the specified OID %u already exists", dboid));
1412  }
1413  else
1414  {
1415  /* Select an OID for the new database if is not explicitly configured. */
1416  do
1417  {
1418  dboid = GetNewOidWithIndex(pg_database_rel, DatabaseOidIndexId,
1419  Anum_pg_database_oid);
1420  } while (check_db_file_conflict(dboid));
1421  }
1422 
1423  /*
1424  * Insert a new tuple into pg_database. This establishes our ownership of
1425  * the new database name (anyone else trying to insert the same name will
1426  * block on the unique index, and fail after we commit).
1427  */
1428 
1429  Assert((dblocprovider != COLLPROVIDER_LIBC && dblocale) ||
1430  (dblocprovider == COLLPROVIDER_LIBC && !dblocale));
1431 
1432  /* Form tuple */
1433  new_record[Anum_pg_database_oid - 1] = ObjectIdGetDatum(dboid);
1434  new_record[Anum_pg_database_datname - 1] =
1436  new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
1437  new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
1438  new_record[Anum_pg_database_datlocprovider - 1] = CharGetDatum(dblocprovider);
1439  new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1440  new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1441  new_record[Anum_pg_database_dathasloginevt - 1] = BoolGetDatum(src_hasloginevt);
1442  new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1443  new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
1444  new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
1445  new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
1446  new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
1447  new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
1448  if (dblocale)
1449  new_record[Anum_pg_database_datlocale - 1] = CStringGetTextDatum(dblocale);
1450  else
1451  new_record_nulls[Anum_pg_database_datlocale - 1] = true;
1452  if (dbicurules)
1453  new_record[Anum_pg_database_daticurules - 1] = CStringGetTextDatum(dbicurules);
1454  else
1455  new_record_nulls[Anum_pg_database_daticurules - 1] = true;
1456  if (dbcollversion)
1457  new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
1458  else
1459  new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
1460 
1461  /*
1462  * We deliberately set datacl to default (NULL), rather than copying it
1463  * from the template database. Copying it would be a bad idea when the
1464  * owner is not the same as the template's owner.
1465  */
1466  new_record_nulls[Anum_pg_database_datacl - 1] = true;
1467 
1468  tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
1469  new_record, new_record_nulls);
1470 
1471  CatalogTupleInsert(pg_database_rel, tuple);
1472 
1473  /*
1474  * Now generate additional catalog entries associated with the new DB
1475  */
1476 
1477  /* Register owner dependency */
1478  recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
1479 
1480  /* Create pg_shdepend entries for objects within database */
1481  copyTemplateDependencies(src_dboid, dboid);
1482 
1483  /* Post creation hook for new database */
1484  InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
1485 
1486  /*
1487  * If we're going to be reading data for the to-be-created database into
1488  * shared_buffers, take a lock on it. Nobody should know that this
1489  * database exists yet, but it's good to maintain the invariant that an
1490  * AccessExclusiveLock on the database is sufficient to drop all of its
1491  * buffers without worrying about more being read later.
1492  *
1493  * Note that we need to do this before entering the
1494  * PG_ENSURE_ERROR_CLEANUP block below, because createdb_failure_callback
1495  * expects this lock to be held already.
1496  */
1497  if (dbstrategy == CREATEDB_WAL_LOG)
1498  LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
1499 
1500  /*
1501  * Once we start copying subdirectories, we need to be able to clean 'em
1502  * up if we fail. Use an ENSURE block to make sure this happens. (This
1503  * is not a 100% solution, because of the possibility of failure during
1504  * transaction commit after we leave this routine, but it should handle
1505  * most scenarios.)
1506  */
1507  fparms.src_dboid = src_dboid;
1508  fparms.dest_dboid = dboid;
1509  fparms.strategy = dbstrategy;
1510 
1512  PointerGetDatum(&fparms));
1513  {
1514  /*
1515  * If the user has asked to create a database with WAL_LOG strategy
1516  * then call CreateDatabaseUsingWalLog, which will copy the database
1517  * at the block level and it will WAL log each copied block.
1518  * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
1519  * database file by file.
1520  */
1521  if (dbstrategy == CREATEDB_WAL_LOG)
1522  CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
1523  dst_deftablespace);
1524  else
1525  CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
1526  dst_deftablespace);
1527 
1528  /*
1529  * Close pg_database, but keep lock till commit.
1530  */
1531  table_close(pg_database_rel, NoLock);
1532 
1533  /*
1534  * Force synchronous commit, thus minimizing the window between
1535  * creation of the database files and committal of the transaction. If
1536  * we crash before committing, we'll have a DB that's taking up disk
1537  * space but is not in pg_database, which is not good.
1538  */
1539  ForceSyncCommit();
1540  }
1542  PointerGetDatum(&fparms));
1543 
1544  return dboid;
1545 }
1546 
1547 /*
1548  * Check whether chosen encoding matches chosen locale settings. This
1549  * restriction is necessary because libc's locale-specific code usually
1550  * fails when presented with data in an encoding it's not expecting. We
1551  * allow mismatch in four cases:
1552  *
1553  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
1554  * which works with any encoding.
1555  *
1556  * 2. locale encoding = -1, which means that we couldn't determine the
1557  * locale's encoding and have to trust the user to get it right.
1558  *
1559  * 3. selected encoding is UTF8 and platform is win32. This is because
1560  * UTF8 is a pseudo codepage that is supported in all locales since it's
1561  * converted to UTF16 before being used.
1562  *
1563  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
1564  * is risky but we have historically allowed it --- notably, the
1565  * regression tests require it.
1566  *
1567  * Note: if you change this policy, fix initdb to match.
1568  */
1569 void
1570 check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
1571 {
1572  int ctype_encoding = pg_get_encoding_from_locale(ctype, true);
1573  int collate_encoding = pg_get_encoding_from_locale(collate, true);
1574 
1575  if (!(ctype_encoding == encoding ||
1576  ctype_encoding == PG_SQL_ASCII ||
1577  ctype_encoding == -1 ||
1578 #ifdef WIN32
1579  encoding == PG_UTF8 ||
1580 #endif
1581  (encoding == PG_SQL_ASCII && superuser())))
1582  ereport(ERROR,
1583  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1584  errmsg("encoding \"%s\" does not match locale \"%s\"",
1586  ctype),
1587  errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
1588  pg_encoding_to_char(ctype_encoding))));
1589 
1590  if (!(collate_encoding == encoding ||
1591  collate_encoding == PG_SQL_ASCII ||
1592  collate_encoding == -1 ||
1593 #ifdef WIN32
1594  encoding == PG_UTF8 ||
1595 #endif
1596  (encoding == PG_SQL_ASCII && superuser())))
1597  ereport(ERROR,
1598  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1599  errmsg("encoding \"%s\" does not match locale \"%s\"",
1601  collate),
1602  errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
1603  pg_encoding_to_char(collate_encoding))));
1604 }
1605 
1606 /* Error cleanup callback for createdb */
1607 static void
1609 {
1611 
1612  /*
1613  * If we were copying database at block levels then drop pages for the
1614  * destination database that are in the shared buffer cache. And tell
1615  * checkpointer to forget any pending fsync and unlink requests for files
1616  * in the database. The reasoning behind doing this is same as explained
1617  * in dropdb function. But unlike dropdb we don't need to call
1618  * pgstat_drop_database because this database is still not created so
1619  * there should not be any stat for this.
1620  */
1621  if (fparms->strategy == CREATEDB_WAL_LOG)
1622  {
1625 
1626  /* Release lock on the target database. */
1627  UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
1628  AccessShareLock);
1629  }
1630 
1631  /*
1632  * Release lock on source database before doing recursive remove. This is
1633  * not essential but it seems desirable to release the lock as soon as
1634  * possible.
1635  */
1636  UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
1637 
1638  /* Throw away any successfully copied subdirectories */
1640 }
1641 
1642 
1643 /*
1644  * DROP DATABASE
1645  */
1646 void
1647 dropdb(const char *dbname, bool missing_ok, bool force)
1648 {
1649  Oid db_id;
1650  bool db_istemplate;
1651  Relation pgdbrel;
1652  HeapTuple tup;
1653  ScanKeyData scankey;
1654  SysScanDesc scan;
1655  Form_pg_database datform;
1656  int notherbackends;
1657  int npreparedxacts;
1658  int nslots,
1659  nslots_active;
1660  int nsubscriptions;
1661 
1662  /*
1663  * Look up the target database's OID, and get exclusive lock on it. We
1664  * need this to ensure that no new backend starts up in the target
1665  * database while we are deleting it (see postinit.c), and that no one is
1666  * using it as a CREATE DATABASE template or trying to delete it for
1667  * themselves.
1668  */
1669  pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
1670 
1671  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1672  &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1673  {
1674  if (!missing_ok)
1675  {
1676  ereport(ERROR,
1677  (errcode(ERRCODE_UNDEFINED_DATABASE),
1678  errmsg("database \"%s\" does not exist", dbname)));
1679  }
1680  else
1681  {
1682  /* Close pg_database, release the lock, since we changed nothing */
1683  table_close(pgdbrel, RowExclusiveLock);
1684  ereport(NOTICE,
1685  (errmsg("database \"%s\" does not exist, skipping",
1686  dbname)));
1687  return;
1688  }
1689  }
1690 
1691  /*
1692  * Permission checks
1693  */
1694  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1696  dbname);
1697 
1698  /* DROP hook for the database being removed */
1699  InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
1700 
1701  /*
1702  * Disallow dropping a DB that is marked istemplate. This is just to
1703  * prevent people from accidentally dropping template0 or template1; they
1704  * can do so if they're really determined ...
1705  */
1706  if (db_istemplate)
1707  ereport(ERROR,
1708  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1709  errmsg("cannot drop a template database")));
1710 
1711  /* Obviously can't drop my own database */
1712  if (db_id == MyDatabaseId)
1713  ereport(ERROR,
1714  (errcode(ERRCODE_OBJECT_IN_USE),
1715  errmsg("cannot drop the currently open database")));
1716 
1717  /*
1718  * Check whether there are active logical slots that refer to the
1719  * to-be-dropped database. The database lock we are holding prevents the
1720  * creation of new slots using the database or existing slots becoming
1721  * active.
1722  */
1723  (void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
1724  if (nslots_active)
1725  {
1726  ereport(ERROR,
1727  (errcode(ERRCODE_OBJECT_IN_USE),
1728  errmsg("database \"%s\" is used by an active logical replication slot",
1729  dbname),
1730  errdetail_plural("There is %d active slot.",
1731  "There are %d active slots.",
1732  nslots_active, nslots_active)));
1733  }
1734 
1735  /*
1736  * Check if there are subscriptions defined in the target database.
1737  *
1738  * We can't drop them automatically because they might be holding
1739  * resources in other databases/instances.
1740  */
1741  if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
1742  ereport(ERROR,
1743  (errcode(ERRCODE_OBJECT_IN_USE),
1744  errmsg("database \"%s\" is being used by logical replication subscription",
1745  dbname),
1746  errdetail_plural("There is %d subscription.",
1747  "There are %d subscriptions.",
1748  nsubscriptions, nsubscriptions)));
1749 
1750 
1751  /*
1752  * Attempt to terminate all existing connections to the target database if
1753  * the user has requested to do so.
1754  */
1755  if (force)
1756  TerminateOtherDBBackends(db_id);
1757 
1758  /*
1759  * Check for other backends in the target database. (Because we hold the
1760  * database lock, no new ones can start after this.)
1761  *
1762  * As in CREATE DATABASE, check this after other error conditions.
1763  */
1764  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1765  ereport(ERROR,
1766  (errcode(ERRCODE_OBJECT_IN_USE),
1767  errmsg("database \"%s\" is being accessed by other users",
1768  dbname),
1769  errdetail_busy_db(notherbackends, npreparedxacts)));
1770 
1771  /*
1772  * Delete any comments or security labels associated with the database.
1773  */
1774  DeleteSharedComments(db_id, DatabaseRelationId);
1775  DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
1776 
1777  /*
1778  * Remove settings associated with this database
1779  */
1780  DropSetting(db_id, InvalidOid);
1781 
1782  /*
1783  * Remove shared dependency references for the database.
1784  */
1785  dropDatabaseDependencies(db_id);
1786 
1787  /*
1788  * Tell the cumulative stats system to forget it immediately, too.
1789  */
1790  pgstat_drop_database(db_id);
1791 
1792  /*
1793  * Get the pg_database tuple to scribble on. Note that this does not
1794  * directly rely on the syscache to avoid issues with flattened toast
1795  * values for the in-place update.
1796  */
1797  ScanKeyInit(&scankey,
1798  Anum_pg_database_datname,
1799  BTEqualStrategyNumber, F_NAMEEQ,
1801 
1802  scan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1803  NULL, 1, &scankey);
1804 
1805  tup = systable_getnext(scan);
1806  if (!HeapTupleIsValid(tup))
1807  elog(ERROR, "cache lookup failed for database %u", db_id);
1808  datform = (Form_pg_database) GETSTRUCT(tup);
1809 
1810  /*
1811  * Except for the deletion of the catalog row, subsequent actions are not
1812  * transactional (consider DropDatabaseBuffers() discarding modified
1813  * buffers). But we might crash or get interrupted below. To prevent
1814  * accesses to a database with invalid contents, mark the database as
1815  * invalid using an in-place update.
1816  *
1817  * We need to flush the WAL before continuing, to guarantee the
1818  * modification is durable before performing irreversible filesystem
1819  * operations.
1820  */
1821  datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
1822  heap_inplace_update(pgdbrel, tup);
1824 
1825  /*
1826  * Also delete the tuple - transactionally. If this transaction commits,
1827  * the row will be gone, but if we fail, dropdb() can be invoked again.
1828  */
1829  CatalogTupleDelete(pgdbrel, &tup->t_self);
1830 
1831  systable_endscan(scan);
1832 
1833  /*
1834  * Drop db-specific replication slots.
1835  */
1837 
1838  /*
1839  * Drop pages for this database that are in the shared buffer cache. This
1840  * is important to ensure that no remaining backend tries to write out a
1841  * dirty buffer to the dead database later...
1842  */
1843  DropDatabaseBuffers(db_id);
1844 
1845  /*
1846  * Tell checkpointer to forget any pending fsync and unlink requests for
1847  * files in the database; else the fsyncs will fail at next checkpoint, or
1848  * worse, it will delete files that belong to a newly created database
1849  * with the same OID.
1850  */
1852 
1853  /*
1854  * Force a checkpoint to make sure the checkpointer has received the
1855  * message sent by ForgetDatabaseSyncRequests.
1856  */
1858 
1859  /* Close all smgr fds in all backends. */
1861 
1862  /*
1863  * Remove all tablespace subdirs belonging to the database.
1864  */
1865  remove_dbtablespaces(db_id);
1866 
1867  /*
1868  * Close pg_database, but keep lock till commit.
1869  */
1870  table_close(pgdbrel, NoLock);
1871 
1872  /*
1873  * Force synchronous commit, thus minimizing the window between removal of
1874  * the database files and committal of the transaction. If we crash before
1875  * committing, we'll have a DB that's gone on disk but still there
1876  * according to pg_database, which is not good.
1877  */
1878  ForceSyncCommit();
1879 }
1880 
1881 
1882 /*
1883  * Rename database
1884  */
1886 RenameDatabase(const char *oldname, const char *newname)
1887 {
1888  Oid db_id;
1889  HeapTuple newtup;
1890  Relation rel;
1891  int notherbackends;
1892  int npreparedxacts;
1893  ObjectAddress address;
1894 
1895  /*
1896  * Look up the target database's OID, and get exclusive lock on it. We
1897  * need this for the same reasons as DROP DATABASE.
1898  */
1899  rel = table_open(DatabaseRelationId, RowExclusiveLock);
1900 
1901  if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
1902  NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
1903  ereport(ERROR,
1904  (errcode(ERRCODE_UNDEFINED_DATABASE),
1905  errmsg("database \"%s\" does not exist", oldname)));
1906 
1907  /* must be owner */
1908  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
1910  oldname);
1911 
1912  /* must have createdb rights */
1913  if (!have_createdb_privilege())
1914  ereport(ERROR,
1915  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1916  errmsg("permission denied to rename database")));
1917 
1918  /*
1919  * If built with appropriate switch, whine when regression-testing
1920  * conventions for database names are violated.
1921  */
1922 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1923  if (strstr(newname, "regression") == NULL)
1924  elog(WARNING, "databases created by regression test cases should have names including \"regression\"");
1925 #endif
1926 
1927  /*
1928  * Make sure the new name doesn't exist. See notes for same error in
1929  * CREATE DATABASE.
1930  */
1931  if (OidIsValid(get_database_oid(newname, true)))
1932  ereport(ERROR,
1933  (errcode(ERRCODE_DUPLICATE_DATABASE),
1934  errmsg("database \"%s\" already exists", newname)));
1935 
1936  /*
1937  * XXX Client applications probably store the current database somewhere,
1938  * so renaming it could cause confusion. On the other hand, there may not
1939  * be an actual problem besides a little confusion, so think about this
1940  * and decide.
1941  */
1942  if (db_id == MyDatabaseId)
1943  ereport(ERROR,
1944  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1945  errmsg("current database cannot be renamed")));
1946 
1947  /*
1948  * Make sure the database does not have active sessions. This is the same
1949  * concern as above, but applied to other sessions.
1950  *
1951  * As in CREATE DATABASE, check this after other error conditions.
1952  */
1953  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1954  ereport(ERROR,
1955  (errcode(ERRCODE_OBJECT_IN_USE),
1956  errmsg("database \"%s\" is being accessed by other users",
1957  oldname),
1958  errdetail_busy_db(notherbackends, npreparedxacts)));
1959 
1960  /* rename */
1961  newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1962  if (!HeapTupleIsValid(newtup))
1963  elog(ERROR, "cache lookup failed for database %u", db_id);
1964  namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1965  CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1966 
1967  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1968 
1969  ObjectAddressSet(address, DatabaseRelationId, db_id);
1970 
1971  /*
1972  * Close pg_database, but keep lock till commit.
1973  */
1974  table_close(rel, NoLock);
1975 
1976  return address;
1977 }
1978 
1979 
1980 /*
1981  * ALTER DATABASE SET TABLESPACE
1982  */
1983 static void
1984 movedb(const char *dbname, const char *tblspcname)
1985 {
1986  Oid db_id;
1987  Relation pgdbrel;
1988  int notherbackends;
1989  int npreparedxacts;
1990  HeapTuple oldtuple,
1991  newtuple;
1992  Oid src_tblspcoid,
1993  dst_tblspcoid;
1994  ScanKeyData scankey;
1995  SysScanDesc sysscan;
1996  AclResult aclresult;
1997  char *src_dbpath;
1998  char *dst_dbpath;
1999  DIR *dstdir;
2000  struct dirent *xlde;
2001  movedb_failure_params fparms;
2002 
2003  /*
2004  * Look up the target database's OID, and get exclusive lock on it. We
2005  * need this to ensure that no new backend starts up in the database while
2006  * we are moving it, and that no one is using it as a CREATE DATABASE
2007  * template or trying to delete it.
2008  */
2009  pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
2010 
2011  if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, NULL,
2012  NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL, NULL, NULL, NULL))
2013  ereport(ERROR,
2014  (errcode(ERRCODE_UNDEFINED_DATABASE),
2015  errmsg("database \"%s\" does not exist", dbname)));
2016 
2017  /*
2018  * We actually need a session lock, so that the lock will persist across
2019  * the commit/restart below. (We could almost get away with letting the
2020  * lock be released at commit, except that someone could try to move
2021  * relations of the DB back into the old directory while we rmtree() it.)
2022  */
2023  LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2025 
2026  /*
2027  * Permission checks
2028  */
2029  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2031  dbname);
2032 
2033  /*
2034  * Obviously can't move the tables of my own database
2035  */
2036  if (db_id == MyDatabaseId)
2037  ereport(ERROR,
2038  (errcode(ERRCODE_OBJECT_IN_USE),
2039  errmsg("cannot change the tablespace of the currently open database")));
2040 
2041  /*
2042  * Get tablespace's oid
2043  */
2044  dst_tblspcoid = get_tablespace_oid(tblspcname, false);
2045 
2046  /*
2047  * Permission checks
2048  */
2049  aclresult = object_aclcheck(TableSpaceRelationId, dst_tblspcoid, GetUserId(),
2050  ACL_CREATE);
2051  if (aclresult != ACLCHECK_OK)
2052  aclcheck_error(aclresult, OBJECT_TABLESPACE,
2053  tblspcname);
2054 
2055  /*
2056  * pg_global must never be the default tablespace
2057  */
2058  if (dst_tblspcoid == GLOBALTABLESPACE_OID)
2059  ereport(ERROR,
2060  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2061  errmsg("pg_global cannot be used as default tablespace")));
2062 
2063  /*
2064  * No-op if same tablespace
2065  */
2066  if (src_tblspcoid == dst_tblspcoid)
2067  {
2068  table_close(pgdbrel, NoLock);
2069  UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2071  return;
2072  }
2073 
2074  /*
2075  * Check for other backends in the target database. (Because we hold the
2076  * database lock, no new ones can start after this.)
2077  *
2078  * As in CREATE DATABASE, check this after other error conditions.
2079  */
2080  if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
2081  ereport(ERROR,
2082  (errcode(ERRCODE_OBJECT_IN_USE),
2083  errmsg("database \"%s\" is being accessed by other users",
2084  dbname),
2085  errdetail_busy_db(notherbackends, npreparedxacts)));
2086 
2087  /*
2088  * Get old and new database paths
2089  */
2090  src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
2091  dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
2092 
2093  /*
2094  * Force a checkpoint before proceeding. This will force all dirty
2095  * buffers, including those of unlogged tables, out to disk, to ensure
2096  * source database is up-to-date on disk for the copy.
2097  * FlushDatabaseBuffers() would suffice for that, but we also want to
2098  * process any pending unlink requests. Otherwise, the check for existing
2099  * files in the target directory might fail unnecessarily, not to mention
2100  * that the copy might fail due to source files getting deleted under it.
2101  * On Windows, this also ensures that background procs don't hold any open
2102  * files, which would cause rmdir() to fail.
2103  */
2106 
2107  /* Close all smgr fds in all backends. */
2109 
2110  /*
2111  * Now drop all buffers holding data of the target database; they should
2112  * no longer be dirty so DropDatabaseBuffers is safe.
2113  *
2114  * It might seem that we could just let these buffers age out of shared
2115  * buffers naturally, since they should not get referenced anymore. The
2116  * problem with that is that if the user later moves the database back to
2117  * its original tablespace, any still-surviving buffers would appear to
2118  * contain valid data again --- but they'd be missing any changes made in
2119  * the database while it was in the new tablespace. In any case, freeing
2120  * buffers that should never be used again seems worth the cycles.
2121  *
2122  * Note: it'd be sufficient to get rid of buffers matching db_id and
2123  * src_tblspcoid, but bufmgr.c presently provides no API for that.
2124  */
2125  DropDatabaseBuffers(db_id);
2126 
2127  /*
2128  * Check for existence of files in the target directory, i.e., objects of
2129  * this database that are already in the target tablespace. We can't
2130  * allow the move in such a case, because we would need to change those
2131  * relations' pg_class.reltablespace entries to zero, and we don't have
2132  * access to the DB's pg_class to do so.
2133  */
2134  dstdir = AllocateDir(dst_dbpath);
2135  if (dstdir != NULL)
2136  {
2137  while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
2138  {
2139  if (strcmp(xlde->d_name, ".") == 0 ||
2140  strcmp(xlde->d_name, "..") == 0)
2141  continue;
2142 
2143  ereport(ERROR,
2144  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2145  errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
2146  dbname, tblspcname),
2147  errhint("You must move them back to the database's default tablespace before using this command.")));
2148  }
2149 
2150  FreeDir(dstdir);
2151 
2152  /*
2153  * The directory exists but is empty. We must remove it before using
2154  * the copydir function.
2155  */
2156  if (rmdir(dst_dbpath) != 0)
2157  elog(ERROR, "could not remove directory \"%s\": %m",
2158  dst_dbpath);
2159  }
2160 
2161  /*
2162  * Use an ENSURE block to make sure we remove the debris if the copy fails
2163  * (eg, due to out-of-disk-space). This is not a 100% solution, because
2164  * of the possibility of failure during transaction commit, but it should
2165  * handle most scenarios.
2166  */
2167  fparms.dest_dboid = db_id;
2168  fparms.dest_tsoid = dst_tblspcoid;
2170  PointerGetDatum(&fparms));
2171  {
2172  Datum new_record[Natts_pg_database] = {0};
2173  bool new_record_nulls[Natts_pg_database] = {0};
2174  bool new_record_repl[Natts_pg_database] = {0};
2175 
2176  /*
2177  * Copy files from the old tablespace to the new one
2178  */
2179  copydir(src_dbpath, dst_dbpath, false);
2180 
2181  /*
2182  * Record the filesystem change in XLOG
2183  */
2184  {
2186 
2187  xlrec.db_id = db_id;
2188  xlrec.tablespace_id = dst_tblspcoid;
2189  xlrec.src_db_id = db_id;
2190  xlrec.src_tablespace_id = src_tblspcoid;
2191 
2192  XLogBeginInsert();
2193  XLogRegisterData((char *) &xlrec,
2195 
2196  (void) XLogInsert(RM_DBASE_ID,
2198  }
2199 
2200  /*
2201  * Update the database's pg_database tuple
2202  */
2203  ScanKeyInit(&scankey,
2204  Anum_pg_database_datname,
2205  BTEqualStrategyNumber, F_NAMEEQ,
2207  sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
2208  NULL, 1, &scankey);
2209  oldtuple = systable_getnext(sysscan);
2210  if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
2211  ereport(ERROR,
2212  (errcode(ERRCODE_UNDEFINED_DATABASE),
2213  errmsg("database \"%s\" does not exist", dbname)));
2214 
2215  new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
2216  new_record_repl[Anum_pg_database_dattablespace - 1] = true;
2217 
2218  newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
2219  new_record,
2220  new_record_nulls, new_record_repl);
2221  CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
2222 
2223  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2224 
2225  systable_endscan(sysscan);
2226 
2227  /*
2228  * Force another checkpoint here. As in CREATE DATABASE, this is to
2229  * ensure that we don't have to replay a committed
2230  * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
2231  * any unlogged operations done in the new DB tablespace before the
2232  * next checkpoint.
2233  */
2235 
2236  /*
2237  * Force synchronous commit, thus minimizing the window between
2238  * copying the database files and committal of the transaction. If we
2239  * crash before committing, we'll leave an orphaned set of files on
2240  * disk, which is not fatal but not good either.
2241  */
2242  ForceSyncCommit();
2243 
2244  /*
2245  * Close pg_database, but keep lock till commit.
2246  */
2247  table_close(pgdbrel, NoLock);
2248  }
2250  PointerGetDatum(&fparms));
2251 
2252  /*
2253  * Commit the transaction so that the pg_database update is committed. If
2254  * we crash while removing files, the database won't be corrupt, we'll
2255  * just leave some orphaned files in the old directory.
2256  *
2257  * (This is OK because we know we aren't inside a transaction block.)
2258  *
2259  * XXX would it be safe/better to do this inside the ensure block? Not
2260  * convinced it's a good idea; consider elog just after the transaction
2261  * really commits.
2262  */
2265 
2266  /* Start new transaction for the remaining work; don't need a snapshot */
2268 
2269  /*
2270  * Remove files from the old tablespace
2271  */
2272  if (!rmtree(src_dbpath, true))
2273  ereport(WARNING,
2274  (errmsg("some useless files may be left behind in old database directory \"%s\"",
2275  src_dbpath)));
2276 
2277  /*
2278  * Record the filesystem change in XLOG
2279  */
2280  {
2281  xl_dbase_drop_rec xlrec;
2282 
2283  xlrec.db_id = db_id;
2284  xlrec.ntablespaces = 1;
2285 
2286  XLogBeginInsert();
2287  XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
2288  XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid));
2289 
2290  (void) XLogInsert(RM_DBASE_ID,
2292  }
2293 
2294  /* Now it's safe to release the database lock */
2295  UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
2297 
2298  pfree(src_dbpath);
2299  pfree(dst_dbpath);
2300 }
2301 
2302 /* Error cleanup callback for movedb */
2303 static void
2305 {
2307  char *dstpath;
2308 
2309  /* Get rid of anything we managed to copy to the target directory */
2310  dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
2311 
2312  (void) rmtree(dstpath, true);
2313 
2314  pfree(dstpath);
2315 }
2316 
2317 /*
2318  * Process options and call dropdb function.
2319  */
2320 void
2322 {
2323  bool force = false;
2324  ListCell *lc;
2325 
2326  foreach(lc, stmt->options)
2327  {
2328  DefElem *opt = (DefElem *) lfirst(lc);
2329 
2330  if (strcmp(opt->defname, "force") == 0)
2331  force = true;
2332  else
2333  ereport(ERROR,
2334  (errcode(ERRCODE_SYNTAX_ERROR),
2335  errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
2336  parser_errposition(pstate, opt->location)));
2337  }
2338 
2339  dropdb(stmt->dbname, stmt->missing_ok, force);
2340 }
2341 
2342 /*
2343  * ALTER DATABASE name ...
2344  */
2345 Oid
2346 AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2347 {
2348  Relation rel;
2349  Oid dboid;
2350  HeapTuple tuple,
2351  newtuple;
2352  Form_pg_database datform;
2353  ScanKeyData scankey;
2354  SysScanDesc scan;
2355  ListCell *option;
2356  bool dbistemplate = false;
2357  bool dballowconnections = true;
2358  int dbconnlimit = DATCONNLIMIT_UNLIMITED;
2359  DefElem *distemplate = NULL;
2360  DefElem *dallowconnections = NULL;
2361  DefElem *dconnlimit = NULL;
2362  DefElem *dtablespace = NULL;
2363  Datum new_record[Natts_pg_database] = {0};
2364  bool new_record_nulls[Natts_pg_database] = {0};
2365  bool new_record_repl[Natts_pg_database] = {0};
2366 
2367  /* Extract options from the statement node tree */
2368  foreach(option, stmt->options)
2369  {
2370  DefElem *defel = (DefElem *) lfirst(option);
2371 
2372  if (strcmp(defel->defname, "is_template") == 0)
2373  {
2374  if (distemplate)
2375  errorConflictingDefElem(defel, pstate);
2376  distemplate = defel;
2377  }
2378  else if (strcmp(defel->defname, "allow_connections") == 0)
2379  {
2380  if (dallowconnections)
2381  errorConflictingDefElem(defel, pstate);
2382  dallowconnections = defel;
2383  }
2384  else if (strcmp(defel->defname, "connection_limit") == 0)
2385  {
2386  if (dconnlimit)
2387  errorConflictingDefElem(defel, pstate);
2388  dconnlimit = defel;
2389  }
2390  else if (strcmp(defel->defname, "tablespace") == 0)
2391  {
2392  if (dtablespace)
2393  errorConflictingDefElem(defel, pstate);
2394  dtablespace = defel;
2395  }
2396  else
2397  ereport(ERROR,
2398  (errcode(ERRCODE_SYNTAX_ERROR),
2399  errmsg("option \"%s\" not recognized", defel->defname),
2400  parser_errposition(pstate, defel->location)));
2401  }
2402 
2403  if (dtablespace)
2404  {
2405  /*
2406  * While the SET TABLESPACE syntax doesn't allow any other options,
2407  * somebody could write "WITH TABLESPACE ...". Forbid any other
2408  * options from being specified in that case.
2409  */
2410  if (list_length(stmt->options) != 1)
2411  ereport(ERROR,
2412  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2413  errmsg("option \"%s\" cannot be specified with other options",
2414  dtablespace->defname),
2415  parser_errposition(pstate, dtablespace->location)));
2416  /* this case isn't allowed within a transaction block */
2417  PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
2418  movedb(stmt->dbname, defGetString(dtablespace));
2419  return InvalidOid;
2420  }
2421 
2422  if (distemplate && distemplate->arg)
2423  dbistemplate = defGetBoolean(distemplate);
2424  if (dallowconnections && dallowconnections->arg)
2425  dballowconnections = defGetBoolean(dallowconnections);
2426  if (dconnlimit && dconnlimit->arg)
2427  {
2428  dbconnlimit = defGetInt32(dconnlimit);
2429  if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
2430  ereport(ERROR,
2431  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2432  errmsg("invalid connection limit: %d", dbconnlimit)));
2433  }
2434 
2435  /*
2436  * Get the old tuple. We don't need a lock on the database per se,
2437  * because we're not going to do anything that would mess up incoming
2438  * connections.
2439  */
2440  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2441  ScanKeyInit(&scankey,
2442  Anum_pg_database_datname,
2443  BTEqualStrategyNumber, F_NAMEEQ,
2444  CStringGetDatum(stmt->dbname));
2445  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2446  NULL, 1, &scankey);
2447  tuple = systable_getnext(scan);
2448  if (!HeapTupleIsValid(tuple))
2449  ereport(ERROR,
2450  (errcode(ERRCODE_UNDEFINED_DATABASE),
2451  errmsg("database \"%s\" does not exist", stmt->dbname)));
2452 
2453  datform = (Form_pg_database) GETSTRUCT(tuple);
2454  dboid = datform->oid;
2455 
2456  if (database_is_invalid_form(datform))
2457  {
2458  ereport(FATAL,
2459  errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2460  errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
2461  errhint("Use DROP DATABASE to drop invalid databases."));
2462  }
2463 
2464  if (!object_ownercheck(DatabaseRelationId, dboid, GetUserId()))
2466  stmt->dbname);
2467 
2468  /*
2469  * In order to avoid getting locked out and having to go through
2470  * standalone mode, we refuse to disallow connections to the database
2471  * we're currently connected to. Lockout can still happen with concurrent
2472  * sessions but the likeliness of that is not high enough to worry about.
2473  */
2474  if (!dballowconnections && dboid == MyDatabaseId)
2475  ereport(ERROR,
2476  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2477  errmsg("cannot disallow connections for current database")));
2478 
2479  /*
2480  * Build an updated tuple, perusing the information just obtained
2481  */
2482  if (distemplate)
2483  {
2484  new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
2485  new_record_repl[Anum_pg_database_datistemplate - 1] = true;
2486  }
2487  if (dallowconnections)
2488  {
2489  new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
2490  new_record_repl[Anum_pg_database_datallowconn - 1] = true;
2491  }
2492  if (dconnlimit)
2493  {
2494  new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
2495  new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
2496  }
2497 
2498  newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
2499  new_record_nulls, new_record_repl);
2500  CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
2501 
2502  InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
2503 
2504  systable_endscan(scan);
2505 
2506  /* Close pg_database, but keep lock till commit */
2507  table_close(rel, NoLock);
2508 
2509  return dboid;
2510 }
2511 
2512 
2513 /*
2514  * ALTER DATABASE name REFRESH COLLATION VERSION
2515  */
2518 {
2519  Relation rel;
2520  ScanKeyData scankey;
2521  SysScanDesc scan;
2522  Oid db_id;
2523  HeapTuple tuple;
2524  Form_pg_database datForm;
2525  ObjectAddress address;
2526  Datum datum;
2527  bool isnull;
2528  char *oldversion;
2529  char *newversion;
2530 
2531  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2532  ScanKeyInit(&scankey,
2533  Anum_pg_database_datname,
2534  BTEqualStrategyNumber, F_NAMEEQ,
2535  CStringGetDatum(stmt->dbname));
2536  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2537  NULL, 1, &scankey);
2538  tuple = systable_getnext(scan);
2539  if (!HeapTupleIsValid(tuple))
2540  ereport(ERROR,
2541  (errcode(ERRCODE_UNDEFINED_DATABASE),
2542  errmsg("database \"%s\" does not exist", stmt->dbname)));
2543 
2544  datForm = (Form_pg_database) GETSTRUCT(tuple);
2545  db_id = datForm->oid;
2546 
2547  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2549  stmt->dbname);
2550 
2551  datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
2552  oldversion = isnull ? NULL : TextDatumGetCString(datum);
2553 
2554  if (datForm->datlocprovider == COLLPROVIDER_LIBC)
2555  {
2556  datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
2557  if (isnull)
2558  elog(ERROR, "unexpected null in pg_database");
2559  }
2560  else
2561  {
2562  datum = heap_getattr(tuple, Anum_pg_database_datlocale, RelationGetDescr(rel), &isnull);
2563  if (isnull)
2564  elog(ERROR, "unexpected null in pg_database");
2565  }
2566 
2567  newversion = get_collation_actual_version(datForm->datlocprovider,
2568  TextDatumGetCString(datum));
2569 
2570  /* cannot change from NULL to non-NULL or vice versa */
2571  if ((!oldversion && newversion) || (oldversion && !newversion))
2572  elog(ERROR, "invalid collation version change");
2573  else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
2574  {
2575  bool nulls[Natts_pg_database] = {0};
2576  bool replaces[Natts_pg_database] = {0};
2577  Datum values[Natts_pg_database] = {0};
2578 
2579  ereport(NOTICE,
2580  (errmsg("changing version from %s to %s",
2581  oldversion, newversion)));
2582 
2583  values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
2584  replaces[Anum_pg_database_datcollversion - 1] = true;
2585 
2586  tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
2587  values, nulls, replaces);
2588  CatalogTupleUpdate(rel, &tuple->t_self, tuple);
2589  heap_freetuple(tuple);
2590  }
2591  else
2592  ereport(NOTICE,
2593  (errmsg("version has not changed")));
2594 
2595  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2596 
2597  ObjectAddressSet(address, DatabaseRelationId, db_id);
2598 
2599  systable_endscan(scan);
2600 
2601  table_close(rel, NoLock);
2602 
2603  return address;
2604 }
2605 
2606 
2607 /*
2608  * ALTER DATABASE name SET ...
2609  */
2610 Oid
2612 {
2613  Oid datid = get_database_oid(stmt->dbname, false);
2614 
2615  /*
2616  * Obtain a lock on the database and make sure it didn't go away in the
2617  * meantime.
2618  */
2619  shdepLockAndCheckObject(DatabaseRelationId, datid);
2620 
2621  if (!object_ownercheck(DatabaseRelationId, datid, GetUserId()))
2623  stmt->dbname);
2624 
2625  AlterSetting(datid, InvalidOid, stmt->setstmt);
2626 
2627  UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
2628 
2629  return datid;
2630 }
2631 
2632 
2633 /*
2634  * ALTER DATABASE name OWNER TO newowner
2635  */
2637 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
2638 {
2639  Oid db_id;
2640  HeapTuple tuple;
2641  Relation rel;
2642  ScanKeyData scankey;
2643  SysScanDesc scan;
2644  Form_pg_database datForm;
2645  ObjectAddress address;
2646 
2647  /*
2648  * Get the old tuple. We don't need a lock on the database per se,
2649  * because we're not going to do anything that would mess up incoming
2650  * connections.
2651  */
2652  rel = table_open(DatabaseRelationId, RowExclusiveLock);
2653  ScanKeyInit(&scankey,
2654  Anum_pg_database_datname,
2655  BTEqualStrategyNumber, F_NAMEEQ,
2657  scan = systable_beginscan(rel, DatabaseNameIndexId, true,
2658  NULL, 1, &scankey);
2659  tuple = systable_getnext(scan);
2660  if (!HeapTupleIsValid(tuple))
2661  ereport(ERROR,
2662  (errcode(ERRCODE_UNDEFINED_DATABASE),
2663  errmsg("database \"%s\" does not exist", dbname)));
2664 
2665  datForm = (Form_pg_database) GETSTRUCT(tuple);
2666  db_id = datForm->oid;
2667 
2668  /*
2669  * If the new owner is the same as the existing owner, consider the
2670  * command to have succeeded. This is to be consistent with other
2671  * objects.
2672  */
2673  if (datForm->datdba != newOwnerId)
2674  {
2675  Datum repl_val[Natts_pg_database];
2676  bool repl_null[Natts_pg_database] = {0};
2677  bool repl_repl[Natts_pg_database] = {0};
2678  Acl *newAcl;
2679  Datum aclDatum;
2680  bool isNull;
2681  HeapTuple newtuple;
2682 
2683  /* Otherwise, must be owner of the existing object */
2684  if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
2686  dbname);
2687 
2688  /* Must be able to become new owner */
2689  check_can_set_role(GetUserId(), newOwnerId);
2690 
2691  /*
2692  * must have createdb rights
2693  *
2694  * NOTE: This is different from other alter-owner checks in that the
2695  * current user is checked for createdb privileges instead of the
2696  * destination owner. This is consistent with the CREATE case for
2697  * databases. Because superusers will always have this right, we need
2698  * no special case for them.
2699  */
2700  if (!have_createdb_privilege())
2701  ereport(ERROR,
2702  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2703  errmsg("permission denied to change owner of database")));
2704 
2705  repl_repl[Anum_pg_database_datdba - 1] = true;
2706  repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
2707 
2708  /*
2709  * Determine the modified ACL for the new owner. This is only
2710  * necessary when the ACL is non-null.
2711  */
2712  aclDatum = heap_getattr(tuple,
2713  Anum_pg_database_datacl,
2714  RelationGetDescr(rel),
2715  &isNull);
2716  if (!isNull)
2717  {
2718  newAcl = aclnewowner(DatumGetAclP(aclDatum),
2719  datForm->datdba, newOwnerId);
2720  repl_repl[Anum_pg_database_datacl - 1] = true;
2721  repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
2722  }
2723 
2724  newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
2725  CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
2726 
2727  heap_freetuple(newtuple);
2728 
2729  /* Update owner dependency reference */
2730  changeDependencyOnOwner(DatabaseRelationId, db_id, newOwnerId);
2731  }
2732 
2733  InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
2734 
2735  ObjectAddressSet(address, DatabaseRelationId, db_id);
2736 
2737  systable_endscan(scan);
2738 
2739  /* Close pg_database, but keep lock till commit */
2740  table_close(rel, NoLock);
2741 
2742  return address;
2743 }
2744 
2745 
2746 Datum
2748 {
2749  Oid dbid = PG_GETARG_OID(0);
2750  HeapTuple tp;
2751  char datlocprovider;
2752  Datum datum;
2753  char *version;
2754 
2755  tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2756  if (!HeapTupleIsValid(tp))
2757  ereport(ERROR,
2758  (errcode(ERRCODE_UNDEFINED_OBJECT),
2759  errmsg("database with OID %u does not exist", dbid)));
2760 
2762 
2763  if (datlocprovider == COLLPROVIDER_LIBC)
2764  datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datcollate);
2765  else
2766  datum = SysCacheGetAttrNotNull(DATABASEOID, tp, Anum_pg_database_datlocale);
2767 
2769  TextDatumGetCString(datum));
2770 
2771  ReleaseSysCache(tp);
2772 
2773  if (version)
2775  else
2776  PG_RETURN_NULL();
2777 }
2778 
2779 
2780 /*
2781  * Helper functions
2782  */
2783 
2784 /*
2785  * Look up info about the database named "name". If the database exists,
2786  * obtain the specified lock type on it, fill in any of the remaining
2787  * parameters that aren't NULL, and return true. If no such database,
2788  * return false.
2789  */
2790 static bool
2791 get_db_info(const char *name, LOCKMODE lockmode,
2792  Oid *dbIdP, Oid *ownerIdP,
2793  int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP,
2794  TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
2795  Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale,
2796  char **dbIcurules,
2797  char *dbLocProvider,
2798  char **dbCollversion)
2799 {
2800  bool result = false;
2801  Relation relation;
2802 
2803  Assert(name);
2804 
2805  /* Caller may wish to grab a better lock on pg_database beforehand... */
2806  relation = table_open(DatabaseRelationId, AccessShareLock);
2807 
2808  /*
2809  * Loop covers the rare case where the database is renamed before we can
2810  * lock it. We try again just in case we can find a new one of the same
2811  * name.
2812  */
2813  for (;;)
2814  {
2815  ScanKeyData scanKey;
2816  SysScanDesc scan;
2817  HeapTuple tuple;
2818  Oid dbOid;
2819 
2820  /*
2821  * there's no syscache for database-indexed-by-name, so must do it the
2822  * hard way
2823  */
2824  ScanKeyInit(&scanKey,
2825  Anum_pg_database_datname,
2826  BTEqualStrategyNumber, F_NAMEEQ,
2828 
2829  scan = systable_beginscan(relation, DatabaseNameIndexId, true,
2830  NULL, 1, &scanKey);
2831 
2832  tuple = systable_getnext(scan);
2833 
2834  if (!HeapTupleIsValid(tuple))
2835  {
2836  /* definitely no database of that name */
2837  systable_endscan(scan);
2838  break;
2839  }
2840 
2841  dbOid = ((Form_pg_database) GETSTRUCT(tuple))->oid;
2842 
2843  systable_endscan(scan);
2844 
2845  /*
2846  * Now that we have a database OID, we can try to lock the DB.
2847  */
2848  if (lockmode != NoLock)
2849  LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2850 
2851  /*
2852  * And now, re-fetch the tuple by OID. If it's still there and still
2853  * the same name, we win; else, drop the lock and loop back to try
2854  * again.
2855  */
2856  tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
2857  if (HeapTupleIsValid(tuple))
2858  {
2859  Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
2860 
2861  if (strcmp(name, NameStr(dbform->datname)) == 0)
2862  {
2863  Datum datum;
2864  bool isnull;
2865 
2866  /* oid of the database */
2867  if (dbIdP)
2868  *dbIdP = dbOid;
2869  /* oid of the owner */
2870  if (ownerIdP)
2871  *ownerIdP = dbform->datdba;
2872  /* character encoding */
2873  if (encodingP)
2874  *encodingP = dbform->encoding;
2875  /* allowed as template? */
2876  if (dbIsTemplateP)
2877  *dbIsTemplateP = dbform->datistemplate;
2878  /* Has on login event trigger? */
2879  if (dbHasLoginEvtP)
2880  *dbHasLoginEvtP = dbform->dathasloginevt;
2881  /* allowing connections? */
2882  if (dbAllowConnP)
2883  *dbAllowConnP = dbform->datallowconn;
2884  /* limit of frozen XIDs */
2885  if (dbFrozenXidP)
2886  *dbFrozenXidP = dbform->datfrozenxid;
2887  /* minimum MultiXactId */
2888  if (dbMinMultiP)
2889  *dbMinMultiP = dbform->datminmxid;
2890  /* default tablespace for this database */
2891  if (dbTablespace)
2892  *dbTablespace = dbform->dattablespace;
2893  /* default locale settings for this database */
2894  if (dbLocProvider)
2895  *dbLocProvider = dbform->datlocprovider;
2896  if (dbCollate)
2897  {
2898  datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datcollate);
2899  *dbCollate = TextDatumGetCString(datum);
2900  }
2901  if (dbCtype)
2902  {
2903  datum = SysCacheGetAttrNotNull(DATABASEOID, tuple, Anum_pg_database_datctype);
2904  *dbCtype = TextDatumGetCString(datum);
2905  }
2906  if (dbLocale)
2907  {
2908  datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datlocale, &isnull);
2909  if (isnull)
2910  *dbLocale = NULL;
2911  else
2912  *dbLocale = TextDatumGetCString(datum);
2913  }
2914  if (dbIcurules)
2915  {
2916  datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_daticurules, &isnull);
2917  if (isnull)
2918  *dbIcurules = NULL;
2919  else
2920  *dbIcurules = TextDatumGetCString(datum);
2921  }
2922  if (dbCollversion)
2923  {
2924  datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
2925  if (isnull)
2926  *dbCollversion = NULL;
2927  else
2928  *dbCollversion = TextDatumGetCString(datum);
2929  }
2930  ReleaseSysCache(tuple);
2931  result = true;
2932  break;
2933  }
2934  /* can only get here if it was just renamed */
2935  ReleaseSysCache(tuple);
2936  }
2937 
2938  if (lockmode != NoLock)
2939  UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
2940  }
2941 
2942  table_close(relation, AccessShareLock);
2943 
2944  return result;
2945 }
2946 
2947 /* Check if current user has createdb privileges */
2948 bool
2950 {
2951  bool result = false;
2952  HeapTuple utup;
2953 
2954  /* Superusers can always do everything */
2955  if (superuser())
2956  return true;
2957 
2958  utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
2959  if (HeapTupleIsValid(utup))
2960  {
2961  result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
2962  ReleaseSysCache(utup);
2963  }
2964  return result;
2965 }
2966 
2967 /*
2968  * Remove tablespace directories
2969  *
2970  * We don't know what tablespaces db_id is using, so iterate through all
2971  * tablespaces removing <tablespace>/db_id
2972  */
2973 static void
2975 {
2976  Relation rel;
2977  TableScanDesc scan;
2978  HeapTuple tuple;
2979  List *ltblspc = NIL;
2980  ListCell *cell;
2981  int ntblspc;
2982  int i;
2983  Oid *tablespace_ids;
2984 
2985  rel = table_open(TableSpaceRelationId, AccessShareLock);
2986  scan = table_beginscan_catalog(rel, 0, NULL);
2987  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2988  {
2989  Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
2990  Oid dsttablespace = spcform->oid;
2991  char *dstpath;
2992  struct stat st;
2993 
2994  /* Don't mess with the global tablespace */
2995  if (dsttablespace == GLOBALTABLESPACE_OID)
2996  continue;
2997 
2998  dstpath = GetDatabasePath(db_id, dsttablespace);
2999 
3000  if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
3001  {
3002  /* Assume we can ignore it */
3003  pfree(dstpath);
3004  continue;
3005  }
3006 
3007  if (!rmtree(dstpath, true))
3008  ereport(WARNING,
3009  (errmsg("some useless files may be left behind in old database directory \"%s\"",
3010  dstpath)));
3011 
3012  ltblspc = lappend_oid(ltblspc, dsttablespace);
3013  pfree(dstpath);
3014  }
3015 
3016  ntblspc = list_length(ltblspc);
3017  if (ntblspc == 0)
3018  {
3019  table_endscan(scan);
3021  return;
3022  }
3023 
3024  tablespace_ids = (Oid *) palloc(ntblspc * sizeof(Oid));
3025  i = 0;
3026  foreach(cell, ltblspc)
3027  tablespace_ids[i++] = lfirst_oid(cell);
3028 
3029  /* Record the filesystem change in XLOG */
3030  {
3031  xl_dbase_drop_rec xlrec;
3032 
3033  xlrec.db_id = db_id;
3034  xlrec.ntablespaces = ntblspc;
3035 
3036  XLogBeginInsert();
3037  XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec);
3038  XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid));
3039 
3040  (void) XLogInsert(RM_DBASE_ID,
3042  }
3043 
3044  list_free(ltblspc);
3045  pfree(tablespace_ids);
3046 
3047  table_endscan(scan);
3049 }
3050 
3051 /*
3052  * Check for existing files that conflict with a proposed new DB OID;
3053  * return true if there are any
3054  *
3055  * If there were a subdirectory in any tablespace matching the proposed new
3056  * OID, we'd get a create failure due to the duplicate name ... and then we'd
3057  * try to remove that already-existing subdirectory during the cleanup in
3058  * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
3059  * instead we make this extra check before settling on the OID of the new
3060  * database. This exactly parallels what GetNewRelFileNumber() does for table
3061  * relfilenumber values.
3062  */
3063 static bool
3065 {
3066  bool result = false;
3067  Relation rel;
3068  TableScanDesc scan;
3069  HeapTuple tuple;
3070 
3071  rel = table_open(TableSpaceRelationId, AccessShareLock);
3072  scan = table_beginscan_catalog(rel, 0, NULL);
3073  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
3074  {
3075  Form_pg_tablespace spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
3076  Oid dsttablespace = spcform->oid;
3077  char *dstpath;
3078  struct stat st;
3079 
3080  /* Don't mess with the global tablespace */
3081  if (dsttablespace == GLOBALTABLESPACE_OID)
3082  continue;
3083 
3084  dstpath = GetDatabasePath(db_id, dsttablespace);
3085 
3086  if (lstat(dstpath, &st) == 0)
3087  {
3088  /* Found a conflicting file (or directory, whatever) */
3089  pfree(dstpath);
3090  result = true;
3091  break;
3092  }
3093 
3094  pfree(dstpath);
3095  }
3096 
3097  table_endscan(scan);
3099 
3100  return result;
3101 }
3102 
/*
 * errdetail_busy_db
 *		Attach an errdetail explaining why a database counts as busy.
 *
 * notherbackends is the number of other sessions using the database and
 * npreparedxacts the number of prepared transactions using it.  Intended to
 * be called from inside an ereport(); the return value carries no meaning.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	if (notherbackends <= 0)
	{
		/* No other sessions: report the prepared transactions only */
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	}
	else if (npreparedxacts > 0)
	{
		/*
		 * Both kinds of users are present.  Singular versus plural is not
		 * handled here, because gettext cannot deal with more than one
		 * plural in a single string.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}
	else
	{
		/* Other sessions only */
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	}

	return 0;					/* just to keep ereport macro happy */
}
3129 
3130 /*
3131  * get_database_oid - given a database name, look up the OID
3132  *
3133  * If missing_ok is false, throw an error if database name not found. If
3134  * true, just return InvalidOid.
3135  */
3136 Oid
3137 get_database_oid(const char *dbname, bool missing_ok)
3138 {
3139  Relation pg_database;
3140  ScanKeyData entry[1];
3141  SysScanDesc scan;
3142  HeapTuple dbtuple;
3143  Oid oid;
3144 
3145  /*
3146  * There's no syscache for pg_database indexed by name, so we must look
3147  * the hard way.
3148  */
3149  pg_database = table_open(DatabaseRelationId, AccessShareLock);
3150  ScanKeyInit(&entry[0],
3151  Anum_pg_database_datname,
3152  BTEqualStrategyNumber, F_NAMEEQ,
3154  scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
3155  NULL, 1, entry);
3156 
3157  dbtuple = systable_getnext(scan);
3158 
3159  /* We assume that there can be at most one matching tuple */
3160  if (HeapTupleIsValid(dbtuple))
3161  oid = ((Form_pg_database) GETSTRUCT(dbtuple))->oid;
3162  else
3163  oid = InvalidOid;
3164 
3165  systable_endscan(scan);
3166  table_close(pg_database, AccessShareLock);
3167 
3168  if (!OidIsValid(oid) && !missing_ok)
3169  ereport(ERROR,
3170  (errcode(ERRCODE_UNDEFINED_DATABASE),
3171  errmsg("database \"%s\" does not exist",
3172  dbname)));
3173 
3174  return oid;
3175 }
3176 
3177 
3178 /*
3179  * get_database_name - given a database OID, look up the name
3180  *
3181  * Returns a palloc'd string, or NULL if no such database.
3182  */
3183 char *
3185 {
3186  HeapTuple dbtuple;
3187  char *result;
3188 
3189  dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
3190  if (HeapTupleIsValid(dbtuple))
3191  {
3192  result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
3193  ReleaseSysCache(dbtuple);
3194  }
3195  else
3196  result = NULL;
3197 
3198  return result;
3199 }
3200 
3201 
3202 /*
3203  * While dropping a database the pg_database row is marked invalid, but the
3204  * catalog contents still exist. Connections to such a database are not
3205  * allowed.
3206  */
3207 bool
3209 {
3210  return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
3211 }
3212 
3213 
3214 /*
3215  * Convenience wrapper around database_is_invalid_form()
3216  */
3217 bool
3219 {
3220  HeapTuple dbtup;
3221  Form_pg_database dbform;
3222  bool invalid;
3223 
3224  dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
3225  if (!HeapTupleIsValid(dbtup))
3226  elog(ERROR, "cache lookup failed for database %u", dboid);
3227  dbform = (Form_pg_database) GETSTRUCT(dbtup);
3228 
3230 
3231  ReleaseSysCache(dbtup);
3232 
3233  return invalid;
3234 }
3235 
3236 
3237 /*
3238  * recovery_create_dbdir()
3239  *
3240  * During recovery, there's a case where we validly need to recover a missing
3241  * tablespace directory so that recovery can continue. This happens when
3242  * recovery wants to create a database but the holding tablespace has been
3243  * removed before the server stopped. Since we expect that the directory will
3244  * be gone before reaching recovery consistency, and we have no knowledge about
3245  * the tablespace other than its OID here, we create a real directory under
3246  * pg_tblspc here instead of restoring the symlink.
3247  *
3248  * If only_tblspc is true, then the requested directory must be in pg_tblspc/
3249  */
3250 static void
3251 recovery_create_dbdir(char *path, bool only_tblspc)
3252 {
3253  struct stat st;
3254 
3256 
3257  if (stat(path, &st) == 0)
3258  return;
3259 
3260  if (only_tblspc && strstr(path, PG_TBLSPC_DIR_SLASH) == NULL)
3261  elog(PANIC, "requested to created invalid directory: %s", path);
3262 
3264  ereport(PANIC,
3265  errmsg("missing directory \"%s\"", path));
3266 
3268  "creating missing directory: %s", path);
3269 
3270  if (pg_mkdir_p(path, pg_dir_create_mode) != 0)
3271  ereport(PANIC,
3272  errmsg("could not create missing directory \"%s\": %m", path));
3273 }
3274 
3275 
3276 /*
3277  * DATABASE resource manager's routines
3278  */
3279 void
3281 {
3282  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3283 
3284  /* Backup blocks are not used in dbase records */
3285  Assert(!XLogRecHasAnyBlockRefs(record));
3286 
3287  if (info == XLOG_DBASE_CREATE_FILE_COPY)
3288  {
3291  char *src_path;
3292  char *dst_path;
3293  char *parent_path;
3294  struct stat st;
3295 
3296  src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
3297  dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3298 
3299  /*
3300  * Our theory for replaying a CREATE is to forcibly drop the target
3301  * subdirectory if present, then re-copy the source data. This may be
3302  * more work than needed, but it is simple to implement.
3303  */
3304  if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
3305  {
3306  if (!rmtree(dst_path, true))
3307  /* If this failed, copydir() below is going to error. */
3308  ereport(WARNING,
3309  (errmsg("some useless files may be left behind in old database directory \"%s\"",
3310  dst_path)));
3311  }
3312 
3313  /*
3314  * If the parent of the target path doesn't exist, create it now. This
3315  * enables us to create the target underneath later.
3316  */
3317  parent_path = pstrdup(dst_path);
3318  get_parent_directory(parent_path);
3319  if (stat(parent_path, &st) < 0)
3320  {
3321  if (errno != ENOENT)
3322  ereport(FATAL,
3323  errmsg("could not stat directory \"%s\": %m",
3324  dst_path));
3325 
3326  /* create the parent directory if needed and valid */
3327  recovery_create_dbdir(parent_path, true);
3328  }
3329  pfree(parent_path);
3330 
3331  /*
3332  * There's a case where the copy source directory is missing for the
3333  * same reason above. Create the empty source directory so that
3334  * copydir below doesn't fail. The directory will be dropped soon by
3335  * recovery.
3336  */
3337  if (stat(src_path, &st) < 0 && errno == ENOENT)
3338  recovery_create_dbdir(src_path, false);
3339 
3340  /*
3341  * Force dirty buffers out to disk, to ensure source database is
3342  * up-to-date for the copy.
3343  */
3345 
3346  /* Close all smgr fds in all backends. */
3348 
3349  /*
3350  * Copy this subdirectory to the new location
3351  *
3352  * We don't need to copy subdirectories
3353  */
3354  copydir(src_path, dst_path, false);
3355 
3356  pfree(src_path);
3357  pfree(dst_path);
3358  }
3359  else if (info == XLOG_DBASE_CREATE_WAL_LOG)
3360  {
3363  char *dbpath;
3364  char *parent_path;
3365 
3366  dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
3367 
3368  /* create the parent directory if needed and valid */
3369  parent_path = pstrdup(dbpath);
3370  get_parent_directory(parent_path);
3371  recovery_create_dbdir(parent_path, true);
3372 
3373  /* Create the database directory with the version file. */
3374  CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
3375  true);
3376  pfree(dbpath);
3377  }
3378  else if (info == XLOG_DBASE_DROP)
3379  {
3380  xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
3381  char *dst_path;
3382  int i;
3383 
3384  if (InHotStandby)
3385  {
3386  /*
3387  * Lock database while we resolve conflicts to ensure that
3388  * InitPostgres() cannot fully re-execute concurrently. This
3389  * avoids backends re-connecting automatically to same database,
3390  * which can happen in some cases.
3391  *
3392  * This will lock out walsenders trying to connect to db-specific
3393  * slots for logical decoding too, so it's safe for us to drop
3394  * slots.
3395  */
3396  LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3398  }
3399 
3400  /* Drop any database-specific replication slots */
3402 
3403  /* Drop pages for this database that are in the shared buffer cache */
3404  DropDatabaseBuffers(xlrec->db_id);
3405 
3406  /* Also, clean out any fsync requests that might be pending in md.c */
3408 
3409  /* Clean out the xlog relcache too */
3410  XLogDropDatabase(xlrec->db_id);
3411 
3412  /* Close all smgr fds in all backends. */
3414 
3415  for (i = 0; i < xlrec->ntablespaces; i++)
3416  {
3417  dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_ids[i]);
3418 
3419  /* And remove the physical files */
3420  if (!rmtree(dst_path, true))
3421  ereport(WARNING,
3422  (errmsg("some useless files may be left behind in old database directory \"%s\"",
3423  dst_path)));
3424  pfree(dst_path);
3425  }
3426 
3427  if (InHotStandby)
3428  {
3429  /*
3430  * Release locks prior to commit. XXX There is a race condition
3431  * here that may allow backends to reconnect, but the window for
3432  * this is small because the gap between here and commit is mostly
3433  * fairly small and it is unlikely that people will be dropping
3434  * databases that we are trying to connect to anyway.
3435  */
3436  UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
3437  }
3438  }
3439  else
3440  elog(PANIC, "dbase_redo: unknown op code %u", info);
3441 }
Acl * aclnewowner(const Acl *old_acl, Oid oldOwnerId, Oid newOwnerId)
Definition: acl.c:1103
Oid get_role_oid(const char *rolname, bool missing_ok)
Definition: acl.c:5554
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5325
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
#define DatumGetAclP(X)
Definition: acl.h:120
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2698
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3886
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4140
bool directory_is_empty(const char *path)
Definition: tablespace.c:853
Oid get_tablespace_oid(const char *tablespacename, bool missing_ok)
Definition: tablespace.c:1426
bool allow_in_place_tablespaces
Definition: tablespace.c:85
uint32 BlockNumber
Definition: block.h:31
static Datum values[MAXATTR]
Definition: bootstrap.c:150
int Buffer
Definition: buf.h:23
void DropDatabaseBuffers(Oid dbid)
Definition: bufmgr.c:4368
BlockNumber BufferGetBlockNumber(Buffer buffer)
Definition: bufmgr.c:3706
void CreateAndCopyRelationData(RelFileLocator src_rlocator, RelFileLocator dst_rlocator, bool permanent)
Definition: bufmgr.c:4780
void UnlockReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:4923
void LockBuffer(Buffer buffer, int mode)
Definition: bufmgr.c:5140
Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent)
Definition: bufmgr.c:830
void FlushDatabaseBuffers(Oid dbid)
Definition: bufmgr.c:4844
@ BAS_BULKREAD
Definition: bufmgr.h:36
#define BUFFER_LOCK_SHARE
Definition: bufmgr.h:190
static Page BufferGetPage(Buffer buffer)
Definition: bufmgr.h:400
@ RBM_NORMAL
Definition: bufmgr.h:45
static bool PageIsEmpty(Page page)
Definition: bufpage.h:223
Pointer Page
Definition: bufpage.h:81
static Item PageGetItem(Page page, ItemId itemId)
Definition: bufpage.h:354
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
Definition: bufpage.h:243
static bool PageIsNew(Page page)
Definition: bufpage.h:233
static OffsetNumber PageGetMaxOffsetNumber(Page page)
Definition: bufpage.h:372
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:746
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY
Definition: c.h:1273
TransactionId MultiXactId
Definition: c.h:662
unsigned char uint8
Definition: c.h:504
uint32 TransactionId
Definition: c.h:652
#define OidIsValid(objectId)
Definition: c.h:775
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:412
void RequestCheckpoint(int flags)
Definition: checkpointer.c:941
void DeleteSharedComments(Oid oid, Oid classoid)
Definition: comment.c:374
void copydir(const char *fromdir, const char *todir, bool recurse)
Definition: copydir.c:37
static void remove_dbtablespaces(Oid db_id)
Definition: dbcommands.c:2974
static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
Definition: dbcommands.c:456
ObjectAddress AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
Definition: dbcommands.c:2517
bool have_createdb_privilege(void)
Definition: dbcommands.c:2949
ObjectAddress RenameDatabase(const char *oldname, const char *newname)
Definition: dbcommands.c:1886
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3184
Oid get_database_oid(const char *dbname, bool missing_ok)
Definition: dbcommands.c:3137
CreateDBStrategy
Definition: dbcommands.c:83
@ CREATEDB_FILE_COPY
Definition: dbcommands.c:85
@ CREATEDB_WAL_LOG
Definition: dbcommands.c:84
static void movedb_failure_callback(int code, Datum arg)
Definition: dbcommands.c:2304
static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
Definition: dbcommands.c:148
static List * ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
Definition: dbcommands.c:250
static void recovery_create_dbdir(char *path, bool only_tblspc)
Definition: dbcommands.c:3251
ObjectAddress AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
Definition: dbcommands.c:2637
bool database_is_invalid_form(Form_pg_database datform)
Definition: dbcommands.c:3208
Datum pg_database_collation_actual_version(PG_FUNCTION_ARGS)
Definition: dbcommands.c:2747
static void movedb(const char *dbname, const char *tblspcname)
Definition: dbcommands.c:1984
static List * ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid, char *srcpath, List *rlocatorlist, Snapshot snapshot)
Definition: dbcommands.c:328
void check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
Definition: dbcommands.c:1570
Oid createdb(ParseState *pstate, const CreatedbStmt *stmt)
Definition: dbcommands.c:683
Oid AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
Definition: dbcommands.c:2346
static int errdetail_busy_db(int notherbackends, int npreparedxacts)
Definition: dbcommands.c:3107
static CreateDBRelInfo * ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid, char *srcpath)
Definition: dbcommands.c:391
static bool check_db_file_conflict(Oid db_id)
Definition: dbcommands.c:3064
struct CreateDBRelInfo CreateDBRelInfo
void dbase_redo(XLogReaderState *record)
Definition: dbcommands.c:3280
static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid, Oid dst_tsid)
Definition: dbcommands.c:550
void DropDatabase(ParseState *pstate, DropdbStmt *stmt)
Definition: dbcommands.c:2321
void dropdb(const char *dbname, bool missing_ok, bool force)
Definition: dbcommands.c:1647
Oid AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
Definition: dbcommands.c:2611
static bool get_db_info(const char *name, LOCKMODE lockmode, Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, bool *dbHasLoginEvtP, TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP, Oid *dbTablespace, char **dbCollate, char **dbCtype, char **dbLocale, char **dbIcurules, char *dbLocProvider, char **dbCollversion)
Definition: dbcommands.c:2791
static void createdb_failure_callback(int code, Datum arg)
Definition: dbcommands.c:1608
bool database_is_invalid_oid(Oid dboid)
Definition: dbcommands.c:3218
#define XLOG_DBASE_CREATE_WAL_LOG
#define XLOG_DBASE_DROP
#define MinSizeOfDbaseDropRec
#define XLOG_DBASE_CREATE_FILE_COPY
int32 defGetInt32(DefElem *def)
Definition: define.c:162
bool defGetBoolean(DefElem *def)
Definition: define.c:107
char * defGetString(DefElem *def)
Definition: define.c:48
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:384
Oid defGetObjectId(DefElem *def)
Definition: define.c:219
int errcode_for_file_access(void)
Definition: elog.c:876
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1295
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define NOTICE
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:149
bool is_encoding_supported_by_icu(int encoding)
Definition: encnames.c:461
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2932
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3937
int FreeDir(DIR *dir)
Definition: fd.c:2984
int CloseTransientFile(int fd)
Definition: fd.c:2832
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
int data_sync_elevel(int elevel)
Definition: fd.c:3960
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2656
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2866
static char dstpath[MAXPGPATH]
Definition: file_ops.c:32
int pg_dir_create_mode
Definition: file_perm.c:18
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:641
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_RETURN_TEXT_P(x)
Definition: fmgr.h:372
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
BufferAccessStrategy GetAccessStrategy(BufferAccessStrategyType btype)
Definition: freelist.c:541
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:602
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:509
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:385
bool IsBinaryUpgrade
Definition: globals.c:120
bool IsUnderPostmaster
Definition: globals.c:119
bool allowSystemTableMods
Definition: globals.c:129
Oid MyDatabaseId
Definition: globals.c:93
void heap_inplace_update(Relation relation, HeapTuple tuple)
Definition: heapam.c:6045
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1234
bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:792
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
static char * locale
Definition: initdb.c:140
#define write(a, b, c)
Definition: win32.h:14
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:47
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
Definition: ipc.h:52
invalidindex index d is invalid
Definition: isn.c:134
int i
Definition: isn.c:73
#define ItemIdGetLength(itemId)
Definition: itemid.h:59
#define ItemIdIsNormal(itemId)
Definition: itemid.h:99
#define ItemIdIsDead(itemId)
Definition: itemid.h:113
#define ItemIdIsUsed(itemId)
Definition: itemid.h:92
#define ItemIdIsRedirected(itemId)
Definition: itemid.h:106
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
Definition: itemptr.h:135
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
void list_free(List *list)
Definition: list.c:1546
void list_free_deep(List *list)
Definition: list.c:1560
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1073
void UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1171
void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
Definition: lmgr.c:212
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1132
void LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1153
void LockRelationId(LockRelId *relid, LOCKMODE lockmode)
Definition: lmgr.c:184
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareLock
Definition: lockdefs.h:40
#define RowExclusiveLock
Definition: lockdefs.h:38
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc(Size size)
Definition: mcxt.c:1317
void ForgetDatabaseSyncRequests(Oid dbid)
Definition: md.c:1428
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
Oid GetUserId(void)
Definition: miscinit.c:514
#define InvalidMultiXactId
Definition: multixact.h:24
void namestrcpy(Name name, const char *str)
Definition: name.c:233
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:182
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define OffsetNumberNext(offsetNumber)
Definition: off.h:52
uint16 OffsetNumber
Definition: off.h:24
#define FirstOffsetNumber
Definition: off.h:27
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
@ OBJECT_TABLESPACE
Definition: parsenodes.h:2299
@ OBJECT_DATABASE
Definition: parsenodes.h:2266
#define ACL_CREATE
Definition: parsenodes.h:85
FormData_pg_authid * Form_pg_authid
Definition: pg_authid.h:56
bool rolcreatedb
Definition: pg_authid.h:38
void * arg
FormData_pg_class * Form_pg_class
Definition: pg_class.h:153
#define MAXPGPATH
char datlocprovider
Definition: pg_database.h:44
FormData_pg_database * Form_pg_database
Definition: pg_database.h:96
NameData datname
Definition: pg_database.h:35
#define DATCONNLIMIT_INVALID_DB
Definition: pg_database.h:124
int32 encoding
Definition: pg_database.h:41
#define DATCONNLIMIT_UNLIMITED
Definition: pg_database.h:117
void DropSetting(Oid databaseid, Oid roleid)
void AlterSetting(Oid databaseid, Oid roleid, VariableSetStmt *setstmt)
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define lfirst_oid(lc)
Definition: pg_list.h:174
int icu_validation_level
Definition: pg_locale.c:103
void icu_validate_locale(const char *loc_str)
Definition: pg_locale.c:2909
char * get_collation_actual_version(char collprovider, const char *collcollate)
Definition: pg_locale.c:1659
char * icu_language_tag(const char *loc_str, int elevel)
Definition: pg_locale.c:2851
const char * builtin_validate_locale(int encoding, const char *locale)
Definition: pg_locale.c:2471
bool check_locale(int category, const char *locale, char **canonname)
Definition: pg_locale.c:339
void dropDatabaseDependencies(Oid databaseId)
Definition: pg_shdepend.c:999
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316
void copyTemplateDependencies(Oid templateDbId, Oid newDbId)
Definition: pg_shdepend.c:895
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
void shdepLockAndCheckObject(Oid classId, Oid objectId)
Definition: pg_shdepend.c:1211
int CountDBSubscriptions(Oid dbid)
FormData_pg_tablespace * Form_pg_tablespace
Definition: pg_tablespace.h:48
static char * buf
Definition: pg_test_fsync.c:73
@ PG_SQL_ASCII
Definition: pg_wchar.h:226
@ PG_UTF8
Definition: pg_wchar.h:232
#define PG_VALID_BE_ENCODING(_enc)
Definition: pg_wchar.h:281
#define pg_encoding_to_char
Definition: pg_wchar.h:630
#define pg_valid_server_encoding
Definition: pg_wchar.h:631
void pgstat_drop_database(Oid databaseid)
int pg_mkdir_p(char *path, int omode)
Definition: pgmkdirp.c:57
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define sprintf
Definition: port.h:240
void get_parent_directory(char *path)
Definition: path.c:991
#define snprintf
Definition: port.h:238
int pg_get_encoding_from_locale(const char *ctype, bool write_message)
Definition: chklocale.c:305
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static Datum TransactionIdGetDatum(TransactionId X)
Definition: postgres.h:272
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void TerminateOtherDBBackends(Oid databaseId)
Definition: procarray.c:3832
bool CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
Definition: procarray.c:3754
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
void WaitForProcSignalBarrier(uint64 generation)
Definition: procsignal.c:421
uint64 EmitProcSignalBarrier(ProcSignalBarrierType type)
Definition: procsignal.c:353
@ PROCSIGNAL_BARRIER_SMGRRELEASE
Definition: procsignal.h:56
#define RelationGetDescr(relation)
Definition: rel.h:531
RelFileNumber RelationMapOidToFilenumberForDatabase(char *dbpath, Oid relationId)
Definition: relmapper.c:265
void RelationMapCopy(Oid dbid, Oid tsid, char *srcdbpath, char *dstdbpath)
Definition: relmapper.c:292
char * GetDatabasePath(Oid dbOid, Oid spcOid)
Definition: relpath.c:110
#define PG_TBLSPC_DIR_SLASH
Definition: relpath.h:42
Oid RelFileNumber
Definition: relpath.h:25
@ MAIN_FORKNUM
Definition: relpath.h:58
#define InvalidRelFileNumber
Definition: relpath.h:26
#define RelFileNumberIsValid(relnumber)
Definition: relpath.h:27
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12840
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ ForwardScanDirection
Definition: sdir.h:28
void DeleteSharedSecurityLabel(Oid objectId, Oid classId)
Definition: seclabel.c:491
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1240
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1298
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
Definition: smgr.c:655
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
Definition: smgr.c:198
void smgrclose(SMgrRelation reln)
Definition: smgr.c:320
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:291
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void ResolveRecoveryConflictWithDatabase(Oid dbid)
Definition: standby.c:568
#define BTEqualStrategyNumber
Definition: stratnum.h:31
char * dbname
Definition: streamutil.c:52
RelFileLocator rlocator
Definition: dbcommands.c:106
Definition: dirent.c:26
char * defname
Definition: parsenodes.h:817
ParseLoc location
Definition: parsenodes.h:821
Node * arg
Definition: parsenodes.h:818
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
HeapTupleHeader t_data
Definition: htup.h:68
Oid t_tableOid
Definition: htup.h:66
Definition: value.h:29
Definition: pg_list.h:54
Definition: rel.h:39
Oid relId
Definition: rel.h:40
Oid dbId
Definition: rel.h:41
RelFileNumber relNumber
CreateDBStrategy strategy
Definition: dbcommands.c:92
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
unsigned short st_mode
Definition: win32_port.h:268
Oid tablespace_ids[FLEXIBLE_ARRAY_MEMBER]
bool superuser(void)
Definition: superuser.c:46
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:218
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:479
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:510
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
Definition: tableam.c:112
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1019
#define InvalidTransactionId
Definition: transam.h:31
#define FirstNormalObjectId
Definition: transam.h:197
text * cstring_to_text(const char *s)
Definition: varlena.c:184
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
const char * name
#define stat
Definition: win32_port.h:284
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3628
void StartTransactionCommand(void)
Definition: xact.c:3039
void ForceSyncCommit(void)
Definition: xact.c:1151
void CommitTransactionCommand(void)
Definition: xact.c:3137
bool RecoveryInProgress(void)
Definition: xlog.c:6333
XLogRecPtr XactLastRecEnd
Definition: xlog.c:253
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2795
#define CHECKPOINT_FLUSH_ALL
Definition: xlog.h:143
#define CHECKPOINT_FORCE
Definition: xlog.h:142
#define CHECKPOINT_WAIT
Definition: xlog.h:145
#define CHECKPOINT_IMMEDIATE
Definition: xlog.h:141
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const char *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLogRecHasAnyBlockRefs(decoder)
Definition: xlogreader.h:417
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
#define XLR_SPECIAL_REL_UPDATE
Definition: xlogrecord.h:82
bool reachedConsistency
Definition: xlogrecovery.c:295
void XLogDropDatabase(Oid dbid)
Definition: xlogutils.c:652
#define InHotStandby
Definition: xlogutils.h:60