PostgreSQL Source Code  git master
copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  * Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/sysattr.h"
22 #include "access/table.h"
23 #include "access/xact.h"
24 #include "catalog/pg_authid.h"
25 #include "commands/copy.h"
26 #include "commands/defrem.h"
27 #include "executor/executor.h"
28 #include "mb/pg_wchar.h"
29 #include "miscadmin.h"
30 #include "nodes/makefuncs.h"
31 #include "optimizer/optimizer.h"
32 #include "parser/parse_coerce.h"
33 #include "parser/parse_collate.h"
34 #include "parser/parse_expr.h"
35 #include "parser/parse_relation.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "utils/acl.h"
38 #include "utils/builtins.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/rel.h"
42 #include "utils/rls.h"
43 
44 /*
45  * DoCopy executes the SQL COPY statement
46  *
47  * Either unload or reload contents of table <relation>, depending on <from>.
48  * (<from> = true means we are inserting into the table.) In the "TO" case
49  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
50  * or DELETE query.
51  *
52  * If <pipe> is false, transfer is between the table and the file named
53  * <filename>. Otherwise, transfer is between the table and our regular
54  * input/output stream. The latter could be either stdin/stdout or a
55  * socket, depending on whether we're running under Postmaster control.
56  *
57  * Do not allow a Postgres user without the 'pg_read_server_files' or
58  * 'pg_write_server_files' role to read from or write to a file.
59  *
60  * Do not allow the copy if user doesn't have proper permission to access
61  * the table or the specifically requested columns.
62  */
63 void
64 DoCopy(ParseState *pstate, const CopyStmt *stmt,
65  int stmt_location, int stmt_len,
66  uint64 *processed)
67 {
68  bool is_from = stmt->is_from;
69  bool pipe = (stmt->filename == NULL);
70  Relation rel;
71  Oid relid;
72  RawStmt *query = NULL;
73  Node *whereClause = NULL;
74 
75  /*
76  * Disallow COPY to/from file or program except to users with the
77  * appropriate role.
78  */
79  if (!pipe)
80  {
81  if (stmt->is_program)
82  {
83  if (!has_privs_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
84  ereport(ERROR,
85  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
86  errmsg("must be superuser or have privileges of the pg_execute_server_program role to COPY to or from an external program"),
87  errhint("Anyone can COPY to stdout or from stdin. "
88  "psql's \\copy command also works for anyone.")));
89  }
90  else
91  {
92  if (is_from && !has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
93  ereport(ERROR,
94  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
95  errmsg("must be superuser or have privileges of the pg_read_server_files role to COPY from a file"),
96  errhint("Anyone can COPY to stdout or from stdin. "
97  "psql's \\copy command also works for anyone.")));
98 
99  if (!is_from && !has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
100  ereport(ERROR,
101  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
102  errmsg("must be superuser or have privileges of the pg_write_server_files role to COPY to a file"),
103  errhint("Anyone can COPY to stdout or from stdin. "
104  "psql's \\copy command also works for anyone.")));
105  }
106  }
107 
108  if (stmt->relation)
109  {
110  LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
111  ParseNamespaceItem *nsitem;
112  RTEPermissionInfo *perminfo;
113  TupleDesc tupDesc;
114  List *attnums;
115  ListCell *cur;
116 
117  Assert(!stmt->query);
118 
119  /* Open and lock the relation, using the appropriate lock type. */
120  rel = table_openrv(stmt->relation, lockmode);
121 
122  relid = RelationGetRelid(rel);
123 
124  nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
125  NULL, false, false);
126 
127  perminfo = nsitem->p_perminfo;
128  perminfo->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
129 
130  if (stmt->whereClause)
131  {
132  /* add nsitem to query namespace */
133  addNSItemToQuery(pstate, nsitem, false, true, true);
134 
135  /* Transform the raw expression tree */
136  whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
137 
138  /* Make sure it yields a boolean result. */
139  whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
140 
141  /* we have to fix its collations too */
142  assign_expr_collations(pstate, whereClause);
143 
144  whereClause = eval_const_expressions(NULL, whereClause);
145 
146  whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
147  whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
148  }
149 
150  tupDesc = RelationGetDescr(rel);
151  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
152  foreach(cur, attnums)
153  {
154  int attno;
155  Bitmapset **bms;
156 
158  bms = is_from ? &perminfo->insertedCols : &perminfo->selectedCols;
159 
160  *bms = bms_add_member(*bms, attno);
161  }
162  ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);
163 
164  /*
165  * Permission check for row security policies.
166  *
167  * check_enable_rls will ereport(ERROR) if the user has requested
168  * something invalid and will otherwise indicate if we should enable
169  * RLS (returns RLS_ENABLED) or not for this COPY statement.
170  *
171  * If the relation has a row security policy and we are to apply it
172  * then perform a "query" copy and allow the normal query processing
173  * to handle the policies.
174  *
175  * If RLS is not enabled for this, then just fall through to the
176  * normal non-filtering relation handling.
177  */
178  if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
179  {
181  ColumnRef *cr;
182  ResTarget *target;
183  RangeVar *from;
184  List *targetList = NIL;
185 
186  if (is_from)
187  ereport(ERROR,
188  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
189  errmsg("COPY FROM not supported with row-level security"),
190  errhint("Use INSERT statements instead.")));
191 
192  /*
193  * Build target list
194  *
195  * If no columns are specified in the attribute list of the COPY
196  * command, then the target list is 'all' columns. Therefore, '*'
197  * should be used as the target list for the resulting SELECT
198  * statement.
199  *
200  * In the case that columns are specified in the attribute list,
201  * create a ColumnRef and ResTarget for each column and add them
202  * to the target list for the resulting SELECT statement.
203  */
204  if (!stmt->attlist)
205  {
206  cr = makeNode(ColumnRef);
208  cr->location = -1;
209 
210  target = makeNode(ResTarget);
211  target->name = NULL;
212  target->indirection = NIL;
213  target->val = (Node *) cr;
214  target->location = -1;
215 
216  targetList = list_make1(target);
217  }
218  else
219  {
220  ListCell *lc;
221 
222  foreach(lc, stmt->attlist)
223  {
224  /*
225  * Build the ColumnRef for each column. The ColumnRef
226  * 'fields' property is a String node that corresponds to
227  * the column name respectively.
228  */
229  cr = makeNode(ColumnRef);
230  cr->fields = list_make1(lfirst(lc));
231  cr->location = -1;
232 
233  /* Build the ResTarget and add the ColumnRef to it. */
234  target = makeNode(ResTarget);
235  target->name = NULL;
236  target->indirection = NIL;
237  target->val = (Node *) cr;
238  target->location = -1;
239 
240  /* Add each column to the SELECT statement's target list */
241  targetList = lappend(targetList, target);
242  }
243  }
244 
245  /*
246  * Build RangeVar for from clause, fully qualified based on the
247  * relation which we have opened and locked.
248  */
251  -1);
252 
253  /* Build query */
255  select->targetList = targetList;
256  select->fromClause = list_make1(from);
257 
258  query = makeNode(RawStmt);
259  query->stmt = (Node *) select;
260  query->stmt_location = stmt_location;
261  query->stmt_len = stmt_len;
262 
263  /*
264  * Close the relation for now, but keep the lock on it to prevent
265  * changes between now and when we start the query-based COPY.
266  *
267  * We'll reopen it later as part of the query-based COPY.
268  */
269  table_close(rel, NoLock);
270  rel = NULL;
271  }
272  }
273  else
274  {
275  Assert(stmt->query);
276 
277  /* MERGE is allowed by parser, but unimplemented. Reject for now */
278  if (IsA(stmt->query, MergeStmt))
279  ereport(ERROR,
280  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
281  errmsg("MERGE not supported in COPY"));
282 
283  query = makeNode(RawStmt);
284  query->stmt = stmt->query;
285  query->stmt_location = stmt_location;
286  query->stmt_len = stmt_len;
287 
288  relid = InvalidOid;
289  rel = NULL;
290  }
291 
292  if (is_from)
293  {
294  CopyFromState cstate;
295 
296  Assert(rel);
297 
298  /* check read-only transaction and parallel mode */
299  if (XactReadOnly && !rel->rd_islocaltemp)
300  PreventCommandIfReadOnly("COPY FROM");
301 
302  cstate = BeginCopyFrom(pstate, rel, whereClause,
303  stmt->filename, stmt->is_program,
304  NULL, stmt->attlist, stmt->options);
305  *processed = CopyFrom(cstate); /* copy from file to database */
306  EndCopyFrom(cstate);
307  }
308  else
309  {
310  CopyToState cstate;
311 
312  cstate = BeginCopyTo(pstate, rel, query, relid,
313  stmt->filename, stmt->is_program,
314  NULL, stmt->attlist, stmt->options);
315  *processed = DoCopyTo(cstate); /* copy from database to file */
316  EndCopyTo(cstate);
317  }
318 
319  if (rel != NULL)
320  table_close(rel, NoLock);
321 }
322 
323 /*
324  * Extract a CopyHeaderChoice value from a DefElem. This is like
325  * defGetBoolean() but also accepts the special value "match".
326  */
327 static CopyHeaderChoice
328 defGetCopyHeaderChoice(DefElem *def, bool is_from)
329 {
330  /*
331  * If no parameter value given, assume "true" is meant.
332  */
333  if (def->arg == NULL)
334  return COPY_HEADER_TRUE;
335 
336  /*
337  * Allow 0, 1, "true", "false", "on", "off", or "match".
338  */
339  switch (nodeTag(def->arg))
340  {
341  case T_Integer:
342  switch (intVal(def->arg))
343  {
344  case 0:
345  return COPY_HEADER_FALSE;
346  case 1:
347  return COPY_HEADER_TRUE;
348  default:
349  /* otherwise, error out below */
350  break;
351  }
352  break;
353  default:
354  {
355  char *sval = defGetString(def);
356 
357  /*
358  * The set of strings accepted here should match up with the
359  * grammar's opt_boolean_or_string production.
360  */
361  if (pg_strcasecmp(sval, "true") == 0)
362  return COPY_HEADER_TRUE;
363  if (pg_strcasecmp(sval, "false") == 0)
364  return COPY_HEADER_FALSE;
365  if (pg_strcasecmp(sval, "on") == 0)
366  return COPY_HEADER_TRUE;
367  if (pg_strcasecmp(sval, "off") == 0)
368  return COPY_HEADER_FALSE;
369  if (pg_strcasecmp(sval, "match") == 0)
370  {
371  if (!is_from)
372  ereport(ERROR,
373  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374  errmsg("cannot use \"%s\" with HEADER in COPY TO",
375  sval)));
376  return COPY_HEADER_MATCH;
377  }
378  }
379  break;
380  }
381  ereport(ERROR,
382  (errcode(ERRCODE_SYNTAX_ERROR),
383  errmsg("%s requires a Boolean value or \"match\"",
384  def->defname)));
385  return COPY_HEADER_FALSE; /* keep compiler quiet */
386 }
387 
388 /*
389  * Process the statement option list for COPY.
390  *
391  * Scan the options list (a list of DefElem) and transpose the information
392  * into *opts_out, applying appropriate error checking.
393  *
394  * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially.
395  *
396  * This is exported so that external users of the COPY API can sanity-check
397  * a list of options. In that usage, 'opts_out' can be passed as NULL and
398  * the collected data is just leaked until CurrentMemoryContext is reset.
399  *
400  * Note that additional checking, such as whether column names listed in FORCE
401  * QUOTE actually exist, has to be applied later. This just checks for
402  * self-consistency of the options list.
403  */
404 void
406  CopyFormatOptions *opts_out,
407  bool is_from,
408  List *options)
409 {
410  bool format_specified = false;
411  bool freeze_specified = false;
412  bool header_specified = false;
413  ListCell *option;
414 
415  /* Support external use for option sanity checking */
416  if (opts_out == NULL)
417  opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions));
418 
419  opts_out->file_encoding = -1;
420 
421  /* Extract options from the statement node tree */
422  foreach(option, options)
423  {
424  DefElem *defel = lfirst_node(DefElem, option);
425 
426  if (strcmp(defel->defname, "format") == 0)
427  {
428  char *fmt = defGetString(defel);
429 
430  if (format_specified)
431  errorConflictingDefElem(defel, pstate);
432  format_specified = true;
433  if (strcmp(fmt, "text") == 0)
434  /* default format */ ;
435  else if (strcmp(fmt, "csv") == 0)
436  opts_out->csv_mode = true;
437  else if (strcmp(fmt, "binary") == 0)
438  opts_out->binary = true;
439  else
440  ereport(ERROR,
441  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
442  errmsg("COPY format \"%s\" not recognized", fmt),
443  parser_errposition(pstate, defel->location)));
444  }
445  else if (strcmp(defel->defname, "freeze") == 0)
446  {
447  if (freeze_specified)
448  errorConflictingDefElem(defel, pstate);
449  freeze_specified = true;
450  opts_out->freeze = defGetBoolean(defel);
451  }
452  else if (strcmp(defel->defname, "delimiter") == 0)
453  {
454  if (opts_out->delim)
455  errorConflictingDefElem(defel, pstate);
456  opts_out->delim = defGetString(defel);
457  }
458  else if (strcmp(defel->defname, "null") == 0)
459  {
460  if (opts_out->null_print)
461  errorConflictingDefElem(defel, pstate);
462  opts_out->null_print = defGetString(defel);
463  }
464  else if (strcmp(defel->defname, "header") == 0)
465  {
466  if (header_specified)
467  errorConflictingDefElem(defel, pstate);
468  header_specified = true;
469  opts_out->header_line = defGetCopyHeaderChoice(defel, is_from);
470  }
471  else if (strcmp(defel->defname, "quote") == 0)
472  {
473  if (opts_out->quote)
474  errorConflictingDefElem(defel, pstate);
475  opts_out->quote = defGetString(defel);
476  }
477  else if (strcmp(defel->defname, "escape") == 0)
478  {
479  if (opts_out->escape)
480  errorConflictingDefElem(defel, pstate);
481  opts_out->escape = defGetString(defel);
482  }
483  else if (strcmp(defel->defname, "force_quote") == 0)
484  {
485  if (opts_out->force_quote || opts_out->force_quote_all)
486  errorConflictingDefElem(defel, pstate);
487  if (defel->arg && IsA(defel->arg, A_Star))
488  opts_out->force_quote_all = true;
489  else if (defel->arg && IsA(defel->arg, List))
490  opts_out->force_quote = castNode(List, defel->arg);
491  else
492  ereport(ERROR,
493  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
494  errmsg("argument to option \"%s\" must be a list of column names",
495  defel->defname),
496  parser_errposition(pstate, defel->location)));
497  }
498  else if (strcmp(defel->defname, "force_not_null") == 0)
499  {
500  if (opts_out->force_notnull)
501  errorConflictingDefElem(defel, pstate);
502  if (defel->arg && IsA(defel->arg, List))
503  opts_out->force_notnull = castNode(List, defel->arg);
504  else
505  ereport(ERROR,
506  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
507  errmsg("argument to option \"%s\" must be a list of column names",
508  defel->defname),
509  parser_errposition(pstate, defel->location)));
510  }
511  else if (strcmp(defel->defname, "force_null") == 0)
512  {
513  if (opts_out->force_null)
514  errorConflictingDefElem(defel, pstate);
515  if (defel->arg && IsA(defel->arg, List))
516  opts_out->force_null = castNode(List, defel->arg);
517  else
518  ereport(ERROR,
519  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
520  errmsg("argument to option \"%s\" must be a list of column names",
521  defel->defname),
522  parser_errposition(pstate, defel->location)));
523  }
524  else if (strcmp(defel->defname, "convert_selectively") == 0)
525  {
526  /*
527  * Undocumented, not-accessible-from-SQL option: convert only the
528  * named columns to binary form, storing the rest as NULLs. It's
529  * allowed for the column list to be NIL.
530  */
531  if (opts_out->convert_selectively)
532  errorConflictingDefElem(defel, pstate);
533  opts_out->convert_selectively = true;
534  if (defel->arg == NULL || IsA(defel->arg, List))
535  opts_out->convert_select = castNode(List, defel->arg);
536  else
537  ereport(ERROR,
538  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
539  errmsg("argument to option \"%s\" must be a list of column names",
540  defel->defname),
541  parser_errposition(pstate, defel->location)));
542  }
543  else if (strcmp(defel->defname, "encoding") == 0)
544  {
545  if (opts_out->file_encoding >= 0)
546  errorConflictingDefElem(defel, pstate);
547  opts_out->file_encoding = pg_char_to_encoding(defGetString(defel));
548  if (opts_out->file_encoding < 0)
549  ereport(ERROR,
550  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
551  errmsg("argument to option \"%s\" must be a valid encoding name",
552  defel->defname),
553  parser_errposition(pstate, defel->location)));
554  }
555  else
556  ereport(ERROR,
557  (errcode(ERRCODE_SYNTAX_ERROR),
558  errmsg("option \"%s\" not recognized",
559  defel->defname),
560  parser_errposition(pstate, defel->location)));
561  }
562 
563  /*
564  * Check for incompatible options (must do these two before inserting
565  * defaults)
566  */
567  if (opts_out->binary && opts_out->delim)
568  ereport(ERROR,
569  (errcode(ERRCODE_SYNTAX_ERROR),
570  errmsg("cannot specify DELIMITER in BINARY mode")));
571 
572  if (opts_out->binary && opts_out->null_print)
573  ereport(ERROR,
574  (errcode(ERRCODE_SYNTAX_ERROR),
575  errmsg("cannot specify NULL in BINARY mode")));
576 
577  /* Set defaults for omitted options */
578  if (!opts_out->delim)
579  opts_out->delim = opts_out->csv_mode ? "," : "\t";
580 
581  if (!opts_out->null_print)
582  opts_out->null_print = opts_out->csv_mode ? "" : "\\N";
583  opts_out->null_print_len = strlen(opts_out->null_print);
584 
585  if (opts_out->csv_mode)
586  {
587  if (!opts_out->quote)
588  opts_out->quote = "\"";
589  if (!opts_out->escape)
590  opts_out->escape = opts_out->quote;
591  }
592 
593  /* Only single-byte delimiter strings are supported. */
594  if (strlen(opts_out->delim) != 1)
595  ereport(ERROR,
596  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
597  errmsg("COPY delimiter must be a single one-byte character")));
598 
599  /* Disallow end-of-line characters */
600  if (strchr(opts_out->delim, '\r') != NULL ||
601  strchr(opts_out->delim, '\n') != NULL)
602  ereport(ERROR,
603  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
604  errmsg("COPY delimiter cannot be newline or carriage return")));
605 
606  if (strchr(opts_out->null_print, '\r') != NULL ||
607  strchr(opts_out->null_print, '\n') != NULL)
608  ereport(ERROR,
609  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
610  errmsg("COPY null representation cannot use newline or carriage return")));
611 
612  /*
613  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
614  * backslash because it would be ambiguous. We can't allow the other
615  * cases because data characters matching the delimiter must be
616  * backslashed, and certain backslash combinations are interpreted
617  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
618  * more than strictly necessary, but seems best for consistency and
619  * future-proofing. Likewise we disallow all digits though only octal
620  * digits are actually dangerous.
621  */
622  if (!opts_out->csv_mode &&
623  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
624  opts_out->delim[0]) != NULL)
625  ereport(ERROR,
626  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627  errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim)));
628 
629  /* Check header */
630  if (opts_out->binary && opts_out->header_line)
631  ereport(ERROR,
632  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
633  errmsg("cannot specify HEADER in BINARY mode")));
634 
635  /* Check quote */
636  if (!opts_out->csv_mode && opts_out->quote != NULL)
637  ereport(ERROR,
638  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
639  errmsg("COPY quote available only in CSV mode")));
640 
641  if (opts_out->csv_mode && strlen(opts_out->quote) != 1)
642  ereport(ERROR,
643  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
644  errmsg("COPY quote must be a single one-byte character")));
645 
646  if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0])
647  ereport(ERROR,
648  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
649  errmsg("COPY delimiter and quote must be different")));
650 
651  /* Check escape */
652  if (!opts_out->csv_mode && opts_out->escape != NULL)
653  ereport(ERROR,
654  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
655  errmsg("COPY escape available only in CSV mode")));
656 
657  if (opts_out->csv_mode && strlen(opts_out->escape) != 1)
658  ereport(ERROR,
659  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
660  errmsg("COPY escape must be a single one-byte character")));
661 
662  /* Check force_quote */
663  if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all))
664  ereport(ERROR,
665  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
666  errmsg("COPY force quote available only in CSV mode")));
667  if ((opts_out->force_quote || opts_out->force_quote_all) && is_from)
668  ereport(ERROR,
669  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
670  errmsg("COPY force quote only available using COPY TO")));
671 
672  /* Check force_notnull */
673  if (!opts_out->csv_mode && opts_out->force_notnull != NIL)
674  ereport(ERROR,
675  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
676  errmsg("COPY force not null available only in CSV mode")));
677  if (opts_out->force_notnull != NIL && !is_from)
678  ereport(ERROR,
679  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
680  errmsg("COPY force not null only available using COPY FROM")));
681 
682  /* Check force_null */
683  if (!opts_out->csv_mode && opts_out->force_null != NIL)
684  ereport(ERROR,
685  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
686  errmsg("COPY force null available only in CSV mode")));
687 
688  if (opts_out->force_null != NIL && !is_from)
689  ereport(ERROR,
690  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
691  errmsg("COPY force null only available using COPY FROM")));
692 
693  /* Don't allow the delimiter to appear in the null string. */
694  if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL)
695  ereport(ERROR,
696  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
697  errmsg("COPY delimiter must not appear in the NULL specification")));
698 
699  /* Don't allow the CSV quote char to appear in the null string. */
700  if (opts_out->csv_mode &&
701  strchr(opts_out->null_print, opts_out->quote[0]) != NULL)
702  ereport(ERROR,
703  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
704  errmsg("CSV quote character must not appear in the NULL specification")));
705 }
706 
707 /*
708  * CopyGetAttnums - build an integer list of attnums to be copied
709  *
710  * The input attnamelist is either the user-specified column list,
711  * or NIL if there was none (in which case we want all the non-dropped
712  * columns).
713  *
714  * We don't include generated columns in the generated full list and we don't
715  * allow them to be specified explicitly. They don't make sense for COPY
716  * FROM, but we could possibly allow them for COPY TO. But this way it's at
717  * least ensured that whatever we copy out can be copied back in.
718  *
719  * rel can be NULL ... it's only used for error reports.
720  */
721 List *
722 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
723 {
724  List *attnums = NIL;
725 
726  if (attnamelist == NIL)
727  {
728  /* Generate default column list */
729  int attr_count = tupDesc->natts;
730  int i;
731 
732  for (i = 0; i < attr_count; i++)
733  {
734  if (TupleDescAttr(tupDesc, i)->attisdropped)
735  continue;
736  if (TupleDescAttr(tupDesc, i)->attgenerated)
737  continue;
738  attnums = lappend_int(attnums, i + 1);
739  }
740  }
741  else
742  {
743  /* Validate the user-supplied list and extract attnums */
744  ListCell *l;
745 
746  foreach(l, attnamelist)
747  {
748  char *name = strVal(lfirst(l));
749  int attnum;
750  int i;
751 
752  /* Lookup column name */
754  for (i = 0; i < tupDesc->natts; i++)
755  {
756  Form_pg_attribute att = TupleDescAttr(tupDesc, i);
757 
758  if (att->attisdropped)
759  continue;
760  if (namestrcmp(&(att->attname), name) == 0)
761  {
762  if (att->attgenerated)
763  ereport(ERROR,
764  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
765  errmsg("column \"%s\" is a generated column",
766  name),
767  errdetail("Generated columns cannot be used in COPY.")));
768  attnum = att->attnum;
769  break;
770  }
771  }
772  if (attnum == InvalidAttrNumber)
773  {
774  if (rel != NULL)
775  ereport(ERROR,
776  (errcode(ERRCODE_UNDEFINED_COLUMN),
777  errmsg("column \"%s\" of relation \"%s\" does not exist",
778  name, RelationGetRelationName(rel))));
779  else
780  ereport(ERROR,
781  (errcode(ERRCODE_UNDEFINED_COLUMN),
782  errmsg("column \"%s\" does not exist",
783  name)));
784  }
785  /* Check for duplicates */
786  if (list_member_int(attnums, attnum))
787  ereport(ERROR,
788  (errcode(ERRCODE_DUPLICATE_COLUMN),
789  errmsg("column \"%s\" specified more than once",
790  name)));
791  attnums = lappend_int(attnums, attnum);
792  }
793  }
794 
795  return attnums;
796 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4969
#define InvalidAttrNumber
Definition: attnum.h:23
static CopyHeaderChoice defGetCopyHeaderChoice(DefElem *def, bool is_from)
Definition: copy.c:328
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:722
void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed)
Definition: copy.c:64
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:405
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:739
Node * eval_const_expressions(PlannerInfo *root, Node *node)
Definition: clauses.c:2134
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1339
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:632
void EndCopyFrom(CopyFromState cstate)
Definition: copyfrom.c:1716
uint64 DoCopyTo(CopyToState cstate)
Definition: copyto.c:749
CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query, Oid queryRelId, const char *filename, bool is_program, copy_data_dest_cb data_dest_cb, List *attnamelist, List *options)
Definition: copyto.c:357
void EndCopyTo(CopyToState cstate)
Definition: copyto.c:728
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:385
struct cursor * cur
Definition: ecpg.c:28
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int pg_char_to_encoding(const char *name)
Definition: encnames.c:550
const char * name
Definition: encode.c:571
bool ExecCheckPermissions(List *rangeTable, List *rteperminfos, bool ereport_on_violation)
Definition: execMain.c:574
CopyHeaderChoice
Definition: copy.h:27
@ COPY_HEADER_TRUE
Definition: copy.h:29
@ COPY_HEADER_FALSE
Definition: copy.h:28
@ COPY_HEADER_MATCH
Definition: copy.h:30
int i
Definition: isn.c:73
static void const char * fmt
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:338
List * lappend_int(List *list, int datum)
Definition: list.c:356
bool list_member_int(const List *list, int datum)
Definition: list.c:701
int LOCKMODE
Definition: lockdefs.h:26
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
List * make_ands_implicit(Expr *clause)
Definition: makefuncs.c:721
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:424
char * pstrdup(const char *in)
Definition: mcxt.c:1624
void * palloc0(Size size)
Definition: mcxt.c:1241
Oid GetUserId(void)
Definition: miscinit.c:502
int namestrcmp(Name name, const char *str)
Definition: name.c:247
#define IsA(nodeptr, _type_)
Definition: nodes.h:179
#define nodeTag(nodeptr)
Definition: nodes.h:133
#define makeNode(_type_)
Definition: nodes.h:176
#define castNode(_type_, nodeptr)
Definition: nodes.h:197
Node * coerce_to_boolean(ParseState *pstate, Node *node, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
Node * transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind)
Definition: parse_expr.c:92
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:110
@ EXPR_KIND_COPY_WHERE
Definition: parse_node.h:81
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
#define ACL_INSERT
Definition: parsenodes.h:83
#define ACL_SELECT
Definition: parsenodes.h:84
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
#define NIL
Definition: pg_list.h:68
#define lfirst_int(lc)
Definition: pg_list.h:173
#define list_make1(x1)
Definition: pg_list.h:212
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
Expr * canonicalize_qual(Expr *qual, bool is_check)
Definition: prepqual.c:294
#define RelationGetRelid(relation)
Definition: rel.h:501
#define RelationGetDescr(relation)
Definition: rel.h:527
#define RelationGetRelationName(relation)
Definition: rel.h:535
#define RelationGetNamespace(relation)
Definition: rel.h:542
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
int location
Definition: parsenodes.h:293
List * fields
Definition: parsenodes.h:292
bool force_quote_all
Definition: copy.h:54
bool freeze
Definition: copy.h:44
bool binary
Definition: copy.h:43
int null_print_len
Definition: copy.h:48
bool convert_selectively
Definition: copy.h:60
char * quote
Definition: copy.h:51
CopyHeaderChoice header_line
Definition: copy.h:46
List * force_quote
Definition: copy.h:53
char * escape
Definition: copy.h:52
char * null_print
Definition: copy.h:47
List * force_null
Definition: copy.h:58
char * delim
Definition: copy.h:50
List * convert_select
Definition: copy.h:61
bool csv_mode
Definition: copy.h:45
int file_encoding
Definition: copy.h:41
List * force_notnull
Definition: copy.h:56
bool is_program
Definition: parsenodes.h:2305
RangeVar * relation
Definition: parsenodes.h:2299
List * options
Definition: parsenodes.h:2307
bool is_from
Definition: parsenodes.h:2304
char * filename
Definition: parsenodes.h:2306
List * attlist
Definition: parsenodes.h:2302
Node * whereClause
Definition: parsenodes.h:2308
Node * query
Definition: parsenodes.h:2300
char * defname
Definition: parsenodes.h:810
int location
Definition: parsenodes.h:814
Node * arg
Definition: parsenodes.h:811
Definition: pg_list.h:54
Definition: nodes.h:129
RTEPermissionInfo * p_perminfo
Definition: parse_node.h:288
List * p_rtable
Definition: parse_node.h:193
Bitmapset * selectedCols
Definition: parsenodes.h:1247
AclMode requiredPerms
Definition: parsenodes.h:1245
Bitmapset * insertedCols
Definition: parsenodes.h:1248
int stmt_len
Definition: parsenodes.h:1736
Node * stmt
Definition: parsenodes.h:1734
int stmt_location
Definition: parsenodes.h:1735
bool rd_islocaltemp
Definition: rel.h:60
int location
Definition: parsenodes.h:518
Node * val
Definition: parsenodes.h:517
List * indirection
Definition: parsenodes.h:516
char * name
Definition: parsenodes.h:515
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void PreventCommandIfReadOnly(const char *cmdname)
Definition: utility.c:411
#define intVal(v)
Definition: value.h:79
#define strVal(v)
Definition: value.h:82
#define select(n, r, w, e, timeout)
Definition: win32_port.h:492
bool XactReadOnly
Definition: xact.c:82