PostgreSQL Source Code  git master
copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  * Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/tableam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/pg_authid.h"
29 #include "catalog/pg_type.h"
30 #include "commands/copy.h"
31 #include "commands/defrem.h"
32 #include "commands/trigger.h"
33 #include "executor/execPartition.h"
34 #include "executor/executor.h"
36 #include "executor/tuptable.h"
37 #include "foreign/fdwapi.h"
38 #include "libpq/libpq.h"
39 #include "libpq/pqformat.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "nodes/makefuncs.h"
43 #include "optimizer/optimizer.h"
44 #include "parser/parse_coerce.h"
45 #include "parser/parse_collate.h"
46 #include "parser/parse_expr.h"
47 #include "parser/parse_relation.h"
48 #include "port/pg_bswap.h"
49 #include "rewrite/rewriteHandler.h"
50 #include "storage/fd.h"
51 #include "tcop/tcopprot.h"
52 #include "utils/acl.h"
53 #include "utils/builtins.h"
54 #include "utils/lsyscache.h"
55 #include "utils/memutils.h"
56 #include "utils/partcache.h"
57 #include "utils/portal.h"
58 #include "utils/rel.h"
59 #include "utils/rls.h"
60 #include "utils/snapmgr.h"
61 
/*
 * Octal-digit helpers.
 *
 * ISOCTAL(c) is nonzero when c is one of '0'..'7'.  The test is a single
 * unsigned range check so the argument is evaluated exactly once, which keeps
 * the macro safe for arguments with side effects such as *p++.  Anything
 * below '0' wraps to a large unsigned value and is rejected along with
 * anything above '7'.
 *
 * OCTVALUE(c) converts an octal digit character to its numeric value; it is
 * only meaningful when ISOCTAL(c) holds.
 */
#define ISOCTAL(c) ((unsigned int) ((c) - '0') <= 7u)
#define OCTVALUE(c) ((c) - '0')
64 
/*
 * CopyDest names the kind of endpoint the lowest-level COPY I/O routines
 * talk to, for both directions of transfer.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* to/from file (or a piped program) */
	COPY_OLD_FE = 1,			/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE = 2,			/* to/from frontend (3.0 protocol) */
	COPY_CALLBACK = 3			/* to/from callback function */
} CopyDest;
76 
77 /*
78  * Represents the end-of-line terminator type of the input
79  */
80 typedef enum EolType
81 {
 /*
  * NOTE(review): no enumerators are visible between the braces in this
  * listing (its embedded numbering jumps from 81 to 86), so the set of
  * end-of-line kinds cannot be documented from here -- restore the missing
  * lines from the real file before relying on this definition.
  */
86 } EolType;
87 
88 /*
89  * Represents the heap insert method to be used during COPY FROM.
90  */
91 typedef enum CopyInsertMethod
92 {
93  CIM_SINGLE, /* use table_tuple_insert or fdw routine */
94  CIM_MULTI, /* always use table_multi_insert */
95  CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
 /*
  * NOTE(review): the line that closes this enum and names the typedef
  * (embedded line 96) is not present in this listing; presumably it reads
  * "} CopyInsertMethod;" -- confirm against the real file.
  */
97 
98 /*
99  * This struct contains all the state variables used throughout a COPY
100  * operation. For simplicity, we use the same struct for all variants of COPY,
101  * even though some fields are used in only some cases.
102  *
103  * Multi-byte encodings: all supported client-side encodings encode multi-byte
104  * characters by having the first byte's high bit set. Subsequent bytes of the
105  * character can have the high bit not set. When scanning data in such an
106  * encoding to look for a match to a single-byte (ie ASCII) character, we must
107  * use the full pg_encoding_mblen() machinery to skip over multibyte
108  * characters, else we might find a false match to a trailing byte. In
109  * supported server encodings, there is no possibility of a false match, and
110  * it's faster to make useless comparisons to trailing bytes than it is to
111  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
112  * when we have to do it the hard way.
113  */
114 typedef struct CopyStateData
115 {
116  /* low-level state data */
117  CopyDest copy_dest; /* type of copy source/destination */
118  FILE *copy_file; /* used if copy_dest == COPY_FILE */
119  StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
120  * dest == COPY_NEW_FE in COPY FROM */
121  bool is_copy_from; /* COPY TO, or COPY FROM? */
122  bool reached_eof; /* true if we read to end of copy data (not
123  * all copy_dest types maintain this) */
124  EolType eol_type; /* EOL type of input */
125  int file_encoding; /* file or remote side's character encoding */
126  bool need_transcoding; /* file encoding diff from server? */
127  bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
128 
129  /* parameters from the COPY command */
130  Relation rel; /* relation to copy to or from */
131  QueryDesc *queryDesc; /* executable query to copy from */
132  List *attnumlist; /* integer list of attnums to copy */
133  char *filename; /* filename, or NULL for STDIN/STDOUT */
134  bool is_program; /* is 'filename' a program to popen? */
135  copy_data_source_cb data_source_cb; /* function for reading data */
136  bool binary; /* binary format? */
137  bool freeze; /* freeze rows on loading? */
138  bool csv_mode; /* Comma Separated Value format? */
139  bool header_line; /* CSV header line? */
140  char *null_print; /* NULL marker string (server encoding!) */
141  int null_print_len; /* length of same */
142  char *null_print_client; /* same converted to file encoding */
143  char *delim; /* column delimiter (must be 1 byte) */
144  char *quote; /* CSV quote char (must be 1 byte) */
145  char *escape; /* CSV escape char (must be 1 byte) */
146  List *force_quote; /* list of column names */
147  bool force_quote_all; /* FORCE_QUOTE *? */
148  bool *force_quote_flags; /* per-column CSV FQ flags */
149  List *force_notnull; /* list of column names */
150  bool *force_notnull_flags; /* per-column CSV FNN flags */
151  List *force_null; /* list of column names */
152  bool *force_null_flags; /* per-column CSV FN flags */
153  bool convert_selectively; /* do selective binary conversion? */
154  List *convert_select; /* list of column names (can be NIL) */
155  bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
156  Node *whereClause; /* WHERE condition (or NULL) */
157 
158  /* these are just for error messages, see CopyFromErrorCallback */
159  const char *cur_relname; /* table name for error messages */
160  uint64 cur_lineno; /* line number for error messages */
161  const char *cur_attname; /* current att for error messages */
162  const char *cur_attval; /* current att value for error messages */
163 
164  /*
165  * Working state for COPY TO/FROM
166  */
167  MemoryContext copycontext; /* per-copy execution context */
168 
169  /*
170  * Working state for COPY TO
171  */
172  FmgrInfo *out_functions; /* lookup info for output functions */
173  MemoryContext rowcontext; /* per-row evaluation context */
174 
175  /*
176  * Working state for COPY FROM
177  */
 /*
  * NOTE(review): embedded listing lines 178, 184, 185 and 187 are absent
  * from this view; any members declared on them are not shown in this
  * section.
  */
179  FmgrInfo *in_functions; /* array of input functions for each attrs */
180  Oid *typioparams; /* array of element types for in_functions */
181  int *defmap; /* array of default att numbers */
182  ExprState **defexprs; /* array of default att expressions */
183  bool volatile_defexprs; /* is any of defexprs volatile? */
186 
188 
189  /*
190  * These variables are used to reduce overhead in textual COPY FROM.
191  *
192  * attribute_buf holds the separated, de-escaped text for each field of
193  * the current line. The CopyReadAttributes functions return arrays of
194  * pointers into this buffer. We avoid palloc/pfree overhead by re-using
195  * the buffer on each cycle.
196  */
 /*
  * NOTE(review): the declaration of attribute_buf described above (embedded
  * line 197) is not visible in this listing.
  */
198 
199  /* field raw data pointers found by COPY FROM */
200 
 /* NOTE(review): embedded line 201 is absent from this listing. */
202  char **raw_fields;
203 
204  /*
205  * Similarly, line_buf holds the whole input line being processed. The
206  * input cycle is first to read the whole line into line_buf, convert it
207  * to server encoding there, and then extract the individual attribute
208  * fields into attribute_buf. line_buf is preserved unmodified so that we
209  * can display it in error messages if appropriate.
210  */
 /*
  * NOTE(review): the declaration of line_buf itself (embedded line 211) is
  * not visible in this listing; REFILL_LINEBUF passes &cstate->line_buf to
  * appendBinaryStringInfo, so it is presumably a StringInfoData -- confirm.
  */
212  bool line_buf_converted; /* converted to server encoding? */
213  bool line_buf_valid; /* contains the row being processed? */
214 
215  /*
216  * Finally, raw_buf holds raw data read from the data source (file or
217  * client connection). CopyReadLine parses this data sufficiently to
218  * locate line boundaries, then transfers the data to line_buf and
219  * converts it. Note: we guarantee that there is a \0 at
220  * raw_buf[raw_buf_len].
221  */
222 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
223  char *raw_buf;
224  int raw_buf_index; /* next byte to process */
225  int raw_buf_len; /* total # of bytes stored */
226 } CopyStateData;
227 
228 /* DestReceiver for COPY (query) TO */
229 typedef struct
230 {
231  DestReceiver pub; /* publicly-known function pointers */
232  CopyState cstate; /* CopyStateData for the command */
233  uint64 processed; /* # of tuples processed */
234 } DR_copy;
235 
236 
/*
 * Upper bound on the number of tuples held by one CopyMultiInsertBuffer.
 *
 * Caution: keep this modest.  CopyMultiInsertInfo's multiInsertBuffers list
 * can end up holding this many CopyMultiInsertBuffer items, so raising the
 * limit can make memory use grow quadratically when copying into a
 * partitioned table with a large number of partitions.
 */
#define MAX_BUFFERED_TUPLES		1000

/*
 * Byte threshold (measured by input size) of stored tuples at or above which
 * the buffers are flushed.
 */
#define MAX_BUFFERED_BYTES		65535

/* After a flush, the list of buffers is trimmed back down to this many */
#define MAX_PARTITION_BUFFERS	32
256 
257 /* Stores multi-insert data related to a single relation in CopyFrom. */
258 typedef struct CopyMultiInsertBuffer
259 {
 /* slots[] and linenos[] are both sized by MAX_BUFFERED_TUPLES */
260  TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
261  ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
262  BulkInsertState bistate; /* BulkInsertState for this rel */
263  int nused; /* number of 'slots' containing tuples */
264  uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
265  * stream */
 /*
  * NOTE(review): the closing line of this struct (embedded line 266) is not
  * present in this listing; presumably "} CopyMultiInsertBuffer;" --
  * confirm against the real file.
  */
267 
268 /*
269  * Stores one or many CopyMultiInsertBuffers and details about the size and
270  * number of tuples which are stored in them. This allows multiple buffers to
271  * exist at once when COPYing into a partitioned table.
272  */
273 typedef struct CopyMultiInsertInfo
274 {
275  List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
276  int bufferedTuples; /* number of tuples buffered over all buffers */
277  int bufferedBytes; /* number of bytes from all buffered tuples */
278  CopyState cstate; /* Copy state for this CopyMultiInsertInfo */
279  EState *estate; /* Executor state used for COPY */
280  CommandId mycid; /* Command Id used for COPY */
281  int ti_options; /* table insert options */
 /*
  * NOTE(review): the closing line of this struct (embedded line 282) is not
  * present in this listing; presumably "} CopyMultiInsertInfo;" -- confirm
  * against the real file.
  */
283 
284 
285 /*
286  * These macros centralize code used to process line_buf and raw_buf buffers.
287  * They are macros because they often do continue/break control and to avoid
288  * function call overhead in tight COPY loops.
289  *
290  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
291  * prevent the continue/break processing from working. We end the "if (1)"
292  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
293  * any "else" in the calling code, and to avoid any compiler warnings about
294  * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
295  */
296 
/*
 * When fewer than (extralen) further bytes are buffered and the input has not
 * hit EOF, rewind to the character fetched at the top of the loop, flag that
 * more data is needed, and restart the enclosing loop.  Rewinding keeps that
 * character in the buffer even if there is more than one read-ahead.
 *
 * Expects raw_buf_ptr, prev_raw_ptr, copy_buf_len, hit_eof and need_data to
 * be in scope at the point of use.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && !hit_eof) \
	{ \
		/* undo fetch */ \
		raw_buf_ptr = prev_raw_ptr; \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)
311 
/*
 * EOF counterpart of the refill check: when fewer than (extralen) further
 * bytes are buffered and EOF has already been hit, swallow whatever is left
 * of the buffer (only when extralen is nonzero), set result to true and leave
 * the enclosing loop.
 *
 * Expects raw_buf_ptr, copy_buf_len, hit_eof and result to be in scope.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && hit_eof) \
	{ \
		if ((extralen) != 0) \
		{ \
			/* consume the partial character */ \
			raw_buf_ptr = copy_buf_len; \
		} \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)
325 
/*
 * Move every byte already accepted from raw_buf (those between
 * cstate->raw_buf_index and raw_buf_ptr) onto the end of line_buf and advance
 * raw_buf_index past them.  This has to happen so that raw_buf is sure to
 * have some room.  Does nothing when no bytes are pending.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (cstate->raw_buf_index < raw_buf_ptr) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
341 
/*
 * Discard any read-ahead by repositioning raw_buf_ptr just past the character
 * fetched at the top of the loop, then jump to the not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = 1 + prev_raw_ptr; \
	goto not_end_of_copy; \
} else ((void) 0)
349 
/* The fixed 11-byte signature, spelled out byte by byte */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
351 
352 
353 /* non-export function prototypes */
354 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
355  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
356  List *options);
357 static void EndCopy(CopyState cstate);
358 static void ClosePipeToProgram(CopyState cstate);
359 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
360  Oid queryRelId, const char *filename, bool is_program,
361  List *attnamelist, List *options);
362 static void EndCopyTo(CopyState cstate);
363 static uint64 DoCopyTo(CopyState cstate);
364 static uint64 CopyTo(CopyState cstate);
365 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
366 static bool CopyReadLine(CopyState cstate);
367 static bool CopyReadLineText(CopyState cstate);
368 static int CopyReadAttributesText(CopyState cstate);
369 static int CopyReadAttributesCSV(CopyState cstate);
370 static Datum CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo,
371  Oid typioparam, int32 typmod,
372  bool *isnull);
373 static void CopyAttributeOutText(CopyState cstate, char *string);
374 static void CopyAttributeOutCSV(CopyState cstate, char *string,
375  bool use_quote, bool single_attr);
376 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
377  List *attnamelist);
378 static char *limit_printout_length(const char *str);
379 
380 /* Low-level communications functions */
381 static void SendCopyBegin(CopyState cstate);
382 static void ReceiveCopyBegin(CopyState cstate);
383 static void SendCopyEnd(CopyState cstate);
384 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
385 static void CopySendString(CopyState cstate, const char *str);
386 static void CopySendChar(CopyState cstate, char c);
387 static void CopySendEndOfRow(CopyState cstate);
388 static int CopyGetData(CopyState cstate, void *databuf,
389  int minread, int maxread);
390 static void CopySendInt32(CopyState cstate, int32 val);
391 static bool CopyGetInt32(CopyState cstate, int32 *val);
392 static void CopySendInt16(CopyState cstate, int16 val);
393 static bool CopyGetInt16(CopyState cstate, int16 *val);
394 
395 
396 /*
397  * Send copy start/stop messages for frontend copies. These have changed
398  * in past protocol redesigns.
399  */
/*
 * NOTE(review): the line carrying this function's name and parameter list
 * (embedded line 401) is absent from this listing.  Judging by the prototype
 * order and the 'H' message sent below, this is presumably
 * SendCopyBegin(CopyState cstate) -- confirm against the real file.
 *
 * Sends the copy-start message to the frontend and records in
 * cstate->copy_dest which of the two branches was taken.
 */
400 static void
402 {
 /*
  * NOTE(review): the condition choosing between the two branches (embedded
  * line 403) and the declaration of buf (embedded line 406) are not visible
  * in this listing.
  */
404  {
405  /* new way */
407  int natts = list_length(cstate->attnumlist);
408  int16 format = (cstate->binary ? 1 : 0);
409  int i;
410 
 /* 'H' message: one format byte, the column count, one format per column */
411  pq_beginmessage(&buf, 'H');
412  pq_sendbyte(&buf, format); /* overall format */
413  pq_sendint16(&buf, natts);
414  for (i = 0; i < natts; i++)
415  pq_sendint16(&buf, format); /* per-column formats */
416  pq_endmessage(&buf);
417  cstate->copy_dest = COPY_NEW_FE;
418  }
419  else
420  {
421  /* old way */
 /* binary format is rejected on this path */
422  if (cstate->binary)
423  ereport(ERROR,
424  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425  errmsg("COPY BINARY is not supported to stdout or from stdin")));
426  pq_putemptymessage('H');
427  /* grottiness needed for old COPY OUT protocol */
428  pq_startcopyout();
429  cstate->copy_dest = COPY_OLD_FE;
430  }
431 }
432 
/*
 * NOTE(review): the line carrying this function's name and parameter list
 * (embedded line 434) is absent from this listing.  Judging by the prototype
 * order and the 'G' message sent below, this is presumably
 * ReceiveCopyBegin(CopyState cstate) -- confirm against the real file.
 *
 * Sends the message that invites the frontend to start sending copy data,
 * records the branch taken in cstate->copy_dest, and flushes output so the
 * frontend actually sees the invitation.
 */
433 static void
435 {
 /*
  * NOTE(review): the condition choosing between the two branches (embedded
  * line 436) and the declaration of buf (embedded line 439) are not visible
  * in this listing.
  */
437  {
438  /* new way */
440  int natts = list_length(cstate->attnumlist);
441  int16 format = (cstate->binary ? 1 : 0);
442  int i;
443 
 /* 'G' message: one format byte, the column count, one format per column */
444  pq_beginmessage(&buf, 'G');
445  pq_sendbyte(&buf, format); /* overall format */
446  pq_sendint16(&buf, natts);
447  for (i = 0; i < natts; i++)
448  pq_sendint16(&buf, format); /* per-column formats */
449  pq_endmessage(&buf);
450  cstate->copy_dest = COPY_NEW_FE;
 /* fresh buffer; CopyGetData's COPY_NEW_FE case reads messages into it */
451  cstate->fe_msgbuf = makeStringInfo();
452  }
453  else
454  {
455  /* old way */
 /* binary format is rejected on this path */
456  if (cstate->binary)
457  ereport(ERROR,
458  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
459  errmsg("COPY BINARY is not supported to stdout or from stdin")));
460  pq_putemptymessage('G');
461  /* any error in old protocol will make us lose sync */
462  pq_startmsgread();
463  cstate->copy_dest = COPY_OLD_FE;
464  }
465  /* We *must* flush here to ensure FE knows it can send. */
466  pq_flush();
467 }
468 
469 static void
471 {
472  if (cstate->copy_dest == COPY_NEW_FE)
473  {
474  /* Shouldn't have any unsent data */
475  Assert(cstate->fe_msgbuf->len == 0);
476  /* Send Copy Done message */
477  pq_putemptymessage('c');
478  }
479  else
480  {
481  CopySendData(cstate, "\\.", 2);
482  /* Need to flush out the trailer (this also appends a newline) */
483  CopySendEndOfRow(cstate);
484  pq_endcopyout(false);
485  }
486 }
487 
488 /*----------
489  * CopySendData sends output data to the destination (file or frontend)
490  * CopySendString does the same for null-terminated strings
491  * CopySendChar does the same for single characters
492  * CopySendEndOfRow does the appropriate thing at end of each data row
493  * (data is not actually flushed except by CopySendEndOfRow)
494  *
495  * NB: no data conversion is applied by these functions
496  *----------
497  */
498 static void
499 CopySendData(CopyState cstate, const void *databuf, int datasize)
500 {
501  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
502 }
503 
504 static void
505 CopySendString(CopyState cstate, const char *str)
506 {
507  appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
508 }
509 
510 static void
511 CopySendChar(CopyState cstate, char c)
512 {
514 }
515 
/*
 * NOTE(review): the line carrying this function's name and parameter list
 * (embedded line 517) is absent from this listing.  Judging by the prototype
 * list and the comment block above, this is presumably
 * CopySendEndOfRow(CopyState cstate) -- confirm against the real file.
 *
 * Terminates the row accumulated in cstate->fe_msgbuf (text format only),
 * writes the buffer to the destination selected by cstate->copy_dest, and
 * then resets the buffer for the next row.
 */
516 static void
518 {
519  StringInfo fe_msgbuf = cstate->fe_msgbuf;
520 
521  switch (cstate->copy_dest)
522  {
523  case COPY_FILE:
524  if (!cstate->binary)
525  {
526  /* Default line termination depends on platform */
527 #ifndef WIN32
528  CopySendChar(cstate, '\n');
529 #else
530  CopySendString(cstate, "\r\n");
531 #endif
532  }
533 
 /* the whole row is written as a single item; anything else is an error */
534  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
535  cstate->copy_file) != 1 ||
536  ferror(cstate->copy_file))
537  {
538  if (cstate->is_program)
539  {
540  if (errno == EPIPE)
541  {
542  /*
543  * The pipe will be closed automatically on error at
544  * the end of transaction, but we might get a better
545  * error message from the subprocess' exit code than
546  * just "Broken Pipe"
547  */
548  ClosePipeToProgram(cstate);
549 
550  /*
551  * If ClosePipeToProgram() didn't throw an error, the
552  * program terminated normally, but closed the pipe
553  * first. Restore errno, and throw an error.
554  */
555  errno = EPIPE;
556  }
 /*
  * NOTE(review): the first argument line of this ereport (embedded line
  * 558) is missing from the listing, which is why the parentheses below
  * look unbalanced.
  */
557  ereport(ERROR,
559  errmsg("could not write to COPY program: %m")));
560  }
561  else
 /* NOTE(review): same gap here (embedded line 563 is missing). */
562  ereport(ERROR,
564  errmsg("could not write to COPY file: %m")));
565  }
566  break;
567  case COPY_OLD_FE:
568  /* The FE/BE protocol uses \n as newline for all platforms */
569  if (!cstate->binary)
570  CopySendChar(cstate, '\n');
571 
572  if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
573  {
574  /* no hope of recovering connection sync, so FATAL */
575  ereport(FATAL,
576  (errcode(ERRCODE_CONNECTION_FAILURE),
577  errmsg("connection lost during COPY to stdout")));
578  }
579  break;
580  case COPY_NEW_FE:
581  /* The FE/BE protocol uses \n as newline for all platforms */
582  if (!cstate->binary)
583  CopySendChar(cstate, '\n');
584 
585  /* Dump the accumulated row as one CopyData message */
 /* the return value of pq_putmessage is deliberately discarded here */
586  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
587  break;
588  case COPY_CALLBACK:
589  Assert(false); /* Not yet supported. */
590  break;
591  }
592 
 /* the buffer is emptied on every path that reaches this point */
593  resetStringInfo(fe_msgbuf);
594 }
595 
596 /*
597  * CopyGetData reads data from the source (file or frontend)
598  *
599  * We attempt to read at least minread, and at most maxread, bytes from
600  * the source. The actual number of bytes read is returned; if this is
601  * less than minread, EOF was detected.
602  *
603  * Note: when copying from the frontend, we expect a proper EOF mark per
604  * protocol; if the frontend simply drops the connection, we raise error.
605  * It seems unwise to allow the COPY IN to complete normally in that case.
606  *
607  * NB: no data conversion is applied here.
608  */
609 static int
610 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
611 {
612  int bytesread = 0;
613 
614  switch (cstate->copy_dest)
615  {
616  case COPY_FILE:
 /* one fread of up to maxread bytes; minread is not consulted here */
617  bytesread = fread(databuf, 1, maxread, cstate->copy_file);
618  if (ferror(cstate->copy_file))
 /*
  * NOTE(review): the first argument line of this ereport (embedded line
  * 620) is missing from the listing, which is why the parentheses below
  * look unbalanced.
  */
619  ereport(ERROR,
621  errmsg("could not read from COPY file: %m")));
622  if (bytesread == 0)
623  cstate->reached_eof = true;
624  break;
625  case COPY_OLD_FE:
626 
627  /*
628  * We cannot read more than minread bytes (which in practice is 1)
629  * because old protocol doesn't have any clear way of separating
630  * the COPY stream from following data. This is slow, but not any
631  * slower than the code path was originally, and we don't care
632  * much anymore about the performance of old protocol.
633  */
634  if (pq_getbytes((char *) databuf, minread))
635  {
636  /* Only a \. terminator is legal EOF in old protocol */
637  ereport(ERROR,
638  (errcode(ERRCODE_CONNECTION_FAILURE),
639  errmsg("unexpected EOF on client connection with an open transaction")));
640  }
641  bytesread = minread;
642  break;
643  case COPY_NEW_FE:
 /*
  * Keep copying out of fe_msgbuf until minread bytes have been delivered,
  * the caller's space (maxread) is used up, or the frontend has ended the
  * copy.
  */
644  while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
645  {
646  int avail;
647 
 /* current message fully consumed: fetch the next one */
648  while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
649  {
650  /* Try to receive another message */
651  int mtype;
652 
653  readmessage:
 /* NOTE(review): embedded line 654 is absent from this listing. */
655  pq_startmsgread();
656  mtype = pq_getbyte();
657  if (mtype == EOF)
658  ereport(ERROR,
659  (errcode(ERRCODE_CONNECTION_FAILURE),
660  errmsg("unexpected EOF on client connection with an open transaction")));
661  if (pq_getmessage(cstate->fe_msgbuf, 0))
662  ereport(ERROR,
663  (errcode(ERRCODE_CONNECTION_FAILURE),
664  errmsg("unexpected EOF on client connection with an open transaction")));
 /* NOTE(review): embedded line 665 is absent from this listing. */
666  switch (mtype)
667  {
668  case 'd': /* CopyData */
669  break;
670  case 'c': /* CopyDone */
671  /* COPY IN correctly terminated by frontend */
672  cstate->reached_eof = true;
 /* returns whatever was gathered so far, possibly less than minread */
673  return bytesread;
674  case 'f': /* CopyFail */
675  ereport(ERROR,
676  (errcode(ERRCODE_QUERY_CANCELED),
677  errmsg("COPY from stdin failed: %s",
678  pq_getmsgstring(cstate->fe_msgbuf))));
679  break;
680  case 'H': /* Flush */
681  case 'S': /* Sync */
682 
683  /*
684  * Ignore Flush/Sync for the convenience of client
685  * libraries (such as libpq) that may send those
686  * without noticing that the command they just
687  * sent was COPY.
688  */
689  goto readmessage;
690  default:
691  ereport(ERROR,
692  (errcode(ERRCODE_PROTOCOL_VIOLATION),
693  errmsg("unexpected message type 0x%02X during COPY from stdin",
694  mtype)));
695  break;
696  }
697  }
 /* hand over as much of the current message as the caller has room for */
698  avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
699  if (avail > maxread)
700  avail = maxread;
701  pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
702  databuf = (void *) ((char *) databuf + avail);
703  maxread -= avail;
704  bytesread += avail;
705  }
706  break;
707  case COPY_CALLBACK:
 /* the callback's return value is used directly as the byte count */
708  bytesread = cstate->data_source_cb(databuf, minread, maxread);
709  break;
710  }
711 
712  return bytesread;
713 }
714 
715 
716 /*
717  * These functions do apply some data conversion
718  */
719 
720 /*
721  * CopySendInt32 sends an int32 in network byte order
722  */
723 static void
725 {
726  uint32 buf;
727 
728  buf = pg_hton32((uint32) val);
729  CopySendData(cstate, &buf, sizeof(buf));
730 }
731 
732 /*
733  * CopyGetInt32 reads an int32 that appears in network byte order
734  *
735  * Returns true if OK, false if EOF
736  */
737 static bool
739 {
740  uint32 buf;
741 
742  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
743  {
744  *val = 0; /* suppress compiler warning */
745  return false;
746  }
747  *val = (int32) pg_ntoh32(buf);
748  return true;
749 }
750 
751 /*
752  * CopySendInt16 sends an int16 in network byte order
753  */
754 static void
756 {
757  uint16 buf;
758 
759  buf = pg_hton16((uint16) val);
760  CopySendData(cstate, &buf, sizeof(buf));
761 }
762 
763 /*
764  * CopyGetInt16 reads an int16 that appears in network byte order
765  */
766 static bool
768 {
769  uint16 buf;
770 
771  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
772  {
773  *val = 0; /* suppress compiler warning */
774  return false;
775  }
776  *val = (int16) pg_ntoh16(buf);
777  return true;
778 }
779 
780 
781 /*
782  * CopyLoadRawBuf loads some more data into raw_buf
783  *
784  * Returns true if able to obtain at least one more byte, else false.
785  *
786  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
787  * down to the start of the buffer and then we load more data after that.
788  * This case is used only when a frontend multibyte character crosses a
789  * bufferload boundary.
790  */
791 static bool
793 {
794  int nbytes;
795  int inbytes;
796 
797  if (cstate->raw_buf_index < cstate->raw_buf_len)
798  {
799  /* Copy down the unprocessed data */
800  nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
801  memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
802  nbytes);
803  }
804  else
805  nbytes = 0; /* no data need be saved */
806 
807  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
808  1, RAW_BUF_SIZE - nbytes);
809  nbytes += inbytes;
810  cstate->raw_buf[nbytes] = '\0';
811  cstate->raw_buf_index = 0;
812  cstate->raw_buf_len = nbytes;
813  return (inbytes > 0);
814 }
815 
816 
817 /*
818  * DoCopy executes the SQL COPY statement
819  *
820  * Either unload or reload contents of table <relation>, depending on <from>.
821  * (<from> = true means we are inserting into the table.) In the "TO" case
822  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
823  * or DELETE query.
824  *
825  * If <pipe> is false, transfer is between the table and the file named
826  * <filename>. Otherwise, transfer is between the table and our regular
827  * input/output stream. The latter could be either stdin/stdout or a
828  * socket, depending on whether we're running under Postmaster control.
829  *
830  * Do not allow a Postgres user without the 'pg_read_server_files' or
831  * 'pg_write_server_files' role to read from or write to a file.
832  *
833  * Do not allow the copy if user doesn't have proper permission to access
834  * the table or the specifically requested columns.
835  */
836 void
837 DoCopy(ParseState *pstate, const CopyStmt *stmt,
838  int stmt_location, int stmt_len,
839  uint64 *processed)
840 {
841  CopyState cstate;
842  bool is_from = stmt->is_from;
843  bool pipe = (stmt->filename == NULL);
844  Relation rel;
845  Oid relid;
846  RawStmt *query = NULL;
847  Node *whereClause = NULL;
848 
849  /*
850  * Disallow COPY to/from file or program except to users with the
851  * appropriate role.
852  */
853  if (!pipe)
854  {
855  if (stmt->is_program)
856  {
857  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
858  ereport(ERROR,
859  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
860  errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
861  errhint("Anyone can COPY to stdout or from stdin. "
862  "psql's \\copy command also works for anyone.")));
863  }
864  else
865  {
866  if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
867  ereport(ERROR,
868  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
869  errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
870  errhint("Anyone can COPY to stdout or from stdin. "
871  "psql's \\copy command also works for anyone.")));
872 
873  if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
874  ereport(ERROR,
875  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
876  errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
877  errhint("Anyone can COPY to stdout or from stdin. "
878  "psql's \\copy command also works for anyone.")));
879  }
880  }
881 
882  if (stmt->relation)
883  {
884  LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
885  ParseNamespaceItem *nsitem;
886  RangeTblEntry *rte;
887  TupleDesc tupDesc;
888  List *attnums;
889  ListCell *cur;
890 
891  Assert(!stmt->query);
892 
893  /* Open and lock the relation, using the appropriate lock type. */
894  rel = table_openrv(stmt->relation, lockmode);
895 
896  relid = RelationGetRelid(rel);
897 
898  nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
899  NULL, false, false);
900  rte = nsitem->p_rte;
901  rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
902 
903  if (stmt->whereClause)
904  {
905  /* add nsitem to query namespace */
906  addNSItemToQuery(pstate, nsitem, false, true, true);
907 
908  /* Transform the raw expression tree */
909  whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
910 
911  /* Make sure it yields a boolean result. */
912  whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
913 
914  /* we have to fix its collations too */
915  assign_expr_collations(pstate, whereClause);
916 
917  whereClause = eval_const_expressions(NULL, whereClause);
918 
919  whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
920  whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
921  }
922 
923  tupDesc = RelationGetDescr(rel);
924  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
925  foreach(cur, attnums)
926  {
927  int attno = lfirst_int(cur) -
929 
930  if (is_from)
931  rte->insertedCols = bms_add_member(rte->insertedCols, attno);
932  else
933  rte->selectedCols = bms_add_member(rte->selectedCols, attno);
934  }
935  ExecCheckRTPerms(pstate->p_rtable, true);
936 
937  /*
938  * Permission check for row security policies.
939  *
940  * check_enable_rls will ereport(ERROR) if the user has requested
941  * something invalid and will otherwise indicate if we should enable
942  * RLS (returns RLS_ENABLED) or not for this COPY statement.
943  *
944  * If the relation has a row security policy and we are to apply it
945  * then perform a "query" copy and allow the normal query processing
946  * to handle the policies.
947  *
948  * If RLS is not enabled for this, then just fall through to the
949  * normal non-filtering relation handling.
950  */
951  if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
952  {
954  ColumnRef *cr;
955  ResTarget *target;
956  RangeVar *from;
957  List *targetList = NIL;
958 
959  if (is_from)
960  ereport(ERROR,
961  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962  errmsg("COPY FROM not supported with row-level security"),
963  errhint("Use INSERT statements instead.")));
964 
965  /*
966  * Build target list
967  *
968  * If no columns are specified in the attribute list of the COPY
969  * command, then the target list is 'all' columns. Therefore, '*'
970  * should be used as the target list for the resulting SELECT
971  * statement.
972  *
973  * In the case that columns are specified in the attribute list,
974  * create a ColumnRef and ResTarget for each column and add them
975  * to the target list for the resulting SELECT statement.
976  */
977  if (!stmt->attlist)
978  {
979  cr = makeNode(ColumnRef);
981  cr->location = -1;
982 
983  target = makeNode(ResTarget);
984  target->name = NULL;
985  target->indirection = NIL;
986  target->val = (Node *) cr;
987  target->location = -1;
988 
989  targetList = list_make1(target);
990  }
991  else
992  {
993  ListCell *lc;
994 
995  foreach(lc, stmt->attlist)
996  {
997  /*
998  * Build the ColumnRef for each column. The ColumnRef
999  * 'fields' property is a String 'Value' node (see
1000  * nodes/value.h) that corresponds to the column name
1001  * respectively.
1002  */
1003  cr = makeNode(ColumnRef);
1004  cr->fields = list_make1(lfirst(lc));
1005  cr->location = -1;
1006 
1007  /* Build the ResTarget and add the ColumnRef to it. */
1008  target = makeNode(ResTarget);
1009  target->name = NULL;
1010  target->indirection = NIL;
1011  target->val = (Node *) cr;
1012  target->location = -1;
1013 
1014  /* Add each column to the SELECT statement's target list */
1015  targetList = lappend(targetList, target);
1016  }
1017  }
1018 
1019  /*
1020  * Build RangeVar for from clause, fully qualified based on the
1021  * relation which we have opened and locked.
1022  */
1025  -1);
1026 
1027  /* Build query */
1028  select = makeNode(SelectStmt);
1029  select->targetList = targetList;
1030  select->fromClause = list_make1(from);
1031 
1032  query = makeNode(RawStmt);
1033  query->stmt = (Node *) select;
1034  query->stmt_location = stmt_location;
1035  query->stmt_len = stmt_len;
1036 
1037  /*
1038  * Close the relation for now, but keep the lock on it to prevent
1039  * changes between now and when we start the query-based COPY.
1040  *
1041  * We'll reopen it later as part of the query-based COPY.
1042  */
1043  table_close(rel, NoLock);
1044  rel = NULL;
1045  }
1046  }
1047  else
1048  {
1049  Assert(stmt->query);
1050 
1051  query = makeNode(RawStmt);
1052  query->stmt = stmt->query;
1053  query->stmt_location = stmt_location;
1054  query->stmt_len = stmt_len;
1055 
1056  relid = InvalidOid;
1057  rel = NULL;
1058  }
1059 
1060  if (is_from)
1061  {
1062  Assert(rel);
1063 
1064  /* check read-only transaction and parallel mode */
1065  if (XactReadOnly && !rel->rd_islocaltemp)
1066  PreventCommandIfReadOnly("COPY FROM");
1067 
1068  cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
1069  NULL, stmt->attlist, stmt->options);
1070  cstate->whereClause = whereClause;
1071  *processed = CopyFrom(cstate); /* copy from file to database */
1072  EndCopyFrom(cstate);
1073  }
1074  else
1075  {
1076  cstate = BeginCopyTo(pstate, rel, query, relid,
1077  stmt->filename, stmt->is_program,
1078  stmt->attlist, stmt->options);
1079  *processed = DoCopyTo(cstate); /* copy from database to file */
1080  EndCopyTo(cstate);
1081  }
1082 
1083  if (rel != NULL)
1084  table_close(rel, NoLock);
1085 }
1086 
1087 /*
1088  * Process the statement option list for COPY.
1089  *
1090  * Scan the options list (a list of DefElem) and transpose the information
1091  * into cstate, applying appropriate error checking.
1092  *
1093  * cstate is assumed to be filled with zeroes initially.
1094  *
1095  * This is exported so that external users of the COPY API can sanity-check
1096  * a list of options. In that usage, cstate should be passed as NULL
1097  * (since external users don't know sizeof(CopyStateData)) and the collected
1098  * data is just leaked until CurrentMemoryContext is reset.
1099  *
1100  * Note that additional checking, such as whether column names listed in FORCE
1101  * QUOTE actually exist, has to be applied later. This just checks for
1102  * self-consistency of the options list.
1103  */
1104 void
1106  CopyState cstate,
1107  bool is_from,
1108  List *options)
1109 {
1110  bool format_specified = false;
1111  ListCell *option;
1112 
1113  /* Support external use for option sanity checking */
1114  if (cstate == NULL)
1115  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1116 
1117  cstate->is_copy_from = is_from;
1118 
1119  cstate->file_encoding = -1;
1120 
1121  /* Extract options from the statement node tree */
1122  foreach(option, options)
1123  {
1124  DefElem *defel = lfirst_node(DefElem, option);
1125 
1126  if (strcmp(defel->defname, "format") == 0)
1127  {
1128  char *fmt = defGetString(defel);
1129 
1130  if (format_specified)
1131  ereport(ERROR,
1132  (errcode(ERRCODE_SYNTAX_ERROR),
1133  errmsg("conflicting or redundant options"),
1134  parser_errposition(pstate, defel->location)));
1135  format_specified = true;
1136  if (strcmp(fmt, "text") == 0)
1137  /* default format */ ;
1138  else if (strcmp(fmt, "csv") == 0)
1139  cstate->csv_mode = true;
1140  else if (strcmp(fmt, "binary") == 0)
1141  cstate->binary = true;
1142  else
1143  ereport(ERROR,
1144  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1145  errmsg("COPY format \"%s\" not recognized", fmt),
1146  parser_errposition(pstate, defel->location)));
1147  }
1148  else if (strcmp(defel->defname, "freeze") == 0)
1149  {
1150  if (cstate->freeze)
1151  ereport(ERROR,
1152  (errcode(ERRCODE_SYNTAX_ERROR),
1153  errmsg("conflicting or redundant options"),
1154  parser_errposition(pstate, defel->location)));
1155  cstate->freeze = defGetBoolean(defel);
1156  }
1157  else if (strcmp(defel->defname, "delimiter") == 0)
1158  {
1159  if (cstate->delim)
1160  ereport(ERROR,
1161  (errcode(ERRCODE_SYNTAX_ERROR),
1162  errmsg("conflicting or redundant options"),
1163  parser_errposition(pstate, defel->location)));
1164  cstate->delim = defGetString(defel);
1165  }
1166  else if (strcmp(defel->defname, "null") == 0)
1167  {
1168  if (cstate->null_print)
1169  ereport(ERROR,
1170  (errcode(ERRCODE_SYNTAX_ERROR),
1171  errmsg("conflicting or redundant options"),
1172  parser_errposition(pstate, defel->location)));
1173  cstate->null_print = defGetString(defel);
1174  }
1175  else if (strcmp(defel->defname, "header") == 0)
1176  {
1177  if (cstate->header_line)
1178  ereport(ERROR,
1179  (errcode(ERRCODE_SYNTAX_ERROR),
1180  errmsg("conflicting or redundant options"),
1181  parser_errposition(pstate, defel->location)));
1182  cstate->header_line = defGetBoolean(defel);
1183  }
1184  else if (strcmp(defel->defname, "quote") == 0)
1185  {
1186  if (cstate->quote)
1187  ereport(ERROR,
1188  (errcode(ERRCODE_SYNTAX_ERROR),
1189  errmsg("conflicting or redundant options"),
1190  parser_errposition(pstate, defel->location)));
1191  cstate->quote = defGetString(defel);
1192  }
1193  else if (strcmp(defel->defname, "escape") == 0)
1194  {
1195  if (cstate->escape)
1196  ereport(ERROR,
1197  (errcode(ERRCODE_SYNTAX_ERROR),
1198  errmsg("conflicting or redundant options"),
1199  parser_errposition(pstate, defel->location)));
1200  cstate->escape = defGetString(defel);
1201  }
1202  else if (strcmp(defel->defname, "force_quote") == 0)
1203  {
1204  if (cstate->force_quote || cstate->force_quote_all)
1205  ereport(ERROR,
1206  (errcode(ERRCODE_SYNTAX_ERROR),
1207  errmsg("conflicting or redundant options"),
1208  parser_errposition(pstate, defel->location)));
1209  if (defel->arg && IsA(defel->arg, A_Star))
1210  cstate->force_quote_all = true;
1211  else if (defel->arg && IsA(defel->arg, List))
1212  cstate->force_quote = castNode(List, defel->arg);
1213  else
1214  ereport(ERROR,
1215  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1216  errmsg("argument to option \"%s\" must be a list of column names",
1217  defel->defname),
1218  parser_errposition(pstate, defel->location)));
1219  }
1220  else if (strcmp(defel->defname, "force_not_null") == 0)
1221  {
1222  if (cstate->force_notnull)
1223  ereport(ERROR,
1224  (errcode(ERRCODE_SYNTAX_ERROR),
1225  errmsg("conflicting or redundant options"),
1226  parser_errposition(pstate, defel->location)));
1227  if (defel->arg && IsA(defel->arg, List))
1228  cstate->force_notnull = castNode(List, defel->arg);
1229  else
1230  ereport(ERROR,
1231  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1232  errmsg("argument to option \"%s\" must be a list of column names",
1233  defel->defname),
1234  parser_errposition(pstate, defel->location)));
1235  }
1236  else if (strcmp(defel->defname, "force_null") == 0)
1237  {
1238  if (cstate->force_null)
1239  ereport(ERROR,
1240  (errcode(ERRCODE_SYNTAX_ERROR),
1241  errmsg("conflicting or redundant options")));
1242  if (defel->arg && IsA(defel->arg, List))
1243  cstate->force_null = castNode(List, defel->arg);
1244  else
1245  ereport(ERROR,
1246  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1247  errmsg("argument to option \"%s\" must be a list of column names",
1248  defel->defname),
1249  parser_errposition(pstate, defel->location)));
1250  }
1251  else if (strcmp(defel->defname, "convert_selectively") == 0)
1252  {
1253  /*
1254  * Undocumented, not-accessible-from-SQL option: convert only the
1255  * named columns to binary form, storing the rest as NULLs. It's
1256  * allowed for the column list to be NIL.
1257  */
1258  if (cstate->convert_selectively)
1259  ereport(ERROR,
1260  (errcode(ERRCODE_SYNTAX_ERROR),
1261  errmsg("conflicting or redundant options"),
1262  parser_errposition(pstate, defel->location)));
1263  cstate->convert_selectively = true;
1264  if (defel->arg == NULL || IsA(defel->arg, List))
1265  cstate->convert_select = castNode(List, defel->arg);
1266  else
1267  ereport(ERROR,
1268  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1269  errmsg("argument to option \"%s\" must be a list of column names",
1270  defel->defname),
1271  parser_errposition(pstate, defel->location)));
1272  }
1273  else if (strcmp(defel->defname, "encoding") == 0)
1274  {
1275  if (cstate->file_encoding >= 0)
1276  ereport(ERROR,
1277  (errcode(ERRCODE_SYNTAX_ERROR),
1278  errmsg("conflicting or redundant options"),
1279  parser_errposition(pstate, defel->location)));
1281  if (cstate->file_encoding < 0)
1282  ereport(ERROR,
1283  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1284  errmsg("argument to option \"%s\" must be a valid encoding name",
1285  defel->defname),
1286  parser_errposition(pstate, defel->location)));
1287  }
1288  else
1289  ereport(ERROR,
1290  (errcode(ERRCODE_SYNTAX_ERROR),
1291  errmsg("option \"%s\" not recognized",
1292  defel->defname),
1293  parser_errposition(pstate, defel->location)));
1294  }
1295 
1296  /*
1297  * Check for incompatible options (must do these two before inserting
1298  * defaults)
1299  */
1300  if (cstate->binary && cstate->delim)
1301  ereport(ERROR,
1302  (errcode(ERRCODE_SYNTAX_ERROR),
1303  errmsg("cannot specify DELIMITER in BINARY mode")));
1304 
1305  if (cstate->binary && cstate->null_print)
1306  ereport(ERROR,
1307  (errcode(ERRCODE_SYNTAX_ERROR),
1308  errmsg("cannot specify NULL in BINARY mode")));
1309 
1310  /* Set defaults for omitted options */
1311  if (!cstate->delim)
1312  cstate->delim = cstate->csv_mode ? "," : "\t";
1313 
1314  if (!cstate->null_print)
1315  cstate->null_print = cstate->csv_mode ? "" : "\\N";
1316  cstate->null_print_len = strlen(cstate->null_print);
1317 
1318  if (cstate->csv_mode)
1319  {
1320  if (!cstate->quote)
1321  cstate->quote = "\"";
1322  if (!cstate->escape)
1323  cstate->escape = cstate->quote;
1324  }
1325 
1326  /* Only single-byte delimiter strings are supported. */
1327  if (strlen(cstate->delim) != 1)
1328  ereport(ERROR,
1329  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1330  errmsg("COPY delimiter must be a single one-byte character")));
1331 
1332  /* Disallow end-of-line characters */
1333  if (strchr(cstate->delim, '\r') != NULL ||
1334  strchr(cstate->delim, '\n') != NULL)
1335  ereport(ERROR,
1336  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337  errmsg("COPY delimiter cannot be newline or carriage return")));
1338 
1339  if (strchr(cstate->null_print, '\r') != NULL ||
1340  strchr(cstate->null_print, '\n') != NULL)
1341  ereport(ERROR,
1342  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1343  errmsg("COPY null representation cannot use newline or carriage return")));
1344 
1345  /*
1346  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1347  * backslash because it would be ambiguous. We can't allow the other
1348  * cases because data characters matching the delimiter must be
1349  * backslashed, and certain backslash combinations are interpreted
1350  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1351  * more than strictly necessary, but seems best for consistency and
1352  * future-proofing. Likewise we disallow all digits though only octal
1353  * digits are actually dangerous.
1354  */
1355  if (!cstate->csv_mode &&
1356  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1357  cstate->delim[0]) != NULL)
1358  ereport(ERROR,
1359  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1360  errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1361 
1362  /* Check header */
1363  if (!cstate->csv_mode && cstate->header_line)
1364  ereport(ERROR,
1365  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1366  errmsg("COPY HEADER available only in CSV mode")));
1367 
1368  /* Check quote */
1369  if (!cstate->csv_mode && cstate->quote != NULL)
1370  ereport(ERROR,
1371  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1372  errmsg("COPY quote available only in CSV mode")));
1373 
1374  if (cstate->csv_mode && strlen(cstate->quote) != 1)
1375  ereport(ERROR,
1376  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1377  errmsg("COPY quote must be a single one-byte character")));
1378 
1379  if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1380  ereport(ERROR,
1381  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1382  errmsg("COPY delimiter and quote must be different")));
1383 
1384  /* Check escape */
1385  if (!cstate->csv_mode && cstate->escape != NULL)
1386  ereport(ERROR,
1387  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1388  errmsg("COPY escape available only in CSV mode")));
1389 
1390  if (cstate->csv_mode && strlen(cstate->escape) != 1)
1391  ereport(ERROR,
1392  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1393  errmsg("COPY escape must be a single one-byte character")));
1394 
1395  /* Check force_quote */
1396  if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1397  ereport(ERROR,
1398  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1399  errmsg("COPY force quote available only in CSV mode")));
1400  if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1401  ereport(ERROR,
1402  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1403  errmsg("COPY force quote only available using COPY TO")));
1404 
1405  /* Check force_notnull */
1406  if (!cstate->csv_mode && cstate->force_notnull != NIL)
1407  ereport(ERROR,
1408  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1409  errmsg("COPY force not null available only in CSV mode")));
1410  if (cstate->force_notnull != NIL && !is_from)
1411  ereport(ERROR,
1412  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1413  errmsg("COPY force not null only available using COPY FROM")));
1414 
1415  /* Check force_null */
1416  if (!cstate->csv_mode && cstate->force_null != NIL)
1417  ereport(ERROR,
1418  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1419  errmsg("COPY force null available only in CSV mode")));
1420 
1421  if (cstate->force_null != NIL && !is_from)
1422  ereport(ERROR,
1423  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1424  errmsg("COPY force null only available using COPY FROM")));
1425 
1426  /* Don't allow the delimiter to appear in the null string. */
1427  if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1428  ereport(ERROR,
1429  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1430  errmsg("COPY delimiter must not appear in the NULL specification")));
1431 
1432  /* Don't allow the CSV quote char to appear in the null string. */
1433  if (cstate->csv_mode &&
1434  strchr(cstate->null_print, cstate->quote[0]) != NULL)
1435  ereport(ERROR,
1436  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1437  errmsg("CSV quote character must not appear in the NULL specification")));
1438 }
1439 
1440 /*
1441  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1442  *
1443  * Iff <binary>, unload or reload in the binary format, as opposed to the
1444  * more wasteful but more robust and portable text format.
1445  *
1446  * Iff <oids>, unload or reload the format that includes OID information.
1447  * On input, we accept OIDs whether or not the table has an OID column,
1448  * but silently drop them if it does not. On output, we report an error
1449  * if the user asks for OIDs in a table that has none (not providing an
1450  * OID column might seem friendlier, but could seriously confuse programs).
1451  *
1452  * If in the text format, delimit columns with delimiter <delim> and print
1453  * NULL values as <null_print>.
1454  */
1455 static CopyState
1457  bool is_from,
1458  Relation rel,
1459  RawStmt *raw_query,
1460  Oid queryRelId,
1461  List *attnamelist,
1462  List *options)
1463 {
1464  CopyState cstate;
1465  TupleDesc tupDesc;
1466  int num_phys_attrs;
1467  MemoryContext oldcontext;
1468 
1469  /* Allocate workspace and zero all fields */
1470  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1471 
1472  /*
1473  * We allocate everything used by a cstate in a new memory context. This
1474  * avoids memory leaks during repeated use of COPY in a query.
1475  */
1477  "COPY",
1479 
1480  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1481 
1482  /* Extract options from the statement node tree */
1483  ProcessCopyOptions(pstate, cstate, is_from, options);
1484 
1485  /* Process the source/target relation or query */
1486  if (rel)
1487  {
1488  Assert(!raw_query);
1489 
1490  cstate->rel = rel;
1491 
1492  tupDesc = RelationGetDescr(cstate->rel);
1493  }
1494  else
1495  {
1496  List *rewritten;
1497  Query *query;
1498  PlannedStmt *plan;
1499  DestReceiver *dest;
1500 
1501  Assert(!is_from);
1502  cstate->rel = NULL;
1503 
1504  /*
1505  * Run parse analysis and rewrite. Note this also acquires sufficient
1506  * locks on the source table(s).
1507  *
1508  * Because the parser and planner tend to scribble on their input, we
1509  * make a preliminary copy of the source querytree. This prevents
1510  * problems in the case that the COPY is in a portal or plpgsql
1511  * function and is executed repeatedly. (See also the same hack in
1512  * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1513  */
1514  rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1515  pstate->p_sourcetext, NULL, 0,
1516  NULL);
1517 
1518  /* check that we got back something we can work with */
1519  if (rewritten == NIL)
1520  {
1521  ereport(ERROR,
1522  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1523  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1524  }
1525  else if (list_length(rewritten) > 1)
1526  {
1527  ListCell *lc;
1528 
1529  /* examine queries to determine which error message to issue */
1530  foreach(lc, rewritten)
1531  {
1532  Query *q = lfirst_node(Query, lc);
1533 
1535  ereport(ERROR,
1536  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1537  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1539  ereport(ERROR,
1540  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1541  errmsg("DO ALSO rules are not supported for the COPY")));
1542  }
1543 
1544  ereport(ERROR,
1545  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1546  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1547  }
1548 
1549  query = linitial_node(Query, rewritten);
1550 
1551  /* The grammar allows SELECT INTO, but we don't support that */
1552  if (query->utilityStmt != NULL &&
1554  ereport(ERROR,
1555  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1556  errmsg("COPY (SELECT INTO) is not supported")));
1557 
1558  Assert(query->utilityStmt == NULL);
1559 
1560  /*
1561  * Similarly the grammar doesn't enforce the presence of a RETURNING
1562  * clause, but this is required here.
1563  */
1564  if (query->commandType != CMD_SELECT &&
1565  query->returningList == NIL)
1566  {
1567  Assert(query->commandType == CMD_INSERT ||
1568  query->commandType == CMD_UPDATE ||
1569  query->commandType == CMD_DELETE);
1570 
1571  ereport(ERROR,
1572  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1573  errmsg("COPY query must have a RETURNING clause")));
1574  }
1575 
1576  /* plan the query */
1577  plan = pg_plan_query(query, pstate->p_sourcetext,
1578  CURSOR_OPT_PARALLEL_OK, NULL);
1579 
1580  /*
1581  * With row level security and a user using "COPY relation TO", we
1582  * have to convert the "COPY relation TO" to a query-based COPY (eg:
1583  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1584  * in any RLS clauses.
1585  *
1586  * When this happens, we are passed in the relid of the originally
1587  * found relation (which we have locked). As the planner will look up
1588  * the relation again, we double-check here to make sure it found the
1589  * same one that we have locked.
1590  */
1591  if (queryRelId != InvalidOid)
1592  {
1593  /*
1594  * Note that with RLS involved there may be multiple relations,
1595  * and while the one we need is almost certainly first, we don't
1596  * make any guarantees of that in the planner, so check the whole
1597  * list and make sure we find the original relation.
1598  */
1599  if (!list_member_oid(plan->relationOids, queryRelId))
1600  ereport(ERROR,
1601  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1602  errmsg("relation referenced by COPY statement has changed")));
1603  }
1604 
1605  /*
1606  * Use a snapshot with an updated command ID to ensure this query sees
1607  * results of any previously executed queries.
1608  */
1611 
1612  /* Create dest receiver for COPY OUT */
1614  ((DR_copy *) dest)->cstate = cstate;
1615 
1616  /* Create a QueryDesc requesting no output */
1617  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1620  dest, NULL, NULL, 0);
1621 
1622  /*
1623  * Call ExecutorStart to prepare the plan for execution.
1624  *
1625  * ExecutorStart computes a result tupdesc for us
1626  */
1627  ExecutorStart(cstate->queryDesc, 0);
1628 
1629  tupDesc = cstate->queryDesc->tupDesc;
1630  }
1631 
1632  /* Generate or convert list of attributes to process */
1633  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1634 
1635  num_phys_attrs = tupDesc->natts;
1636 
1637  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1638  cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1639  if (cstate->force_quote_all)
1640  {
1641  int i;
1642 
1643  for (i = 0; i < num_phys_attrs; i++)
1644  cstate->force_quote_flags[i] = true;
1645  }
1646  else if (cstate->force_quote)
1647  {
1648  List *attnums;
1649  ListCell *cur;
1650 
1651  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1652 
1653  foreach(cur, attnums)
1654  {
1655  int attnum = lfirst_int(cur);
1656  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1657 
1658  if (!list_member_int(cstate->attnumlist, attnum))
1659  ereport(ERROR,
1660  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1661  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1662  NameStr(attr->attname))));
1663  cstate->force_quote_flags[attnum - 1] = true;
1664  }
1665  }
1666 
1667  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1668  cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1669  if (cstate->force_notnull)
1670  {
1671  List *attnums;
1672  ListCell *cur;
1673 
1674  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1675 
1676  foreach(cur, attnums)
1677  {
1678  int attnum = lfirst_int(cur);
1679  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1680 
1681  if (!list_member_int(cstate->attnumlist, attnum))
1682  ereport(ERROR,
1683  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1684  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1685  NameStr(attr->attname))));
1686  cstate->force_notnull_flags[attnum - 1] = true;
1687  }
1688  }
1689 
1690  /* Convert FORCE_NULL name list to per-column flags, check validity */
1691  cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1692  if (cstate->force_null)
1693  {
1694  List *attnums;
1695  ListCell *cur;
1696 
1697  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1698 
1699  foreach(cur, attnums)
1700  {
1701  int attnum = lfirst_int(cur);
1702  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1703 
1704  if (!list_member_int(cstate->attnumlist, attnum))
1705  ereport(ERROR,
1706  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1707  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1708  NameStr(attr->attname))));
1709  cstate->force_null_flags[attnum - 1] = true;
1710  }
1711  }
1712 
1713  /* Convert convert_selectively name list to per-column flags */
1714  if (cstate->convert_selectively)
1715  {
1716  List *attnums;
1717  ListCell *cur;
1718 
1719  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1720 
1721  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1722 
1723  foreach(cur, attnums)
1724  {
1725  int attnum = lfirst_int(cur);
1726  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1727 
1728  if (!list_member_int(cstate->attnumlist, attnum))
1729  ereport(ERROR,
1730  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1731  errmsg_internal("selected column \"%s\" not referenced by COPY",
1732  NameStr(attr->attname))));
1733  cstate->convert_select_flags[attnum - 1] = true;
1734  }
1735  }
1736 
1737  /* Use client encoding when ENCODING option is not specified. */
1738  if (cstate->file_encoding < 0)
1740 
1741  /*
1742  * Set up encoding conversion info. Even if the file and server encodings
1743  * are the same, we must apply pg_any_to_server() to validate data in
1744  * multibyte encodings.
1745  */
1746  cstate->need_transcoding =
1747  (cstate->file_encoding != GetDatabaseEncoding() ||
1749  /* See Multibyte encoding comment above */
1751 
1752  cstate->copy_dest = COPY_FILE; /* default */
1753 
1754  MemoryContextSwitchTo(oldcontext);
1755 
1756  return cstate;
1757 }
1758 
1759 /*
1760  * Closes the pipe to an external program, checking the pclose() return code.
1761  */
1762 static void
1764 {
1765  int pclose_rc;
1766 
1767  Assert(cstate->is_program);
1768 
1769  pclose_rc = ClosePipeStream(cstate->copy_file);
1770  if (pclose_rc == -1)
1771  ereport(ERROR,
1773  errmsg("could not close pipe to external command: %m")));
1774  else if (pclose_rc != 0)
1775  {
1776  /*
1777  * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1778  * expectable for the called program to fail with SIGPIPE, and we
1779  * should not report that as an error. Otherwise, SIGPIPE indicates a
1780  * problem.
1781  */
1782  if (cstate->is_copy_from && !cstate->reached_eof &&
1783  wait_result_is_signal(pclose_rc, SIGPIPE))
1784  return;
1785 
1786  ereport(ERROR,
1787  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1788  errmsg("program \"%s\" failed",
1789  cstate->filename),
1790  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1791  }
1792 }
1793 
1794 /*
1795  * Release resources allocated in a cstate for COPY TO/FROM.
1796  */
1797 static void
1799 {
1800  if (cstate->is_program)
1801  {
1802  ClosePipeToProgram(cstate);
1803  }
1804  else
1805  {
1806  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1807  ereport(ERROR,
1809  errmsg("could not close file \"%s\": %m",
1810  cstate->filename)));
1811  }
1812 
1814  pfree(cstate);
1815 }
1816 
1817 /*
1818  * Setup CopyState to read tuples from a table or a query for COPY TO.
1819  */
1820 static CopyState
1822  Relation rel,
1823  RawStmt *query,
1824  Oid queryRelId,
1825  const char *filename,
1826  bool is_program,
1827  List *attnamelist,
1828  List *options)
1829 {
1830  CopyState cstate;
1831  bool pipe = (filename == NULL);
1832  MemoryContext oldcontext;
1833 
1834  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1835  {
1836  if (rel->rd_rel->relkind == RELKIND_VIEW)
1837  ereport(ERROR,
1838  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1839  errmsg("cannot copy from view \"%s\"",
1841  errhint("Try the COPY (SELECT ...) TO variant.")));
1842  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1843  ereport(ERROR,
1844  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1845  errmsg("cannot copy from materialized view \"%s\"",
1847  errhint("Try the COPY (SELECT ...) TO variant.")));
1848  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1849  ereport(ERROR,
1850  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1851  errmsg("cannot copy from foreign table \"%s\"",
1853  errhint("Try the COPY (SELECT ...) TO variant.")));
1854  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1855  ereport(ERROR,
1856  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1857  errmsg("cannot copy from sequence \"%s\"",
1858  RelationGetRelationName(rel))));
1859  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1860  ereport(ERROR,
1861  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1862  errmsg("cannot copy from partitioned table \"%s\"",
1864  errhint("Try the COPY (SELECT ...) TO variant.")));
1865  else
1866  ereport(ERROR,
1867  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1868  errmsg("cannot copy from non-table relation \"%s\"",
1869  RelationGetRelationName(rel))));
1870  }
1871 
1872  cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1873  options);
1874  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1875 
1876  if (pipe)
1877  {
1878  Assert(!is_program); /* the grammar does not allow this */
1880  cstate->copy_file = stdout;
1881  }
1882  else
1883  {
1884  cstate->filename = pstrdup(filename);
1885  cstate->is_program = is_program;
1886 
1887  if (is_program)
1888  {
1889  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1890  if (cstate->copy_file == NULL)
1891  ereport(ERROR,
1893  errmsg("could not execute command \"%s\": %m",
1894  cstate->filename)));
1895  }
1896  else
1897  {
1898  mode_t oumask; /* Pre-existing umask value */
1899  struct stat st;
1900 
1901  /*
1902  * Prevent write to relative path ... too easy to shoot oneself in
1903  * the foot by overwriting a database file ...
1904  */
1905  if (!is_absolute_path(filename))
1906  ereport(ERROR,
1907  (errcode(ERRCODE_INVALID_NAME),
1908  errmsg("relative path not allowed for COPY to file")));
1909 
1910  oumask = umask(S_IWGRP | S_IWOTH);
1911  PG_TRY();
1912  {
1913  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1914  }
1915  PG_FINALLY();
1916  {
1917  umask(oumask);
1918  }
1919  PG_END_TRY();
1920  if (cstate->copy_file == NULL)
1921  {
1922  /* copy errno because ereport subfunctions might change it */
1923  int save_errno = errno;
1924 
1925  ereport(ERROR,
1927  errmsg("could not open file \"%s\" for writing: %m",
1928  cstate->filename),
1929  (save_errno == ENOENT || save_errno == EACCES) ?
1930  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1931  "You may want a client-side facility such as psql's \\copy.") : 0));
1932  }
1933 
1934  if (fstat(fileno(cstate->copy_file), &st))
1935  ereport(ERROR,
1937  errmsg("could not stat file \"%s\": %m",
1938  cstate->filename)));
1939 
1940  if (S_ISDIR(st.st_mode))
1941  ereport(ERROR,
1942  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1943  errmsg("\"%s\" is a directory", cstate->filename)));
1944  }
1945  }
1946 
1947  MemoryContextSwitchTo(oldcontext);
1948 
1949  return cstate;
1950 }
1951 
1952 /*
1953  * This intermediate routine exists mainly to localize the effects of setjmp
1954  * so we don't need to plaster a lot of variables with "volatile".
1955  */
1956 static uint64
1958 {
1959  bool pipe = (cstate->filename == NULL);
1960  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1961  uint64 processed;
1962 
1963  PG_TRY();
1964  {
1965  if (fe_copy)
1966  SendCopyBegin(cstate);
1967 
1968  processed = CopyTo(cstate);
1969 
1970  if (fe_copy)
1971  SendCopyEnd(cstate);
1972  }
1973  PG_CATCH();
1974  {
1975  /*
1976  * Make sure we turn off old-style COPY OUT mode upon error. It is
1977  * okay to do this in all cases, since it does nothing if the mode is
1978  * not on.
1979  */
1980  pq_endcopyout(true);
1981  PG_RE_THROW();
1982  }
1983  PG_END_TRY();
1984 
1985  return processed;
1986 }
1987 
1988 /*
1989  * Clean up storage and release resources for COPY TO.
1990  */
1991 static void
1993 {
1994  if (cstate->queryDesc != NULL)
1995  {
1996  /* Close down the query and free resources. */
1997  ExecutorFinish(cstate->queryDesc);
1998  ExecutorEnd(cstate->queryDesc);
1999  FreeQueryDesc(cstate->queryDesc);
2001  }
2002 
2003  /* Clean up storage */
2004  EndCopy(cstate);
2005 }
2006 
2007 /*
2008  * Copy from relation or query TO file.
2009  */
2010 static uint64
2012 {
2013  TupleDesc tupDesc;
2014  int num_phys_attrs;
2015  ListCell *cur;
2016  uint64 processed;
2017 
2018  if (cstate->rel)
2019  tupDesc = RelationGetDescr(cstate->rel);
2020  else
2021  tupDesc = cstate->queryDesc->tupDesc;
2022  num_phys_attrs = tupDesc->natts;
2023  cstate->null_print_client = cstate->null_print; /* default */
2024 
2025  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
2026  cstate->fe_msgbuf = makeStringInfo();
2027 
2028  /* Get info about the columns we need to process. */
2029  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2030  foreach(cur, cstate->attnumlist)
2031  {
2032  int attnum = lfirst_int(cur);
2033  Oid out_func_oid;
2034  bool isvarlena;
2035  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
2036 
2037  if (cstate->binary)
2038  getTypeBinaryOutputInfo(attr->atttypid,
2039  &out_func_oid,
2040  &isvarlena);
2041  else
2042  getTypeOutputInfo(attr->atttypid,
2043  &out_func_oid,
2044  &isvarlena);
2045  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
2046  }
2047 
2048  /*
2049  * Create a temporary memory context that we can reset once per row to
2050  * recover palloc'd memory. This avoids any problems with leaks inside
2051  * datatype output routines, and should be faster than retail pfree's
2052  * anyway. (We don't need a whole econtext as CopyFrom does.)
2053  */
2055  "COPY TO",
2057 
2058  if (cstate->binary)
2059  {
2060  /* Generate header for a binary copy */
2061  int32 tmp;
2062 
2063  /* Signature */
2064  CopySendData(cstate, BinarySignature, 11);
2065  /* Flags field */
2066  tmp = 0;
2067  CopySendInt32(cstate, tmp);
2068  /* No header extension */
2069  tmp = 0;
2070  CopySendInt32(cstate, tmp);
2071  }
2072  else
2073  {
2074  /*
2075  * For non-binary copy, we need to convert null_print to file
2076  * encoding, because it will be sent directly with CopySendString.
2077  */
2078  if (cstate->need_transcoding)
2079  cstate->null_print_client = pg_server_to_any(cstate->null_print,
2080  cstate->null_print_len,
2081  cstate->file_encoding);
2082 
2083  /* if a header has been requested send the line */
2084  if (cstate->header_line)
2085  {
2086  bool hdr_delim = false;
2087 
2088  foreach(cur, cstate->attnumlist)
2089  {
2090  int attnum = lfirst_int(cur);
2091  char *colname;
2092 
2093  if (hdr_delim)
2094  CopySendChar(cstate, cstate->delim[0]);
2095  hdr_delim = true;
2096 
2097  colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2098 
2099  CopyAttributeOutCSV(cstate, colname, false,
2100  list_length(cstate->attnumlist) == 1);
2101  }
2102 
2103  CopySendEndOfRow(cstate);
2104  }
2105  }
2106 
2107  if (cstate->rel)
2108  {
2109  TupleTableSlot *slot;
2110  TableScanDesc scandesc;
2111 
2112  scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2113  slot = table_slot_create(cstate->rel, NULL);
2114 
2115  processed = 0;
2116  while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
2117  {
2119 
2120  /* Deconstruct the tuple ... */
2121  slot_getallattrs(slot);
2122 
2123  /* Format and send the data */
2124  CopyOneRowTo(cstate, slot);
2125  processed++;
2126  }
2127 
2129  table_endscan(scandesc);
2130  }
2131  else
2132  {
2133  /* run the plan --- the dest receiver will send tuples */
2134  ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2135  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2136  }
2137 
2138  if (cstate->binary)
2139  {
2140  /* Generate trailer for a binary copy */
2141  CopySendInt16(cstate, -1);
2142  /* Need to flush out the trailer */
2143  CopySendEndOfRow(cstate);
2144  }
2145 
2147 
2148  return processed;
2149 }
2150 
2151 /*
2152  * Emit one row during CopyTo().
2153  */
2154 static void
2156 {
2157  bool need_delim = false;
2159  MemoryContext oldcontext;
2160  ListCell *cur;
2161  char *string;
2162 
2163  MemoryContextReset(cstate->rowcontext);
2164  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2165 
2166  if (cstate->binary)
2167  {
2168  /* Binary per-tuple header */
2169  CopySendInt16(cstate, list_length(cstate->attnumlist));
2170  }
2171 
2172  /* Make sure the tuple is fully deconstructed */
2173  slot_getallattrs(slot);
2174 
2175  foreach(cur, cstate->attnumlist)
2176  {
2177  int attnum = lfirst_int(cur);
2178  Datum value = slot->tts_values[attnum - 1];
2179  bool isnull = slot->tts_isnull[attnum - 1];
2180 
2181  if (!cstate->binary)
2182  {
2183  if (need_delim)
2184  CopySendChar(cstate, cstate->delim[0]);
2185  need_delim = true;
2186  }
2187 
2188  if (isnull)
2189  {
2190  if (!cstate->binary)
2191  CopySendString(cstate, cstate->null_print_client);
2192  else
2193  CopySendInt32(cstate, -1);
2194  }
2195  else
2196  {
2197  if (!cstate->binary)
2198  {
2199  string = OutputFunctionCall(&out_functions[attnum - 1],
2200  value);
2201  if (cstate->csv_mode)
2202  CopyAttributeOutCSV(cstate, string,
2203  cstate->force_quote_flags[attnum - 1],
2204  list_length(cstate->attnumlist) == 1);
2205  else
2206  CopyAttributeOutText(cstate, string);
2207  }
2208  else
2209  {
2210  bytea *outputbytes;
2211 
2212  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2213  value);
2214  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2215  CopySendData(cstate, VARDATA(outputbytes),
2216  VARSIZE(outputbytes) - VARHDRSZ);
2217  }
2218  }
2219  }
2220 
2221  CopySendEndOfRow(cstate);
2222 
2223  MemoryContextSwitchTo(oldcontext);
2224 }
2225 
2226 
2227 /*
2228  * error context callback for COPY FROM
2229  *
2230  * The argument for the error context must be CopyState.
2231  */
2232 void
2234 {
2235  CopyState cstate = (CopyState) arg;
2236  char curlineno_str[32];
2237 
2238  snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2239  cstate->cur_lineno);
2240 
2241  if (cstate->binary)
2242  {
2243  /* can't usefully display the data */
2244  if (cstate->cur_attname)
2245  errcontext("COPY %s, line %s, column %s",
2246  cstate->cur_relname, curlineno_str,
2247  cstate->cur_attname);
2248  else
2249  errcontext("COPY %s, line %s",
2250  cstate->cur_relname, curlineno_str);
2251  }
2252  else
2253  {
2254  if (cstate->cur_attname && cstate->cur_attval)
2255  {
2256  /* error is relevant to a particular column */
2257  char *attval;
2258 
2259  attval = limit_printout_length(cstate->cur_attval);
2260  errcontext("COPY %s, line %s, column %s: \"%s\"",
2261  cstate->cur_relname, curlineno_str,
2262  cstate->cur_attname, attval);
2263  pfree(attval);
2264  }
2265  else if (cstate->cur_attname)
2266  {
2267  /* error is relevant to a particular column, value is NULL */
2268  errcontext("COPY %s, line %s, column %s: null input",
2269  cstate->cur_relname, curlineno_str,
2270  cstate->cur_attname);
2271  }
2272  else
2273  {
2274  /*
2275  * Error is relevant to a particular line.
2276  *
2277  * If line_buf still contains the correct line, and it's already
2278  * transcoded, print it. If it's still in a foreign encoding, it's
2279  * quite likely that the error is precisely a failure to do
2280  * encoding conversion (ie, bad data). We dare not try to convert
2281  * it, and at present there's no way to regurgitate it without
2282  * conversion. So we have to punt and just report the line number.
2283  */
2284  if (cstate->line_buf_valid &&
2285  (cstate->line_buf_converted || !cstate->need_transcoding))
2286  {
2287  char *lineval;
2288 
2289  lineval = limit_printout_length(cstate->line_buf.data);
2290  errcontext("COPY %s, line %s: \"%s\"",
2291  cstate->cur_relname, curlineno_str, lineval);
2292  pfree(lineval);
2293  }
2294  else
2295  {
2296  errcontext("COPY %s, line %s",
2297  cstate->cur_relname, curlineno_str);
2298  }
2299  }
2300  }
2301 }
2302 
/*
 * limit_printout_length
 *		Make sure we don't print an unreasonable amount of COPY data in a
 *		message.
 *
 * Strings of at most MAX_COPY_DATA_DISPLAY bytes are returned as a pstrdup'd
 * copy.  Longer strings are clipped with pg_mbcliplen() and get "..."
 * appended.  Either way the result is freshly allocated and the caller may
 * pfree it.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			slen = strlen(str);
	int			len;
	char	   *res;

	/* Fast path if definitely okay */
	if (slen <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Apply encoding-dependent truncation */
	len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);

	/*
	 * Truncate, and add "..." to show we truncated the input.  The buffer
	 * holds len bytes of data plus three dots and the terminating NUL.
	 */
	res = (char *) palloc(len + 4);
	memcpy(res, str, len);
	strcpy(res + len, "...");

	return res;
}
2333 
2334 /*
2335  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
2336  * ResultRelInfo.
2337  */
2338 static CopyMultiInsertBuffer *
2340 {
2341  CopyMultiInsertBuffer *buffer;
2342 
2343  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
2344  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
2345  buffer->resultRelInfo = rri;
2346  buffer->bistate = GetBulkInsertState();
2347  buffer->nused = 0;
2348 
2349  return buffer;
2350 }
2351 
2352 /*
2353  * Make a new buffer for this ResultRelInfo.
2354  */
2355 static inline void
2357  ResultRelInfo *rri)
2358 {
2359  CopyMultiInsertBuffer *buffer;
2360 
2361  buffer = CopyMultiInsertBufferInit(rri);
2362 
2363  /* Setup back-link so we can easily find this buffer again */
2364  rri->ri_CopyMultiInsertBuffer = buffer;
2365  /* Record that we're tracking this buffer */
2366  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2367 }
2368 
2369 /*
2370  * Initialize an already allocated CopyMultiInsertInfo.
2371  *
2372  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
2373  * for that table.
2374  */
2375 static void
2377  CopyState cstate, EState *estate, CommandId mycid,
2378  int ti_options)
2379 {
2380  miinfo->multiInsertBuffers = NIL;
2381  miinfo->bufferedTuples = 0;
2382  miinfo->bufferedBytes = 0;
2383  miinfo->cstate = cstate;
2384  miinfo->estate = estate;
2385  miinfo->mycid = mycid;
2386  miinfo->ti_options = ti_options;
2387 
2388  /*
2389  * Only setup the buffer when not dealing with a partitioned table.
2390  * Buffers for partitioned tables will just be setup when we need to send
2391  * tuples their way for the first time.
2392  */
2393  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
2394  CopyMultiInsertInfoSetupBuffer(miinfo, rri);
2395 }
2396 
2397 /*
2398  * Returns true if the buffers are full
2399  */
2400 static inline bool
2402 {
2403  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
2404  miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
2405  return true;
2406  return false;
2407 }
2408 
2409 /*
2410  * Returns true if we have no buffered tuples
2411  */
2412 static inline bool
2414 {
2415  return miinfo->bufferedTuples == 0;
2416 }
2417 
2418 /*
2419  * Write the tuples stored in 'buffer' out to the table.
2420  */
2421 static inline void
2423  CopyMultiInsertBuffer *buffer)
2424 {
2425  MemoryContext oldcontext;
2426  int i;
2427  uint64 save_cur_lineno;
2428  CopyState cstate = miinfo->cstate;
2429  EState *estate = miinfo->estate;
2430  CommandId mycid = miinfo->mycid;
2431  int ti_options = miinfo->ti_options;
2432  bool line_buf_valid = cstate->line_buf_valid;
2433  int nused = buffer->nused;
2434  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
2435  TupleTableSlot **slots = buffer->slots;
2436 
2437  /* Set es_result_relation_info to the ResultRelInfo we're flushing. */
2438  estate->es_result_relation_info = resultRelInfo;
2439 
2440  /*
2441  * Print error context information correctly, if one of the operations
2442  * below fail.
2443  */
2444  cstate->line_buf_valid = false;
2445  save_cur_lineno = cstate->cur_lineno;
2446 
2447  /*
2448  * table_multi_insert may leak memory, so switch to short-lived memory
2449  * context before calling it.
2450  */
2451  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2452  table_multi_insert(resultRelInfo->ri_RelationDesc,
2453  slots,
2454  nused,
2455  mycid,
2456  ti_options,
2457  buffer->bistate);
2458  MemoryContextSwitchTo(oldcontext);
2459 
2460  for (i = 0; i < nused; i++)
2461  {
2462  /*
2463  * If there are any indexes, update them for all the inserted tuples,
2464  * and run AFTER ROW INSERT triggers.
2465  */
2466  if (resultRelInfo->ri_NumIndices > 0)
2467  {
2468  List *recheckIndexes;
2469 
2470  cstate->cur_lineno = buffer->linenos[i];
2471  recheckIndexes =
2472  ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
2473  NIL);
2474  ExecARInsertTriggers(estate, resultRelInfo,
2475  slots[i], recheckIndexes,
2476  cstate->transition_capture);
2477  list_free(recheckIndexes);
2478  }
2479 
2480  /*
2481  * There's no indexes, but see if we need to run AFTER ROW INSERT
2482  * triggers anyway.
2483  */
2484  else if (resultRelInfo->ri_TrigDesc != NULL &&
2485  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2486  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2487  {
2488  cstate->cur_lineno = buffer->linenos[i];
2489  ExecARInsertTriggers(estate, resultRelInfo,
2490  slots[i], NIL, cstate->transition_capture);
2491  }
2492 
2493  ExecClearTuple(slots[i]);
2494  }
2495 
2496  /* Mark that all slots are free */
2497  buffer->nused = 0;
2498 
2499  /* reset cur_lineno and line_buf_valid to what they were */
2500  cstate->line_buf_valid = line_buf_valid;
2501  cstate->cur_lineno = save_cur_lineno;
2502 }
2503 
2504 /*
2505  * Drop used slots and free member for this buffer.
2506  *
2507  * The buffer must be flushed before cleanup.
2508  */
2509 static inline void
2511  CopyMultiInsertBuffer *buffer)
2512 {
2513  int i;
2514 
2515  /* Ensure buffer was flushed */
2516  Assert(buffer->nused == 0);
2517 
2518  /* Remove back-link to ourself */
2519  buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
2520 
2521  FreeBulkInsertState(buffer->bistate);
2522 
2523  /* Since we only create slots on demand, just drop the non-null ones. */
2524  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
2525  ExecDropSingleTupleTableSlot(buffer->slots[i]);
2526 
2528  miinfo->ti_options);
2529 
2530  pfree(buffer);
2531 }
2532 
2533 /*
2534  * Write out all stored tuples in all buffers out to the tables.
2535  *
2536  * Once flushed we also trim the tracked buffers list down to size by removing
2537  * the buffers created earliest first.
2538  *
2539  * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
2540  * used. When cleaning up old buffers we'll never remove the one for
2541  * 'curr_rri'.
2542  */
2543 static inline void
2545 {
2546  ListCell *lc;
2547 
2548  foreach(lc, miinfo->multiInsertBuffers)
2549  {
2551 
2552  CopyMultiInsertBufferFlush(miinfo, buffer);
2553  }
2554 
2555  miinfo->bufferedTuples = 0;
2556  miinfo->bufferedBytes = 0;
2557 
2558  /*
2559  * Trim the list of tracked buffers down if it exceeds the limit. Here we
2560  * remove buffers starting with the ones we created first. It seems more
2561  * likely that these older ones are less likely to be needed than ones
2562  * that were just created.
2563  */
2565  {
2566  CopyMultiInsertBuffer *buffer;
2567 
2568  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2569 
2570  /*
2571  * We never want to remove the buffer that's currently being used, so
2572  * if we happen to find that then move it to the end of the list.
2573  */
2574  if (buffer->resultRelInfo == curr_rri)
2575  {
2577  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2578  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2579  }
2580 
2581  CopyMultiInsertBufferCleanup(miinfo, buffer);
2583  }
2584 }
2585 
2586 /*
2587  * Cleanup allocated buffers and free memory
2588  */
2589 static inline void
2591 {
2592  ListCell *lc;
2593 
2594  foreach(lc, miinfo->multiInsertBuffers)
2595  CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
2596 
2597  list_free(miinfo->multiInsertBuffers);
2598 }
2599 
2600 /*
2601  * Get the next TupleTableSlot that the next tuple should be stored in.
2602  *
2603  * Callers must ensure that the buffer is not full.
2604  */
2605 static inline TupleTableSlot *
2607  ResultRelInfo *rri)
2608 {
2610  int nused = buffer->nused;
2611 
2612  Assert(buffer != NULL);
2613  Assert(nused < MAX_BUFFERED_TUPLES);
2614 
2615  if (buffer->slots[nused] == NULL)
2616  buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
2617  return buffer->slots[nused];
2618 }
2619 
2620 /*
2621  * Record the previously reserved TupleTableSlot that was reserved by
2622  * CopyMultiInsertInfoNextFreeSlot as being consumed.
2623  */
2624 static inline void
2626  TupleTableSlot *slot, int tuplen, uint64 lineno)
2627 {
2629 
2630  Assert(buffer != NULL);
2631  Assert(slot == buffer->slots[buffer->nused]);
2632 
2633  /* Store the line number so we can properly report any errors later */
2634  buffer->linenos[buffer->nused] = lineno;
2635 
2636  /* Record this slot as being used */
2637  buffer->nused++;
2638 
2639  /* Update how many tuples are stored and their size */
2640  miinfo->bufferedTuples++;
2641  miinfo->bufferedBytes += tuplen;
2642 }
2643 
2644 /*
2645  * Copy FROM file to relation.
2646  */
2647 uint64
2649 {
2650  ResultRelInfo *resultRelInfo;
2651  ResultRelInfo *target_resultRelInfo;
2652  ResultRelInfo *prevResultRelInfo = NULL;
2653  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2654  ModifyTableState *mtstate;
2655  ExprContext *econtext;
2656  TupleTableSlot *singleslot = NULL;
2657  MemoryContext oldcontext = CurrentMemoryContext;
2658 
2659  PartitionTupleRouting *proute = NULL;
2660  ErrorContextCallback errcallback;
2661  CommandId mycid = GetCurrentCommandId(true);
2662  int ti_options = 0; /* start with default options for insert */
2663  BulkInsertState bistate = NULL;
2664  CopyInsertMethod insertMethod;
2665  CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
2666  uint64 processed = 0;
2667  bool has_before_insert_row_trig;
2668  bool has_instead_insert_row_trig;
2669  bool leafpart_use_multi_insert = false;
2670 
2671  Assert(cstate->rel);
2672 
2673  /*
2674  * The target must be a plain, foreign, or partitioned relation, or have
2675  * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
2676  * allowed on views, so we only hint about them in the view case.)
2677  */
2678  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2679  cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2680  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2681  !(cstate->rel->trigdesc &&
2683  {
2684  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2685  ereport(ERROR,
2686  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2687  errmsg("cannot copy to view \"%s\"",
2688  RelationGetRelationName(cstate->rel)),
2689  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2690  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2691  ereport(ERROR,
2692  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2693  errmsg("cannot copy to materialized view \"%s\"",
2694  RelationGetRelationName(cstate->rel))));
2695  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2696  ereport(ERROR,
2697  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2698  errmsg("cannot copy to sequence \"%s\"",
2699  RelationGetRelationName(cstate->rel))));
2700  else
2701  ereport(ERROR,
2702  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2703  errmsg("cannot copy to non-table relation \"%s\"",
2704  RelationGetRelationName(cstate->rel))));
2705  }
2706 
2707  /*
2708  * If the target file is new-in-transaction, we assume that checking FSM
2709  * for free space is a waste of time. This could possibly be wrong, but
2710  * it's unlikely.
2711  */
2712  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
2715  ti_options |= TABLE_INSERT_SKIP_FSM;
2716 
2717  /*
2718  * Optimize if new relfilenode was created in this subxact or one of its
2719  * committed children and we won't see those rows later as part of an
2720  * earlier scan or command. The subxact test ensures that if this subxact
2721  * aborts then the frozen rows won't be visible after xact cleanup. Note
2722  * that the stronger test of exactly which subtransaction created it is
2723  * crucial for correctness of this optimization. The test for an earlier
2724  * scan or command tolerates false negatives. FREEZE causes other sessions
2725  * to see rows they would not see under MVCC, and a false negative merely
2726  * spreads that anomaly to the current session.
2727  */
2728  if (cstate->freeze)
2729  {
2730  /*
2731  * We currently disallow COPY FREEZE on partitioned tables. The
2732  * reason for this is that we've simply not yet opened the partitions
2733  * to determine if the optimization can be applied to them. We could
2734  * go and open them all here, but doing so may be quite a costly
2735  * overhead for small copies. In any case, we may just end up routing
2736  * tuples to a small number of partitions. It seems better just to
2737  * raise an ERROR for partitioned tables.
2738  */
2739  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2740  {
2741  ereport(ERROR,
2742  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2743  errmsg("cannot perform COPY FREEZE on a partitioned table")));
2744  }
2745 
2746  /*
2747  * Tolerate one registration for the benefit of FirstXactSnapshot.
2748  * Scan-bearing queries generally create at least two registrations,
2749  * though relying on that is fragile, as is ignoring ActiveSnapshot.
2750  * Clear CatalogSnapshot to avoid counting its registration. We'll
2751  * still detect ongoing catalog scans, each of which separately
2752  * registers the snapshot it uses.
2753  */
2756  ereport(ERROR,
2757  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2758  errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
2759 
2760  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2762  ereport(ERROR,
2763  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2764  errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
2765 
2766  ti_options |= TABLE_INSERT_FROZEN;
2767  }
2768 
2769  /*
2770  * We need a ResultRelInfo so we can use the regular executor's
2771  * index-entry-making machinery. (There used to be a huge amount of code
2772  * here that basically duplicated execUtils.c ...)
2773  */
2774  resultRelInfo = makeNode(ResultRelInfo);
2775  InitResultRelInfo(resultRelInfo,
2776  cstate->rel,
2777  1, /* must match rel's position in range_table */
2778  NULL,
2779  0);
2780  target_resultRelInfo = resultRelInfo;
2781 
2782  /* Verify the named relation is a valid target for INSERT */
2783  CheckValidResultRel(resultRelInfo, CMD_INSERT);
2784 
2785  ExecOpenIndices(resultRelInfo, false);
2786 
2787  estate->es_result_relations = resultRelInfo;
2788  estate->es_num_result_relations = 1;
2789  estate->es_result_relation_info = resultRelInfo;
2790 
2791  ExecInitRangeTable(estate, cstate->range_table);
2792 
2793  /*
2794  * Set up a ModifyTableState so we can let FDW(s) init themselves for
2795  * foreign-table result relation(s).
2796  */
2797  mtstate = makeNode(ModifyTableState);
2798  mtstate->ps.plan = NULL;
2799  mtstate->ps.state = estate;
2800  mtstate->operation = CMD_INSERT;
2801  mtstate->resultRelInfo = estate->es_result_relations;
2802 
2803  if (resultRelInfo->ri_FdwRoutine != NULL &&
2804  resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
2805  resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
2806  resultRelInfo);
2807 
2808  /* Prepare to catch AFTER triggers. */
2810 
2811  /*
2812  * If there are any triggers with transition tables on the named relation,
2813  * we need to be prepared to capture transition tuples.
2814  *
2815  * Because partition tuple routing would like to know about whether
2816  * transition capture is active, we also set it in mtstate, which is
2817  * passed to ExecFindPartition() below.
2818  */
2819  cstate->transition_capture = mtstate->mt_transition_capture =
2821  RelationGetRelid(cstate->rel),
2822  CMD_INSERT);
2823 
2824  /*
2825  * If the named relation is a partitioned table, initialize state for
2826  * CopyFrom tuple routing.
2827  */
2828  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2829  proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel);
2830 
2831  if (cstate->whereClause)
2832  cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
2833  &mtstate->ps);
2834 
2835  /*
2836  * It's generally more efficient to prepare a bunch of tuples for
2837  * insertion, and insert them in one table_multi_insert() call, than call
2838  * table_tuple_insert() separately for every tuple. However, there are a
2839  * number of reasons why we might not be able to do this. These are
2840  * explained below.
2841  */
2842  if (resultRelInfo->ri_TrigDesc != NULL &&
2843  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2844  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2845  {
2846  /*
2847  * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
2848  * triggers on the table. Such triggers might query the table we're
2849  * inserting into and act differently if the tuples that have already
2850  * been processed and prepared for insertion are not there.
2851  */
2852  insertMethod = CIM_SINGLE;
2853  }
2854  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
2855  resultRelInfo->ri_TrigDesc->trig_insert_new_table)
2856  {
2857  /*
2858  * For partitioned tables we can't support multi-inserts when there
2859  * are any statement level insert triggers. It might be possible to
2860  * allow partitioned tables with such triggers in the future, but for
2861  * now, CopyMultiInsertInfoFlush expects that any before row insert
2862  * and statement level insert triggers are on the same relation.
2863  */
2864  insertMethod = CIM_SINGLE;
2865  }
2866  else if (resultRelInfo->ri_FdwRoutine != NULL ||
2867  cstate->volatile_defexprs)
2868  {
2869  /*
2870  * Can't support multi-inserts to foreign tables or if there are any
2871  * volatile default expressions in the table. Similarly to the
2872  * trigger case above, such expressions may query the table we're
2873  * inserting into.
2874  *
2875  * Note: It does not matter if any partitions have any volatile
2876  * default expressions as we use the defaults from the target of the
2877  * COPY command.
2878  */
2879  insertMethod = CIM_SINGLE;
2880  }
2881  else if (contain_volatile_functions(cstate->whereClause))
2882  {
2883  /*
2884  * Can't support multi-inserts if there are any volatile function
2885  * expressions in WHERE clause. Similarly to the trigger case above,
2886  * such expressions may query the table we're inserting into.
2887  */
2888  insertMethod = CIM_SINGLE;
2889  }
2890  else
2891  {
2892  /*
2893  * For partitioned tables, we may still be able to perform bulk
2894  * inserts. However, the possibility of this depends on which types
2895  * of triggers exist on the partition. We must disable bulk inserts
2896  * if the partition is a foreign table or it has any before row insert
2897  * or insert instead triggers (same as we checked above for the parent
2898  * table). Since the partition's resultRelInfos are initialized only
2899  * when we actually need to insert the first tuple into them, we must
2900  * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
2901  * flag that we must later determine if we can use bulk-inserts for
2902  * the partition being inserted into.
2903  */
2904  if (proute)
2905  insertMethod = CIM_MULTI_CONDITIONAL;
2906  else
2907  insertMethod = CIM_MULTI;
2908 
2909  CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
2910  estate, mycid, ti_options);
2911  }
2912 
2913  /*
2914  * If not using batch mode (which allocates slots as needed) set up a
2915  * tuple slot too. When inserting into a partitioned table, we also need
2916  * one, even if we might batch insert, to read the tuple in the root
2917  * partition's form.
2918  */
2919  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
2920  {
2921  singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
2922  &estate->es_tupleTable);
2923  bistate = GetBulkInsertState();
2924  }
2925 
2926  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2927  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
2928 
2929  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2930  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
2931 
2932  /*
2933  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2934  * should do this for COPY, since it's not really an "INSERT" statement as
2935  * such. However, executing these triggers maintains consistency with the
2936  * EACH ROW triggers that we already fire on COPY.
2937  */
2938  ExecBSInsertTriggers(estate, resultRelInfo);
2939 
2940  econtext = GetPerTupleExprContext(estate);
2941 
2942  /* Set up callback to identify error line number */
2943  errcallback.callback = CopyFromErrorCallback;
2944  errcallback.arg = (void *) cstate;
2945  errcallback.previous = error_context_stack;
2946  error_context_stack = &errcallback;
2947 
2948  for (;;)
2949  {
2950  TupleTableSlot *myslot;
2951  bool skip_tuple;
2952 
2954 
2955  /*
2956  * Reset the per-tuple exprcontext. We do this after every tuple, to
2957  * clean-up after expression evaluations etc.
2958  */
2959  ResetPerTupleExprContext(estate);
2960 
2961  /* select slot to (initially) load row into */
2962  if (insertMethod == CIM_SINGLE || proute)
2963  {
2964  myslot = singleslot;
2965  Assert(myslot != NULL);
2966  }
2967  else
2968  {
2969  Assert(resultRelInfo == target_resultRelInfo);
2970  Assert(insertMethod == CIM_MULTI);
2971 
2972  myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
2973  resultRelInfo);
2974  }
2975 
2976  /*
2977  * Switch to per-tuple context before calling NextCopyFrom, which does
2978  * evaluate default expressions etc. and requires per-tuple context.
2979  */
2981 
2982  ExecClearTuple(myslot);
2983 
2984  /* Directly store the values/nulls array in the slot */
2985  if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
2986  break;
2987 
2988  ExecStoreVirtualTuple(myslot);
2989 
2990  /*
2991  * Constraints and where clause might reference the tableoid column,
2992  * so (re-)initialize tts_tableOid before evaluating them.
2993  */
2994  myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
2995 
2996  /* Triggers and stuff need to be invoked in query context. */
2997  MemoryContextSwitchTo(oldcontext);
2998 
2999  if (cstate->whereClause)
3000  {
3001  econtext->ecxt_scantuple = myslot;
3002  /* Skip items that don't match COPY's WHERE clause */
3003  if (!ExecQual(cstate->qualexpr, econtext))
3004  continue;
3005  }
3006 
3007  /* Determine the partition to insert the tuple into */
3008  if (proute)
3009  {
3010  TupleConversionMap *map;
3011 
3012  /*
3013  * Attempt to find a partition suitable for this tuple.
3014  * ExecFindPartition() will raise an error if none can be found or
3015  * if the found partition is not suitable for INSERTs.
3016  */
3017  resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
3018  proute, myslot, estate);
3019 
3020  if (prevResultRelInfo != resultRelInfo)
3021  {
3022  /* Determine which triggers exist on this partition */
3023  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3024  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
3025 
3026  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3027  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
3028 
3029  /*
3030  * Disable multi-inserts when the partition has BEFORE/INSTEAD
3031  * OF triggers, or if the partition is a foreign partition.
3032  */
3033  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
3034  !has_before_insert_row_trig &&
3035  !has_instead_insert_row_trig &&
3036  resultRelInfo->ri_FdwRoutine == NULL;
3037 
3038  /* Set the multi-insert buffer to use for this partition. */
3039  if (leafpart_use_multi_insert)
3040  {
3041  if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
3042  CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
3043  resultRelInfo);
3044  }
3045  else if (insertMethod == CIM_MULTI_CONDITIONAL &&
3046  !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3047  {
3048  /*
3049  * Flush pending inserts if this partition can't use
3050  * batching, so rows are visible to triggers etc.
3051  */
3052  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3053  }
3054 
3055  if (bistate != NULL)
3056  ReleaseBulkInsertStatePin(bistate);
3057  prevResultRelInfo = resultRelInfo;
3058  }
3059 
3060  /*
3061  * For ExecInsertIndexTuples() to work on the partition's indexes
3062  */
3063  estate->es_result_relation_info = resultRelInfo;
3064 
3065  /*
3066  * If we're capturing transition tuples, we might need to convert
3067  * from the partition rowtype to root rowtype.
3068  */
3069  if (cstate->transition_capture != NULL)
3070  {
3071  if (has_before_insert_row_trig)
3072  {
3073  /*
3074  * If there are any BEFORE triggers on the partition,
3075  * we'll have to be ready to convert their result back to
3076  * tuplestore format.
3077  */
3079  cstate->transition_capture->tcs_map =
3080  resultRelInfo->ri_PartitionInfo->pi_PartitionToRootMap;
3081  }
3082  else
3083  {
3084  /*
3085  * Otherwise, just remember the original unconverted
3086  * tuple, to avoid a needless round trip conversion.
3087  */
3089  cstate->transition_capture->tcs_map = NULL;
3090  }
3091  }
3092 
3093  /*
3094  * We might need to convert from the root rowtype to the partition
3095  * rowtype.
3096  */
3097  map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
3098  if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
3099  {
3100  /* non batch insert */
3101  if (map != NULL)
3102  {
3103  TupleTableSlot *new_slot;
3104 
3105  new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
3106  myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
3107  }
3108  }
3109  else
3110  {
3111  /*
3112  * Prepare to queue up tuple for later batch insert into
3113  * current partition.
3114  */
3115  TupleTableSlot *batchslot;
3116 
3117  /* no other path available for partitioned table */
3118  Assert(insertMethod == CIM_MULTI_CONDITIONAL);
3119 
3120  batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
3121  resultRelInfo);
3122 
3123  if (map != NULL)
3124  myslot = execute_attr_map_slot(map->attrMap, myslot,
3125  batchslot);
3126  else
3127  {
3128  /*
3129  * This looks more expensive than it is (Believe me, I
3130  * optimized it away. Twice.). The input is in virtual
3131  * form, and we'll materialize the slot below - for most
3132  * slot types the copy performs the work materialization
3133  * would later require anyway.
3134  */
3135  ExecCopySlot(batchslot, myslot);
3136  myslot = batchslot;
3137  }
3138  }
3139 
3140  /* ensure that triggers etc see the right relation */
3141  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3142  }
3143 
3144  skip_tuple = false;
3145 
3146  /* BEFORE ROW INSERT Triggers */
3147  if (has_before_insert_row_trig)
3148  {
3149  if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
3150  skip_tuple = true; /* "do nothing" */
3151  }
3152 
3153  if (!skip_tuple)
3154  {
3155  /*
3156  * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
3157  * tuple. Otherwise, proceed with inserting the tuple into the
3158  * table or foreign table.
3159  */
3160  if (has_instead_insert_row_trig)
3161  {
3162  ExecIRInsertTriggers(estate, resultRelInfo, myslot);
3163  }
3164  else
3165  {
3166  /* Compute stored generated columns */
3167  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
3169  ExecComputeStoredGenerated(estate, myslot, CMD_INSERT);
3170 
3171  /*
3172  * If the target is a plain table, check the constraints of
3173  * the tuple.
3174  */
3175  if (resultRelInfo->ri_FdwRoutine == NULL &&
3176  resultRelInfo->ri_RelationDesc->rd_att->constr)
3177  ExecConstraints(resultRelInfo, myslot, estate);
3178 
3179  /*
3180  * Also check the tuple against the partition constraint, if
3181  * there is one; except that if we got here via tuple-routing,
3182  * we don't need to if there's no BR trigger defined on the
3183  * partition.
3184  */
3185  if (resultRelInfo->ri_PartitionCheck &&
3186  (proute == NULL || has_before_insert_row_trig))
3187  ExecPartitionCheck(resultRelInfo, myslot, estate, true);
3188 
3189  /* Store the slot in the multi-insert buffer, when enabled. */
3190  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
3191  {
3192  /*
3193  * The slot previously might point into the per-tuple
3194  * context. For batching it needs to be longer lived.
3195  */
3196  ExecMaterializeSlot(myslot);
3197 
3198  /* Add this tuple to the tuple buffer */
3199  CopyMultiInsertInfoStore(&multiInsertInfo,
3200  resultRelInfo, myslot,
3201  cstate->line_buf.len,
3202  cstate->cur_lineno);
3203 
3204  /*
3205  * If enough inserts have queued up, then flush all
3206  * buffers out to their tables.
3207  */
3208  if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
3209  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3210  }
3211  else
3212  {
3213  List *recheckIndexes = NIL;
3214 
3215  /* OK, store the tuple */
3216  if (resultRelInfo->ri_FdwRoutine != NULL)
3217  {
3218  myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
3219  resultRelInfo,
3220  myslot,
3221  NULL);
3222 
3223  if (myslot == NULL) /* "do nothing" */
3224  continue; /* next tuple please */
3225 
3226  /*
3227  * AFTER ROW Triggers might reference the tableoid
3228  * column, so (re-)initialize tts_tableOid before
3229  * evaluating them.
3230  */
3231  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3232  }
3233  else
3234  {
3235  /* OK, store the tuple and create index entries for it */
3236  table_tuple_insert(resultRelInfo->ri_RelationDesc,
3237  myslot, mycid, ti_options, bistate);
3238 
3239  if (resultRelInfo->ri_NumIndices > 0)
3240  recheckIndexes = ExecInsertIndexTuples(myslot,
3241  estate,
3242  false,
3243  NULL,
3244  NIL);
3245  }
3246 
3247  /* AFTER ROW INSERT Triggers */
3248  ExecARInsertTriggers(estate, resultRelInfo, myslot,
3249  recheckIndexes, cstate->transition_capture);
3250 
3251  list_free(recheckIndexes);
3252  }
3253  }
3254 
3255  /*
3256  * We count only tuples not suppressed by a BEFORE INSERT trigger
3257  * or FDW; this is the same definition used by nodeModifyTable.c
3258  * for counting tuples inserted by an INSERT command.
3259  */
3260  processed++;
3261  }
3262  }
3263 
3264  /* Flush any remaining buffered tuples */
3265  if (insertMethod != CIM_SINGLE)
3266  {
3267  if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3268  CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
3269  }
3270 
3271  /* Done, clean up */
3272  error_context_stack = errcallback.previous;
3273 
3274  if (bistate != NULL)
3275  FreeBulkInsertState(bistate);
3276 
3277  MemoryContextSwitchTo(oldcontext);
3278 
3279  /*
3280  * In the old protocol, tell pqcomm that we can process normal protocol
3281  * messages again.
3282  */
3283  if (cstate->copy_dest == COPY_OLD_FE)
3284  pq_endmsgread();
3285 
3286  /* Execute AFTER STATEMENT insertion triggers */
3287  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
3288 
3289  /* Handle queued AFTER triggers */
3290  AfterTriggerEndQuery(estate);
3291 
3292  ExecResetTupleTable(estate->es_tupleTable, false);
3293 
3294  /* Allow the FDW to shut down */
3295  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
3296  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
3297  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
3298  target_resultRelInfo);
3299 
3300  /* Tear down the multi-insert buffer data */
3301  if (insertMethod != CIM_SINGLE)
3302  CopyMultiInsertInfoCleanup(&multiInsertInfo);
3303 
3304  ExecCloseIndices(target_resultRelInfo);
3305 
3306  /* Close all the partitioned tables, leaf partitions, and their indices */
3307  if (proute)
3308  ExecCleanupTupleRouting(mtstate, proute);
3309 
3310  /* Close any trigger target relations */
3311  ExecCleanUpTriggerState(estate);
3312 
3313  FreeExecutorState(estate);
3314 
3315  return processed;
3316 }
3317 
3318 /*
3319  * Setup to read tuples from a file for COPY FROM.
3320  *
3321  * 'rel': Used as a template for the tuples
3322  * 'filename': Name of server-local file to read
3323  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
3324  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
3325  *
3326  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
3327  */
3328 CopyState
3330  Relation rel,
3331  const char *filename,
3332  bool is_program,
3334  List *attnamelist,
3335  List *options)
3336 {
3337  CopyState cstate;
3338  bool pipe = (filename == NULL);
3339  TupleDesc tupDesc;
3340  AttrNumber num_phys_attrs,
3341  num_defaults;
3343  Oid *typioparams;
3344  int attnum;
3345  Oid in_func_oid;
3346  int *defmap;
3347  ExprState **defexprs;
3348  MemoryContext oldcontext;
3349  bool volatile_defexprs;
3350 
3351  cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3352  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3353 
3354  /* Initialize state variables */
3355  cstate->reached_eof = false;
3356  cstate->eol_type = EOL_UNKNOWN;
3357  cstate->cur_relname = RelationGetRelationName(cstate->rel);
3358  cstate->cur_lineno = 0;
3359  cstate->cur_attname = NULL;
3360  cstate->cur_attval = NULL;
3361 
3362  /* Set up variables to avoid per-attribute overhead. */
3363  initStringInfo(&cstate->attribute_buf);
3364  initStringInfo(&cstate->line_buf);
3365  cstate->line_buf_converted = false;
3366  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3367  cstate->raw_buf_index = cstate->raw_buf_len = 0;
3368 
3369  /* Assign range table, we'll need it in CopyFrom. */
3370  if (pstate)
3371  cstate->range_table = pstate->p_rtable;
3372 
3373  tupDesc = RelationGetDescr(cstate->rel);
3374  num_phys_attrs = tupDesc->natts;
3375  num_defaults = 0;
3376  volatile_defexprs = false;
3377 
3378  /*
3379  * Pick up the required catalog information for each attribute in the
3380  * relation, including the input function, the element type (to pass to
3381  * the input function), and info about defaults and constraints. (Which
3382  * input function we use depends on text/binary format choice.)
3383  */
3384  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3385  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3386  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3387  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3388 
3389  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3390  {
3391  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3392 
3393  /* We don't need info for dropped attributes */
3394  if (att->attisdropped)
3395  continue;
3396 
3397  /* Fetch the input function and typioparam info */
3398  if (cstate->binary)
3399  getTypeBinaryInputInfo(att->atttypid,
3400  &in_func_oid, &typioparams[attnum - 1]);
3401  else
3402  getTypeInputInfo(att->atttypid,
3403  &in_func_oid, &typioparams[attnum - 1]);
3404  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3405 
3406  /* Get default info if needed */
3407  if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
3408  {
3409  /* attribute is NOT to be copied from input */
3410  /* use default value if one exists */
3411  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3412  attnum);
3413 
3414  if (defexpr != NULL)
3415  {
3416  /* Run the expression through planner */
3417  defexpr = expression_planner(defexpr);
3418 
3419  /* Initialize executable expression in copycontext */
3420  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3421  defmap[num_defaults] = attnum - 1;
3422  num_defaults++;
3423 
3424  /*
3425  * If a default expression looks at the table being loaded,
3426  * then it could give the wrong answer when using
3427  * multi-insert. Since database access can be dynamic this is
3428  * hard to test for exactly, so we use the much wider test of
3429  * whether the default expression is volatile. We allow for
3430  * the special case of when the default expression is the
3431  * nextval() of a sequence which in this specific case is
3432  * known to be safe for use with the multi-insert
3433  * optimization. Hence we use this special case function
3434  * checker rather than the standard check for
3435  * contain_volatile_functions().
3436  */
3437  if (!volatile_defexprs)
3438  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3439  }
3440  }
3441  }
3442 
3443  /* We keep those variables in cstate. */
3444  cstate->in_functions = in_functions;
3445  cstate->typioparams = typioparams;
3446  cstate->defmap = defmap;
3447  cstate->defexprs = defexprs;
3449  cstate->num_defaults = num_defaults;
3450  cstate->is_program = is_program;
3451 
3452  if (data_source_cb)
3453  {
3454  cstate->copy_dest = COPY_CALLBACK;
3455  cstate->data_source_cb = data_source_cb;
3456  }
3457  else if (pipe)
3458  {
3459  Assert(!is_program); /* the grammar does not allow this */
3461  ReceiveCopyBegin(cstate);
3462  else
3463  cstate->copy_file = stdin;
3464  }
3465  else
3466  {
3467  cstate->filename = pstrdup(filename);
3468 
3469  if (cstate->is_program)
3470  {
3471  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3472  if (cstate->copy_file == NULL)
3473  ereport(ERROR,
3475  errmsg("could not execute command \"%s\": %m",
3476  cstate->filename)));
3477  }
3478  else
3479  {
3480  struct stat st;
3481 
3482  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3483  if (cstate->copy_file == NULL)
3484  {
3485  /* copy errno because ereport subfunctions might change it */
3486  int save_errno = errno;
3487 
3488  ereport(ERROR,
3490  errmsg("could not open file \"%s\" for reading: %m",
3491  cstate->filename),
3492  (save_errno == ENOENT || save_errno == EACCES) ?
3493  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3494  "You may want a client-side facility such as psql's \\copy.") : 0));
3495  }
3496 
3497  if (fstat(fileno(cstate->copy_file), &st))
3498  ereport(ERROR,
3500  errmsg("could not stat file \"%s\": %m",
3501  cstate->filename)));
3502 
3503  if (S_ISDIR(st.st_mode))
3504  ereport(ERROR,
3505  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3506  errmsg("\"%s\" is a directory", cstate->filename)));
3507  }
3508  }
3509 
3510  if (cstate->binary)
3511  {
3512  /* Read and verify binary header */
3513  char readSig[11];
3514  int32 tmp;
3515 
3516  /* Signature */
3517  if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3518  memcmp(readSig, BinarySignature, 11) != 0)
3519  ereport(ERROR,
3520  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3521  errmsg("COPY file signature not recognized")));
3522  /* Flags field */
3523  if (!CopyGetInt32(cstate, &tmp))
3524  ereport(ERROR,
3525  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3526  errmsg("invalid COPY file header (missing flags)")));
3527  if ((tmp & (1 << 16)) != 0)
3528  ereport(ERROR,
3529  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3530  errmsg("invalid COPY file header (WITH OIDS)")));
3531  tmp &= ~(1 << 16);
3532  if ((tmp >> 16) != 0)
3533  ereport(ERROR,
3534  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3535  errmsg("unrecognized critical flags in COPY file header")));
3536  /* Header extension length */
3537  if (!CopyGetInt32(cstate, &tmp) ||
3538  tmp < 0)
3539  ereport(ERROR,
3540  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3541  errmsg("invalid COPY file header (missing length)")));
3542  /* Skip extension header, if present */
3543  while (tmp-- > 0)
3544  {
3545  if (CopyGetData(cstate, readSig, 1, 1) != 1)
3546  ereport(ERROR,
3547  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3548  errmsg("invalid COPY file header (wrong length)")));
3549  }
3550  }
3551 
3552  /* create workspace for CopyReadAttributes results */
3553  if (!cstate->binary)
3554  {
3555  AttrNumber attr_count = list_length(cstate->attnumlist);
3556 
3557  cstate->max_fields = attr_count;
3558  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
3559  }
3560 
3561  MemoryContextSwitchTo(oldcontext);
3562 
3563  return cstate;
3564 }
3565 
3566 /*
3567  * Read raw fields in the next line for COPY FROM in text or csv mode.
3568  * Return false if no more lines.
3569  *
3570  * An internal temporary buffer is returned via 'fields'. It is valid until
3571  * the next call of the function. Since the function returns all raw fields
3572  * in the input file, 'nfields' could be different from the number of columns
3573  * in the relation.
3574  *
3575  * NOTE: force_not_null option are not applied to the returned fields.
3576  */
3577 bool
3578 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3579 {
3580  int fldct;
3581  bool done;
3582 
3583  /* only available for text or csv input */
3584  Assert(!cstate->binary);
3585 
3586  /* on input just throw the header line away */
3587  if (cstate->cur_lineno == 0 && cstate->header_line)
3588  {
3589  cstate->cur_lineno++;
3590  if (CopyReadLine(cstate))
3591  return false; /* done */
3592  }
3593 
3594  cstate->cur_lineno++;
3595 
3596  /* Actually read the line into memory here */
3597  done = CopyReadLine(cstate);
3598 
3599  /*
3600  * EOF at start of line means we're done. If we see EOF after some
3601  * characters, we act as though it was newline followed by EOF, ie,
3602  * process the line and then exit loop on next iteration.
3603  */
3604  if (done && cstate->line_buf.len == 0)
3605  return false;
3606 
3607  /* Parse the line into de-escaped field values */
3608  if (cstate->csv_mode)
3609  fldct = CopyReadAttributesCSV(cstate);
3610  else
3611  fldct = CopyReadAttributesText(cstate);
3612 
3613  *fields = cstate->raw_fields;
3614  *nfields = fldct;
3615  return true;
3616 }
3617 
3618 /*
3619  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3620  *
3621  * 'econtext' is used to evaluate default expression for each columns not
3622  * read from the file. It can be NULL when no default values are used, i.e.
3623  * when all columns are read from the file.
3624  *
3625  * 'values' and 'nulls' arrays must be the same length as columns of the
3626  * relation passed to BeginCopyFrom. This function fills the arrays.
3627  * Oid of the tuple is returned with 'tupleOid' separately.
3628  */
3629 bool
3631  Datum *values, bool *nulls)
3632 {
3633  TupleDesc tupDesc;
3634  AttrNumber num_phys_attrs,
3635  attr_count,
3636  num_defaults = cstate->num_defaults;
3637  FmgrInfo *in_functions = cstate->in_functions;
3638  Oid *typioparams = cstate->typioparams;
3639  int i;
3640  int *defmap = cstate->defmap;
3641  ExprState **defexprs = cstate->defexprs;
3642 
3643  tupDesc = RelationGetDescr(cstate->rel);
3644  num_phys_attrs = tupDesc->natts;
3645  attr_count = list_length(cstate->attnumlist);
3646 
3647  /* Initialize all values for row to NULL */
3648  MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3649  MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3650 
3651  if (!cstate->binary)
3652  {
3653  char **field_strings;
3654  ListCell *cur;
3655  int fldct;
3656  int fieldno;
3657  char *string;
3658 
3659  /* read raw fields in the next line */
3660  if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3661  return false;
3662 
3663  /* check for overflowing fields */
3664  if (attr_count > 0 && fldct > attr_count)
3665  ereport(ERROR,
3666  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3667  errmsg("extra data after last expected column")));
3668 
3669  fieldno = 0;
3670 
3671  /* Loop to read the user attributes on the line. */
3672  foreach(cur, cstate->attnumlist)
3673  {
3674  int attnum = lfirst_int(cur);
3675  int m = attnum - 1;
3676  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3677 
3678  if (fieldno >= fldct)
3679  ereport(ERROR,
3680  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3681  errmsg("missing data for column \"%s\"",
3682  NameStr(att->attname))));
3683  string = field_strings[fieldno++];
3684 
3685  if (cstate->convert_select_flags &&
3686  !cstate->convert_select_flags[m])
3687  {
3688  /* ignore input field, leaving column as NULL */
3689  continue;
3690  }
3691 
3692  if (cstate->csv_mode)
3693  {
3694  if (string == NULL &&
3695  cstate->force_notnull_flags[m])
3696  {
3697  /*
3698  * FORCE_NOT_NULL option is set and column is NULL -
3699  * convert it to the NULL string.
3700  */
3701  string = cstate->null_print;
3702  }
3703  else if (string != NULL && cstate->force_null_flags[m]
3704  && strcmp(string, cstate->null_print) == 0)
3705  {
3706  /*
3707  * FORCE_NULL option is set and column matches the NULL
3708  * string. It must have been quoted, or otherwise the
3709  * string would already have been set to NULL. Convert it
3710  * to NULL as specified.
3711  */
3712  string = NULL;
3713  }
3714  }
3715 
3716  cstate->cur_attname = NameStr(att->attname);
3717  cstate->cur_attval = string;
3718  values[m] = InputFunctionCall(&in_functions[m],
3719  string,
3720  typioparams[m],
3721  att->atttypmod);
3722  if (string != NULL)
3723  nulls[m] = false;
3724  cstate->cur_attname = NULL;
3725  cstate->cur_attval = NULL;
3726  }
3727 
3728  Assert(fieldno == attr_count);
3729  }
3730  else
3731  {
3732  /* binary */
3733  int16 fld_count;
3734  ListCell *cur;
3735 
3736  cstate->cur_lineno++;
3737 
3738  if (!CopyGetInt16(cstate, &fld_count))
3739  {
3740  /* EOF detected (end of file, or protocol-level EOF) */
3741  return false;
3742  }
3743 
3744  if (fld_count == -1)
3745  {
3746  /*
3747  * Received EOF marker. In a V3-protocol copy, wait for the
3748  * protocol-level EOF, and complain if it doesn't come
3749  * immediately. This ensures that we correctly handle CopyFail,
3750  * if client chooses to send that now.
3751  *
3752  * Note that we MUST NOT try to read more data in an old-protocol
3753  * copy, since there is no protocol-level EOF marker then. We
3754  * could go either way for copy from file, but choose to throw
3755  * error if there's data after the EOF marker, for consistency
3756  * with the new-protocol case.
3757  */
3758  char dummy;
3759 
3760  if (cstate->copy_dest != COPY_OLD_FE &&
3761  CopyGetData(cstate, &dummy, 1, 1) > 0)
3762  ereport(ERROR,
3763  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3764  errmsg("received copy data after EOF marker")));
3765  return false;
3766  }
3767 
3768  if (fld_count != attr_count)
3769  ereport(ERROR,
3770  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3771  errmsg("row field count is %d, expected %d",
3772  (int) fld_count, attr_count)));
3773 
3774  foreach(cur, cstate->attnumlist)
3775  {
3776  int attnum = lfirst_int(cur);
3777  int m = attnum - 1;
3778  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3779 
3780  cstate->cur_attname = NameStr(att->attname);
3781  values[m] = CopyReadBinaryAttribute(cstate,
3782  &in_functions[m],
3783  typioparams[m],
3784  att->atttypmod,
3785  &nulls[m]);
3786  cstate->cur_attname = NULL;
3787  }
3788  }
3789 
3790  /*
3791  * Now compute and insert any defaults available for the columns not
3792  * provided by the input data. Anything not processed here or above will
3793  * remain NULL.
3794  */
3795  for (i = 0; i < num_defaults; i++)
3796  {
3797  /*
3798  * The caller must supply econtext and have switched into the
3799  * per-tuple memory context in it.
3800  */
3801  Assert(econtext != NULL);
3803 
3804  values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3805  &nulls[defmap[i]]);
3806  }
3807 
3808  return true;
3809 }
3810 
3811 /*
3812  * Clean up storage and release resources for COPY FROM.
3813  */
3814 void
3816 {
3817  /* No COPY FROM related resources except memory. */
3818 
3819  EndCopy(cstate);
3820 }
3821 
3822 /*
3823  * Read the next input line and stash it in line_buf, with conversion to
3824  * server encoding.
3825  *
3826  * Result is true if read was terminated by EOF, false if terminated
3827  * by newline. The terminating newline or EOF marker is not included
3828  * in the final value of line_buf.
3829  */
3830 static bool
3832 {
3833  bool result;
3834 
3835  resetStringInfo(&cstate->line_buf);
3836  cstate->line_buf_valid = true;
3837 
3838  /* Mark that encoding conversion hasn't occurred yet */
3839  cstate->line_buf_converted = false;
3840 
3841  /* Parse data and transfer into line_buf */
3842  result = CopyReadLineText(cstate);
3843 
3844  if (result)
3845  {
3846  /*
3847  * Reached EOF. In protocol version 3, we should ignore anything
3848  * after \. up to the protocol end of copy data. (XXX maybe better
3849  * not to treat \. as special?)
3850  */
3851  if (cstate->copy_dest == COPY_NEW_FE)
3852  {
3853  do
3854  {
3855  cstate->raw_buf_index = cstate->raw_buf_len;
3856  } while (CopyLoadRawBuf(cstate));
3857  }
3858  }
3859  else
3860  {
3861  /*
3862  * If we didn't hit EOF, then we must have transferred the EOL marker
3863  * to line_buf along with the data. Get rid of it.
3864  */
3865  switch (cstate->eol_type)
3866  {
3867  case EOL_NL:
3868  Assert(cstate->line_buf.len >= 1);
3869  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3870  cstate->line_buf.len--;
3871  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3872  break;
3873  case EOL_CR:
3874  Assert(cstate->line_buf.len >= 1);
3875  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3876  cstate->line_buf.len--;
3877  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3878  break;
3879  case EOL_CRNL:
3880  Assert(cstate->line_buf.len >= 2);
3881  Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3882  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3883  cstate->line_buf.len -= 2;
3884  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3885  break;
3886  case EOL_UNKNOWN:
3887  /* shouldn't get here */
3888  Assert(false);
3889  break;
3890  }
3891  }
3892 
3893  /* Done reading the line. Convert it to server encoding. */
3894  if (cstate->need_transcoding)
3895  {
3896  char *cvt;
3897 
3898  cvt = pg_any_to_server(cstate->line_buf.data,
3899  cstate->line_buf.len,
3900  cstate->file_encoding);
3901  if (cvt != cstate->line_buf.data)
3902  {
3903  /* transfer converted data back to line_buf */
3904  resetStringInfo(&cstate->line_buf);
3905  appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3906  pfree(cvt);
3907  }
3908  }
3909 
3910  /* Now it's safe to use the buffer in error messages */
3911  cstate->line_buf_converted = true;
3912 
3913  return result;
3914 }
3915 
3916 /*
3917  * CopyReadLineText - inner loop of CopyReadLine for text mode
3918  */
3919 static bool
3921 {
3922  char *copy_raw_buf;
3923  int raw_buf_ptr;
3924  int copy_buf_len;
3925  bool need_data = false;
3926  bool hit_eof = false;
3927  bool result = false;
3928  char mblen_str[2];
3929 
3930  /* CSV variables */
3931  bool first_char_in_line = true;
3932  bool in_quote = false,
3933  last_was_esc = false;
3934  char quotec = '\0';
3935  char escapec = '\0';
3936 
3937  if (cstate->csv_mode)
3938  {
3939  quotec = cstate->quote[0];
3940  escapec = cstate->escape[0];
3941  /* ignore special escape processing if it's the same as quotec */
3942  if (quotec == escapec)
3943  escapec = '\0';
3944  }
3945 
3946  mblen_str[1] = '\0';
3947 
3948  /*
3949  * The objective of this loop is to transfer the entire next input line
3950  * into line_buf. Hence, we only care for detecting newlines (\r and/or
3951  * \n) and the end-of-copy marker (\.).
3952  *
3953  * In CSV mode, \r and \n inside a quoted field are just part of the data
3954  * value and are put in line_buf. We keep just enough state to know if we
3955  * are currently in a quoted field or not.
3956  *
3957  * These four characters, and the CSV escape and quote characters, are
3958  * assumed the same in frontend and backend encodings.
3959  *
3960  * For speed, we try to move data from raw_buf to line_buf in chunks
3961  * rather than one character at a time. raw_buf_ptr points to the next
3962  * character to examine; any characters from raw_buf_index to raw_buf_ptr
3963  * have been determined to be part of the line, but not yet transferred to
3964  * line_buf.
3965  *
3966  * For a little extra speed within the loop, we copy raw_buf and
3967  * raw_buf_len into local variables.
3968  */
3969  copy_raw_buf = cstate->raw_buf;
3970  raw_buf_ptr = cstate->raw_buf_index;
3971  copy_buf_len = cstate->raw_buf_len;
3972 
3973  for (;;)
3974  {
3975  int prev_raw_ptr;
3976  char c;
3977 
3978  /*
3979  * Load more data if needed. Ideally we would just force four bytes
3980  * of read-ahead and avoid the many calls to
3981  * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3982  * does not allow us to read too far ahead or we might read into the
3983  * next data, so we read-ahead only as far we know we can. One
3984  * optimization would be to read-ahead four byte here if
3985  * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3986  * considering the size of the buffer.
3987  */
3988  if (raw_buf_ptr >= copy_buf_len || need_data)
3989  {
3991 
3992  /*
3993  * Try to read some more data. This will certainly reset
3994  * raw_buf_index to zero, and raw_buf_ptr must go with it.
3995  */
3996  if (!CopyLoadRawBuf(cstate))
3997  hit_eof = true;
3998  raw_buf_ptr = 0;
3999  copy_buf_len = cstate->raw_buf_len;
4000 
4001  /*
4002  * If we are completely out of data, break out of the loop,
4003  * reporting EOF.
4004  */
4005  if (copy_buf_len <= 0)
4006  {
4007  result = true;
4008  break;
4009  }
4010  need_data = false;
4011  }
4012 
4013  /* OK to fetch a character */
4014  prev_raw_ptr = raw_buf_ptr;
4015  c = copy_raw_buf[raw_buf_ptr++];
4016 
4017  if (cstate->csv_mode)
4018  {
4019  /*
4020  * If character is '\\' or '\r', we may need to look ahead below.
4021  * Force fetch of the next character if we don't already have it.
4022  * We need to do this before changing CSV state, in case one of
4023  * these characters is also the quote or escape character.
4024  *
4025  * Note: old-protocol does not like forced prefetch, but it's OK
4026  * here since we cannot validly be at EOF.
4027  */
4028  if (c == '\\' || c == '\r')
4029  {
4031  }
4032 
4033  /*
4034  * Dealing with quotes and escapes here is mildly tricky. If the
4035  * quote char is also the escape char, there's no problem - we
4036  * just use the char as a toggle. If they are different, we need
4037  * to ensure that we only take account of an escape inside a
4038  * quoted field and immediately preceding a quote char, and not
4039  * the second in an escape-escape sequence.
4040  */
4041  if (in_quote && c == escapec)
4042  last_was_esc = !last_was_esc;
4043  if (c == quotec && !last_was_esc)
4044  in_quote = !in_quote;
4045  if (c != escapec)
4046  last_was_esc = false;
4047 
4048  /*
4049  * Updating the line count for embedded CR and/or LF chars is
4050  * necessarily a little fragile - this test is probably about the
4051  * best we can do. (XXX it's arguable whether we should do this
4052  * at all --- is cur_lineno a physical or logical count?)
4053  */
4054  if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
4055  cstate->cur_lineno++;
4056  }
4057 
4058  /* Process \r */
4059  if (c == '\r' && (!cstate->csv_mode || !in_quote))
4060  {
4061  /* Check for \r\n on first line, _and_ handle \r\n. */
4062  if (cstate->eol_type == EOL_UNKNOWN ||
4063  cstate->eol_type == EOL_CRNL)
4064  {
4065  /*
4066  * If need more data, go back to loop top to load it.
4067  *
4068  * Note that if we are at EOF, c will wind up as '\0' because
4069  * of the guaranteed pad of raw_buf.
4070  */
4072 
4073  /* get next char */
4074  c = copy_raw_buf[raw_buf_ptr];
4075 
4076  if (c == '\n')
4077  {
4078  raw_buf_ptr++; /* eat newline */
4079  cstate->eol_type = EOL_CRNL; /* in case not set yet */
4080  }
4081  else
4082  {
4083  /* found \r, but no \n */
4084  if (cstate->eol_type == EOL_CRNL)
4085  ereport(ERROR,
4086  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4087  !cstate->csv_mode ?
4088  errmsg("literal carriage return found in data") :
4089  errmsg("unquoted carriage return found in data"),
4090  !cstate->csv_mode ?
4091  errhint("Use \"\\r\" to represent carriage return.") :
4092  errhint("Use quoted CSV field to represent carriage return.")));
4093 
4094  /*
4095  * if we got here, it is the first line and we didn't find
4096  * \n, so don't consume the peeked character
4097  */
4098  cstate->eol_type = EOL_CR;
4099  }
4100  }
4101  else if (cstate->eol_type == EOL_NL)
4102  ereport(ERROR,
4103  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4104  !cstate->csv_mode ?
4105  errmsg("literal carriage return found in data") :
4106  errmsg("unquoted carriage return found in data"),
4107  !cstate->csv_mode ?
4108  errhint("Use \"\\r\" to represent carriage return.") :
4109  errhint("Use quoted CSV field to represent carriage return.")));
4110  /* If reach here, we have found the line terminator */
4111  break;
4112  }
4113 
4114  /* Process \n */
4115  if (c == '\n' && (!cstate->csv_mode || !in_quote))
4116  {
4117  if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
4118  ereport(ERROR,
4119  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4120  !cstate->csv_mode ?
4121  errmsg("literal newline found in data") :
4122  errmsg("unquoted newline found in data"),
4123  !cstate->csv_mode ?
4124  errhint("Use \"\\n\" to represent newline.") :
4125  errhint("Use quoted CSV field to represent newline.")));
4126  cstate->eol_type = EOL_NL; /* in case not set yet */
4127  /* If reach here, we have found the line terminator */
4128  break;
4129  }
4130 
4131  /*
4132  * In CSV mode, we only recognize \. alone on a line. This is because
4133  * \. is a valid CSV data value.
4134  */
4135  if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
4136  {
4137  char c2;
4138 
4141 
4142  /* -----
4143  * get next character
4144  * Note: we do not change c so if it isn't \., we can fall
4145  * through and continue processing for file encoding.
4146  * -----
4147  */
4148  c2 = copy_raw_buf[raw_buf_ptr];
4149 
4150  if (c2 == '.')
4151  {
4152  raw_buf_ptr++; /* consume the '.' */
4153 
4154  /*
4155  * Note: if we loop back for more data here, it does not
4156  * matter that the CSV state change checks are re-executed; we
4157  * will come back here with no important state changed.
4158  */
4159  if (cstate->eol_type == EOL_CRNL)
4160  {
4161  /* Get the next character */
4163  /* if hit_eof, c2 will become '\0' */
4164  c2 = copy_raw_buf[raw_buf_ptr++];
4165 
4166  if (c2 == '\n')
4167  {
4168  if (!cstate->csv_mode)
4169  ereport(ERROR,
4170  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4171  errmsg("end-of-copy marker does not match previous newline style")));
4172  else
4174  }
4175  else if (c2 != '\r')
4176  {
4177  if (!cstate->csv_mode)
4178  ereport(ERROR,
4179  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4180  errmsg("end-of-copy marker corrupt")));
4181  else
4183  }
4184  }
4185 
4186  /* Get the next character */
4188  /* if hit_eof, c2 will become '\0' */
4189  c2 = copy_raw_buf[raw_buf_ptr++];
4190 
4191  if (c2 != '\r' && c2 != '\n')
4192  {
4193  if (!cstate->csv_mode)
4194  ereport(ERROR,
4195  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4196  errmsg("end-of-copy marker corrupt")));
4197  else
4199  }
4200 
4201  if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
4202  (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
4203  (cstate->eol_type == EOL_CR && c2 != '\r'))
4204  {
4205  ereport(ERROR,
4206  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4207  errmsg("end-of-copy marker does not match previous newline style")));
4208  }
4209 
4210  /*
4211  * Transfer only the data before the \. into line_buf, then
4212  * discard the data and the \. sequence.
4213  */
4214  if (prev_raw_ptr > cstate->raw_buf_index)
4216  cstate->raw_buf + cstate->raw_buf_index,
4217  prev_raw_ptr - cstate->raw_buf_index);
4218  cstate->raw_buf_index = raw_buf_ptr;
4219  result = true; /* report EOF */
4220  break;
4221  }
4222  else if (!cstate->csv_mode)
4223 
4224  /*
4225  * If we are here, it means we found a backslash followed by
4226  * something other than a period. In non-CSV mode, anything
4227  * after a backslash is special, so we skip over that second
4228  * character too. If we didn't do that \\. would be
4229  * considered an eof-of copy, while in non-CSV mode it is a
4230  * literal backslash followed by a period. In CSV mode,
4231  * backslashes are not special, so we want to process the
4232  * character after the backslash just like a normal character,
4233  * so we don't increment in those cases.
4234  */
4235  raw_buf_ptr++;
4236  }
4237 
4238  /*
4239  * This label is for CSV cases where \. appears at the start of a
4240  * line, but there is more text after it, meaning it was a data value.
4241  * We are more strict for \. in CSV mode because \. could be a data
4242  * value, while in non-CSV mode, \. cannot be a data value.
4243  */
4244 not_end_of_copy:
4245 
4246  /*
4247  * Process all bytes of a multi-byte character as a group.
4248  *
4249  * We only support multi-byte sequences where the first byte has the
4250  * high-bit set, so as an optimization we can avoid this block
4251  * entirely if it is not set.
4252  */
4253  if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
4254  {
4255  int mblen;
4256 
4257  /*
4258  * It is enough to look at the first byte in all our encodings, to
4259  * get the length. (GB18030 is a bit special, but still works for
4260  * our purposes; see comment in pg_gb18030_mblen())
4261  */
4262  mblen_str[0] = c;
4263  mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
4264 
4266  IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
4267  raw_buf_ptr += mblen - 1;
4268  }
4269  first_char_in_line = false;
4270  } /* end of outer loop */
4271 
4272  /*
4273  * Transfer any still-uncopied data to line_buf.
4274  */
4276 
4277  return result;
4278 }
4279 
/*
 * Return decimal value for a hexadecimal digit
 *
 * hex must already be known to be a valid hexadecimal digit
 * ('0'-'9', 'a'-'f' or 'A'-'F'); the callers in this file check that with
 * isxdigit() first.  No validation is done here, so any other input
 * yields a meaningless value.
 */
static int
GetDecimalFromHex(char hex)
{
	/* cast to unsigned char: <ctype.h> functions must not see negative chars */
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
4291 
4292 /*
4293  * Parse the current line into separate attributes (fields),
4294  * performing de-escaping as needed.
4295  *
4296  * The input is in line_buf. We use attribute_buf to hold the result
4297  * strings. cstate->raw_fields[k] is set to point to the k'th attribute
4298  * string, or NULL when the input matches the null marker string.
4299  * This array is expanded as necessary.
4300  *
4301  * (Note that the caller cannot check for nulls since the returned
4302  * string would be the post-de-escaping equivalent, which may look
4303  * the same as some valid data string.)
4304  *
4305  * delim is the column delimiter string (must be just one byte for now).
4306  * null_print is the null marker string. Note that this is compared to
4307  * the pre-de-escaped input string.
4308  *
4309  * The return value is the number of fields actually read.
4310  */
4311 static int
4313 {
4314  char delimc = cstate->delim[0];
4315  int fieldno;
4316  char *output_ptr;
4317  char *cur_ptr;
4318  char *line_end_ptr;
4319 
4320  /*
4321  * We need a special case for zero-column tables: check that the input
4322  * line is empty, and return.
4323  */
4324  if (cstate->max_fields <= 0)
4325  {
4326  if (cstate->line_buf.len != 0)
4327  ereport(ERROR,
4328  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4329  errmsg("extra data after last expected column")));
4330  return 0;
4331  }
4332 
4333  resetStringInfo(&cstate->attribute_buf);
4334 
4335  /*
4336  * The de-escaped attributes will certainly not be longer than the input
4337  * data line, so we can just force attribute_buf to be large enough and
4338  * then transfer data without any checks for enough space. We need to do
4339  * it this way because enlarging attribute_buf mid-stream would invalidate
4340  * pointers already stored into cstate->raw_fields[].
4341  */
4342  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4343  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4344  output_ptr = cstate->attribute_buf.data;
4345 
4346  /* set pointer variables for loop */
4347  cur_ptr = cstate->line_buf.data;
4348  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4349 
4350  /* Outer loop iterates over fields */
4351  fieldno = 0;
4352  for (;;)
4353  {
4354  bool found_delim = false;
4355  char *start_ptr;
4356  char *end_ptr;
4357  int input_len;
4358  bool saw_non_ascii = false;
4359 
4360  /* Make sure there is enough space for the next value */
4361  if (fieldno >= cstate->max_fields)
4362  {
4363  cstate->max_fields *= 2;
4364  cstate->raw_fields =
4365  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4366  }
4367 
4368  /* Remember start of field on both input and output sides */
4369  start_ptr = cur_ptr;
4370  cstate->raw_fields[fieldno] = output_ptr;
4371 
4372  /*
4373  * Scan data for field.
4374  *
4375  * Note that in this loop, we are scanning to locate the end of field
4376  * and also speculatively performing de-escaping. Once we find the
4377  * end-of-field, we can match the raw field contents against the null
4378  * marker string. Only after that comparison fails do we know that
4379  * de-escaping is actually the right thing to do; therefore we *must
4380  * not* throw any syntax errors before we've done the null-marker
4381  * check.
4382  */
4383  for (;;)
4384  {
4385  char c;
4386 
4387  end_ptr = cur_ptr;
4388  if (cur_ptr >= line_end_ptr)
4389  break;
4390  c = *cur_ptr++;
4391  if (c == delimc)
4392  {
4393  found_delim = true;
4394  break;
4395  }
4396  if (c == '\\')
4397  {
4398  if (cur_ptr >= line_end_ptr)
4399  break;
4400  c = *cur_ptr++;
4401  switch (c)
4402  {
4403  case '0':
4404  case '1':
4405  case '2':
4406  case '3':
4407  case '4':
4408  case '5':
4409  case '6':
4410  case '7':
4411  {
4412  /* handle \013 */
4413  int val;
4414 
4415  val = OCTVALUE(c);
4416  if (cur_ptr < line_end_ptr)
4417  {
4418  c = *cur_ptr;
4419  if (ISOCTAL(c))
4420  {
4421  cur_ptr++;
4422  val = (val << 3) + OCTVALUE(c);
4423  if (cur_ptr < line_end_ptr)
4424  {
4425  c = *cur_ptr;
4426  if (ISOCTAL(c))
4427  {
4428  cur_ptr++;
4429  val = (val << 3) + OCTVALUE(c);
4430  }
4431  }
4432  }
4433  }
4434  c = val & 0377;
4435  if (c == '\0' || IS_HIGHBIT_SET(c))
4436  saw_non_ascii = true;
4437  }
4438  break;
4439  case 'x':
4440  /* Handle \x3F */
4441  if (cur_ptr < line_end_ptr)
4442  {
4443  char hexchar = *cur_ptr;
4444 
4445  if (isxdigit((unsigned char) hexchar))
4446  {
4447  int val = GetDecimalFromHex(hexchar);
4448 
4449  cur_ptr++;
4450  if (cur_ptr < line_end_ptr)
4451  {
4452  hexchar = *cur_ptr;
4453  if (isxdigit((unsigned char) hexchar))
4454  {
4455  cur_ptr++;
4456  val = (val << 4) + GetDecimalFromHex(hexchar);
4457  }
4458  }
4459  c = val & 0xff;
4460  if (c == '\0' || IS_HIGHBIT_SET(c))
4461  saw_non_ascii = true;
4462  }
4463  }
4464  break;
4465  case 'b':
4466  c = '\b';
4467  break;
4468  case 'f':
4469  c = '\f';
4470  break;
4471  case 'n':
4472  c = '\n';
4473  break;
4474  case 'r':
4475  c = '\r';
4476  break;
4477  case 't':
4478  c = '\t';
4479  break;
4480  case 'v':
4481  c = '\v';
4482  break;
4483 
4484  /*
4485  * in all other cases, take the char after '\'
4486  * literally
4487  */
4488  }
4489  }
4490 
4491  /* Add c to output string */
4492  *output_ptr++ = c;
4493  }
4494 
4495  /* Check whether raw input matched null marker */
4496  input_len = end_ptr - start_ptr;
4497  if (input_len == cstate->null_print_len &&
4498  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4499  cstate->raw_fields[fieldno] = NULL;
4500  else
4501  {
4502  /*
4503  * At this point we know the field is supposed to contain data.
4504  *
4505  * If we de-escaped any non-7-bit-ASCII chars, make sure the
4506  * resulting string is valid data for the db encoding.
4507  */
4508  if (saw_non_ascii)
4509  {
4510  char *fld = cstate->raw_fields[fieldno];
4511 
4512  pg_verifymbstr(fld, output_ptr - fld, false);
4513  }
4514  }
4515 
4516  /* Terminate attribute value in output area */
4517  *output_ptr++ = '\0';
4518 
4519  fieldno++;
4520  /* Done if we hit EOL instead of a delim */
4521  if (!found_delim)
4522  break;
4523  }
4524 
4525  /* Clean up state of attribute_buf */
4526  output_ptr--;
4527  Assert(*output_ptr == '\0');
4528  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4529 
4530  return fieldno;
4531 }
4532 
4533 /*
4534  * Parse the current line into separate attributes (fields),
4535  * performing de-escaping as needed. This has exactly the same API as
4536  * CopyReadAttributesText, except we parse the fields according to
4537  * "standard" (i.e. common) CSV usage.
4538  */
4539 static int
4541 {
4542  char delimc = cstate->delim[0];
4543  char quotec = cstate->quote[0];
4544  char escapec = cstate->escape[0];
4545  int fieldno;
4546  char *output_ptr;
4547  char *cur_ptr;
4548  char *line_end_ptr;
4549 
4550  /*
4551  * We need a special case for zero-column tables: check that the input
4552  * line is empty, and return.
4553  */
4554  if (cstate->max_fields <= 0)
4555  {
4556  if (cstate->line_buf.len != 0)
4557  ereport(ERROR,
4558  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4559  errmsg("extra data after last expected column")));
4560  return 0;
4561  }
4562 
4563  resetStringInfo(&cstate->attribute_buf);
4564 
4565  /*
4566  * The de-escaped attributes will certainly not be longer than the input
4567  * data line, so we can just force attribute_buf to be large enough and
4568  * then transfer data without any checks for enough space. We need to do
4569  * it this way because enlarging attribute_buf mid-stream would invalidate
4570  * pointers already stored into cstate->raw_fields[].
4571  */
4572  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4573  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4574  output_ptr = cstate->attribute_buf.data;
4575 
4576  /* set pointer variables for loop */
4577  cur_ptr = cstate->line_buf.data;
4578  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4579 
4580  /* Outer loop iterates over fields */
4581  fieldno = 0;
4582  for (;;)
4583  {
4584  bool found_delim = false;
4585  bool saw_quote = false;
4586  char *start_ptr;
4587  char *end_ptr;
4588  int input_len;
4589 
4590  /* Make sure there is enough space for the next value */
4591  if (fieldno >= cstate->max_fields)
4592  {
4593  cstate->max_fields *= 2;
4594  cstate->raw_fields =
4595  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4596  }
4597 
4598  /* Remember start of field on both input and output sides */
4599  start_ptr = cur_ptr;
4600  cstate->raw_fields[fieldno] = output_ptr;
4601 
4602  /*
4603  * Scan data for field,
4604  *
4605  * The loop starts in "not quote" mode and then toggles between that
4606  * and "in quote" mode. The loop exits normally if it is in "not
4607  * quote" mode and a delimiter or line end is seen.
4608  */
4609  for (;;)
4610  {
4611  char c;
4612 
4613  /* Not in quote */
4614  for (;;)
4615  {
4616  end_ptr = cur_ptr;
4617  if (cur_ptr >= line_end_ptr)
4618  goto endfield;
4619  c = *cur_ptr++;
4620  /* unquoted field delimiter */
4621  if (c == delimc)
4622  {
4623  found_delim = true;
4624  goto endfield;
4625  }
4626  /* start of quoted field (or part of field) */
4627  if (c == quotec)
4628  {
4629  saw_quote = true;
4630  break;
4631  }
4632  /* Add c to output string */
4633  *output_ptr++ = c;
4634  }
4635 
4636  /* In quote */
4637  for (;;)
4638  {
4639  end_ptr = cur_ptr;
4640  if (cur_ptr >= line_end_ptr)
4641  ereport(ERROR,
4642  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4643  errmsg("unterminated CSV quoted field")));
4644 
4645  c = *cur_ptr++;
4646 
4647  /* escape within a quoted field */
4648  if (c == escapec)
4649  {
4650  /*
4651  * peek at the next char if available, and escape it if it
4652  * is an escape char or a quote char
4653  */
4654  if (cur_ptr < line_end_ptr)
4655  {
4656  char nextc = *cur_ptr;
4657 
4658  if (nextc == escapec || nextc == quotec)
4659  {
4660  *output_ptr++ = nextc;
4661  cur_ptr++;
4662  continue;
4663  }
4664  }
4665  }
4666 
4667  /*
4668  * end of quoted field. Must do this test after testing for
4669  * escape in case quote char and escape char are the same
4670  * (which is the common case).
4671  */
4672  if (c == quotec)
4673  break;
4674 
4675  /* Add c to output string */
4676  *output_ptr++ = c;
4677  }
4678  }
4679 endfield:
4680 
4681  /* Terminate attribute value in output area */
4682  *output_ptr++ = '\0';
4683 
4684  /* Check whether raw input matched null marker */
4685  input_len = end_ptr - start_ptr;
4686  if (!saw_quote && input_len == cstate->null_print_len &&
4687  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4688  cstate->raw_fields[fieldno] = NULL;
4689 
4690  fieldno++;
4691  /* Done if we hit EOL instead of a delim */
4692  if (!found_delim)
4693  break;
4694  }
4695 
4696  /* Clean up state of attribute_buf */
4697  output_ptr--;
4698  Assert(*output_ptr == '\0');
4699  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4700 
4701  return fieldno;
4702 }
4703 
4704 
4705 /*
4706  * Read a binary attribute
4707  */
4708 static Datum
4710  Oid typioparam, int32 typmod,
4711  bool *isnull)
4712 {
4713  int32 fld_size;
4714  Datum result;
4715 
4716  if (!CopyGetInt32(cstate, &fld_size))
4717  ereport(ERROR,
4718  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4719  errmsg("unexpected EOF in COPY data")));
4720  if (fld_size == -1)
4721  {
4722  *isnull = true;
4723  return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4724  }
4725  if (fld_size < 0)
4726  ereport(ERROR,
4727  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4728  errmsg("invalid field size")));
4729 
4730  /* reset attribute_buf to empty, and load raw data in it */
4731  resetStringInfo(&cstate->attribute_buf);
4732 
4733  enlargeStringInfo(&cstate->attribute_buf, fld_size);
4734  if (CopyGetData(cstate, cstate->attribute_buf.data,
4735  fld_size, fld_size) != fld_size)
4736  ereport(ERROR,
4737  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4738  errmsg("unexpected EOF in COPY data")));
4739 
4740  cstate->attribute_buf.len = fld_size;
4741  cstate->attribute_buf.data[fld_size] = '\0';
4742 
4743  /* Call the column type's binary input converter */
4744  result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4745  typioparam, typmod);
4746 
4747  /* Trouble if it didn't eat the whole buffer */
4748  if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4749  ereport(ERROR,
4750  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4751  errmsg("incorrect binary data format")));
4752 
4753  *isnull = false;
4754  return result;
4755 }
4756 
4757 /*
4758  * Send text representation of one attribute, with conversion and escaping
4759  */
4760 #define DUMPSOFAR() \
4761  do { \
4762  if (ptr > start) \
4763  CopySendData(cstate, start, ptr - start); \
4764  } while (0)
4765 
4766 static void
4767 CopyAttributeOutText(CopyState cstate, char *string)
4768 {
4769  char *ptr;
4770  char *start;
4771  char c;
4772  char delimc = cstate->delim[0];
4773 
4774  if (cstate->need_transcoding)
4775  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4776  else
4777  ptr = string;
4778 
4779  /*
4780  * We have to grovel through the string searching for control characters
4781  * and instances of the delimiter character. In most cases, though, these
4782  * are infrequent. To avoid overhead from calling CopySendData once per
4783  * character, we dump out all characters between escaped characters in a
4784  * single call. The loop invariant is that the data from "start" to "ptr"
4785  * can be sent literally, but hasn't yet been.
4786  *
4787  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4788  * in valid backend encodings, extra bytes of a multibyte character never
4789  * look like ASCII. This loop is sufficiently performance-critical that
4790  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4791  * of the normal safe-encoding path.
4792  */
4793  if (cstate->encoding_embeds_ascii)
4794  {
4795  start = ptr;
4796  while ((c = *ptr) != '\0')
4797  {
4798  if ((unsigned char) c < (unsigned char) 0x20)
4799  {
4800  /*
4801  * \r and \n must be escaped, the others are traditional. We
4802  * prefer to dump these using the C-like notation, rather than
4803  * a backslash and the literal character, because it makes the
4804  * dump file a bit more proof against Microsoftish data
4805  * mangling.
4806  */
4807  switch (c)
4808  {
4809  case '\b':
4810  c = 'b';
4811  break;
4812  case '\f':
4813  c = 'f';
4814  break;
4815  case '\n':
4816  c = 'n';
4817  break;
4818  case '\r':
4819  c = 'r';
4820  break;
4821  case '\t':
4822  c = 't';
4823  break;
4824  case '\v':
4825  c = 'v';
4826  break;
4827  default:
4828  /* If it's the delimiter, must backslash it */
4829  if (c == delimc)
4830  break;
4831  /* All ASCII control chars are length 1 */
4832  ptr++;
4833  continue; /* fall to end of loop */
4834  }
4835  /* if we get here, we need to convert the control char */
4836  DUMPSOFAR();
4837  CopySendChar(cstate, '\\');
4838  CopySendChar(cstate, c);
4839  start = ++ptr; /* do not include char in next run */
4840  }
4841  else if (c == '\\' || c == delimc)
4842  {
4843  DUMPSOFAR();
4844  CopySendChar(cstate, '\\');
4845  start = ptr++; /* we include char in next run */
4846  }
4847  else if (IS_HIGHBIT_SET(c))
4848  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4849  else
4850  ptr++;
4851  }
4852  }
4853  else
4854  {
4855  start = ptr;
4856  while ((c = *ptr) != '\0')
4857  {
4858  if ((unsigned char) c < (unsigned char) 0x20)
4859  {
4860  /*
4861  * \r and \n must be escaped, the others are traditional. We
4862  * prefer to dump these using the C-like notation, rather than
4863  * a backslash and the literal character, because it makes the
4864  * dump file a bit more proof against Microsoftish data
4865  * mangling.
4866  */
4867  switch (c)
4868  {
4869  case '\b':
4870  c = 'b';
4871  break;
4872  case '\f':
4873  c = 'f';
4874  break;
4875  case '\n':
4876  c = 'n';
4877  break;
4878  case '\r':
4879  c = 'r';
4880  break;
4881  case '\t':
4882  c = 't';
4883  break;
4884  case '\v':
4885  c = 'v';
4886  break;
4887  default:
4888  /* If it's the delimiter, must backslash it */
4889  if (c == delimc)
4890  break;
4891  /* All ASCII control chars are length 1 */
4892  ptr++;
4893  continue; /* fall to end of loop */
4894  }
4895  /* if we get here, we need to convert the control char */
4896  DUMPSOFAR();
4897  CopySendChar(cstate, '\\');
4898  CopySendChar(cstate, c);
4899  start = ++ptr; /* do not include char in next run */
4900  }
4901  else if (c == '\\' || c == delimc)
4902  {
4903  DUMPSOFAR();
4904  CopySendChar(cstate, '\\');
4905  start = ptr++; /* we include char in next run */
4906  }
4907  else
4908  ptr++;
4909  }
4910  }
4911 
4912  DUMPSOFAR();
4913 }
4914 
4915 /*
4916  * Send text representation of one attribute, with conversion and
4917  * CSV-style escaping
4918  */
4919 static void
4920 CopyAttributeOutCSV(CopyState cstate, char *string,
4921  bool use_quote, bool single_attr)
4922 {
4923  char *ptr;
4924  char *start;
4925  char c;
4926  char delimc = cstate->delim[0];
4927  char quotec = cstate->quote[0];
4928  char escapec = cstate->escape[0];
4929 
4930  /* force quoting if it matches null_print (before conversion!) */
4931  if (!use_quote && strcmp(string, cstate->null_print) == 0)
4932  use_quote = true;
4933 
4934  if (cstate->need_transcoding)
4935  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4936  else
4937  ptr = string;
4938 
4939  /*
4940  * Make a preliminary pass to discover if it needs quoting
4941  */
4942  if (!use_quote)
4943  {
4944  /*
4945  * Because '\.' can be a data value, quote it if it appears alone on a
4946  * line so it is not interpreted as the end-of-data marker.
4947  */
4948  if (single_attr && strcmp(ptr, "\\.") == 0)
4949  use_quote = true;
4950  else
4951  {
4952  char *tptr = ptr;
4953 
4954  while ((c = *tptr) != '\0')
4955  {
4956  if (c == delimc || c == quotec || c == '\n' || c == '\r')
4957  {
4958  use_quote = true;
4959  break;
4960  }
4961  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4962  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4963  else
4964  tptr++;
4965  }
4966  }
4967  }
4968 
4969  if (use_quote)
4970  {
4971  CopySendChar(cstate, quotec);
4972 
4973  /*
4974  * We adopt the same optimization strategy as in CopyAttributeOutText
4975  */
4976  start = ptr;
4977  while ((c = *ptr) != '\0')
4978  {
4979  if (c == quotec || c == escapec)
4980  {
4981  DUMPSOFAR();
4982  CopySendChar(cstate, escapec);
4983  start = ptr; /* we include char in next run */
4984  }
4985  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4986  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4987  else
4988  ptr++;
4989  }
4990  DUMPSOFAR();
4991 
4992  CopySendChar(cstate, quotec);
4993  }
4994  else
4995  {
4996  /* If it doesn't need quoting, we can just dump it as-is */
4997  CopySendString(cstate, ptr);
4998  }
4999 }
5000 
5001 /*
5002  * CopyGetAttnums - build an integer list of attnums to be copied
5003  *
5004  * The input attnamelist is either the user-specified column list,
5005  * or NIL if there was none (in which case we want all the non-dropped
5006  * columns).
5007  *
5008  * We don't include generated columns in the generated full list and we don't
5009  * allow them to be specified explicitly. They don't make sense for COPY
5010  * FROM, but we could possibly allow them for COPY TO. But this way it's at
5011  * least ensured that whatever we copy out can be copied back in.
5012  *
5013  * rel can be NULL ... it's only used for error reports.
5014  */
5015 static List *
5016 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
5017 {
5018  List *attnums = NIL;
5019 
5020  if (attnamelist == NIL)
5021  {
5022  /* Generate default column list */
5023  int attr_count = tupDesc->natts;
5024  int i;
5025 
5026  for (i = 0; i < attr_count; i++)
5027  {
5028  if (TupleDescAttr(tupDesc, i)->attisdropped)
5029  continue;
5030  if (TupleDescAttr(tupDesc, i)->attgenerated)
5031  continue;
5032  attnums = lappend_int(attnums, i + 1);
5033  }
5034  }
5035  else
5036  {
5037  /* Validate the user-supplied list and extract attnums */
5038  ListCell *l;
5039 
5040  foreach(l, attnamelist)
5041  {
5042  char *name = strVal(lfirst(l));
5043  int attnum;
5044  int i;
5045 
5046  /* Lookup column name */
5047  attnum = InvalidAttrNumber;
5048  for (i = 0; i < tupDesc->natts; i++)
5049  {
5050  Form_pg_attribute att = TupleDescAttr(tupDesc, i);
5051 
5052  if (att->attisdropped)
5053  continue;
5054  if (namestrcmp(&(att->attname), name) == 0)
5055  {
5056  if (att->attgenerated)
5057  ereport(ERROR,
5058  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
5059  errmsg("column \"%s\" is a generated column",
5060  name),
5061  errdetail("Generated columns cannot be used in COPY.")));
5062  attnum = att->attnum;
5063  break;
5064  }
5065  }
5066  if (attnum == InvalidAttrNumber)
5067  {
5068  if (rel != NULL)
5069  ereport(ERROR,
5070  (errcode(ERRCODE_UNDEFINED_COLUMN),
5071  errmsg("column \"%s\" of relation \"%s\" does not exist",
5072  name, RelationGetRelationName(rel))));
5073  else
5074  ereport(ERROR,
5075  (errcode(ERRCODE_UNDEFINED_COLUMN),
5076  errmsg("column \"%s\" does not exist",
5077  name)));
5078  }
5079  /* Check for duplicates */
5080  if (list_member_int(attnums, attnum))
5081  ereport(ERROR,
5082  (errcode(ERRCODE_DUPLICATE_COLUMN),
5083  errmsg("column \"%s\" specified more than once",
5084  name)));
5085  attnums = lappend_int(attnums, attnum);
5086  }
5087  }
5088 
5089  return attnums;
5090 }
5091 
5092 
5093 /*
5094  * copy_dest_startup --- executor startup
5095  */
5096 static void
5097 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
5098 {
5099  /* no-op */
5100 }
5101 
5102 /*
5103  * copy_dest_receive --- receive one tuple
5104  */
5105 static bool
5107 {
5108  DR_copy *myState = (DR_copy *) self;
5109  CopyState cstate = myState->cstate;
5110 
5111  /* Send the data */
5112  CopyOneRowTo(cstate, slot);
5113  myState->processed++;
5114 
5115  return true;
5116 }
5117 
5118 /*
5119  * copy_dest_shutdown --- executor end
5120  */
5121 static void
5123 {
5124  /* no-op */
5125 }
5126 
5127 /*
5128  * copy_dest_destroy --- release DestReceiver object
5129  */
5130 static void
5132 {
5133  pfree(self);
5134 }
5135 
5136 /*
5137  * CreateCopyDestReceiver -- create a suitable DestReceiver object
5138  */
5139 DestReceiver *
5141 {
5142  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
5143 
5144  self->pub.receiveSlot = copy_dest_receive;
5145  self->pub.rStartup = copy_dest_startup;
5146  self->pub.rShutdown = copy_dest_shutdown;
5147  self->pub.rDestroy = copy_dest_destroy;
5148  self->pub.mydest = DestCopyOut;
5149 
5150  self->cstate = NULL; /* will be set later */
5151  self->processed = 0;
5152 
5153  return (DestReceiver *) self;
5154 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
struct CopyMultiInsertBuffer CopyMultiInsertBuffer
signed short int16
Definition: c.h:354
List * indirection
Definition: parsenodes.h:442
bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
Definition: copy.c:3578
int ri_NumIndices
Definition: execnodes.h:416
Node * whereClause
Definition: copy.c:156
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
#define NIL
Definition: pg_list.h:65
Node * whereClause
Definition: parsenodes.h:2019
Oid tts_tableOid
Definition: tuptable.h:131
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
uint32 CommandId
Definition: c.h:527
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:765
static int GetDecimalFromHex(char hex)
Definition: copy.c:4284
Definition: fmgr.h:56
#define MAX_COPY_DATA_DISPLAY
bool csv_mode
Definition: copy.c:138
static void SendCopyEnd(CopyState cstate)
Definition: copy.c:470
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
Relation ri_RelationDesc
Definition: execnodes.h:413
List * range_table
Definition: copy.c:184
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options)
Definition: copy.c:1456
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:783
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
static bool CopyReadLineText(CopyState cstate)
Definition: copy.c:3920
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:124
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copy.c:2625
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:776
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
Node * val
Definition: parsenodes.h:443
static bool CopyGetInt32(CopyState cstate, int32 *val)
Definition: copy.c:738
int errhint(const char *fmt,...)
Definition: elog.c:1071
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2268
EState * estate
Definition: copy.c:279
int pg_char_to_encoding(const char *name)
Definition: encnames.c:550
char ** raw_fields
Definition: copy.c:202
Definition: copy.c:83
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2784
#define VARDATA(PTR)
Definition: postgres.h:302
static void EndCopy(CopyState cstate)
Definition: copy.c:1798
bool binary
Definition: copy.c:136
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:493
#define pq_flush()
Definition: libpq.h:39
void CopyFromErrorCallback(void *arg)
Definition: copy.c:2233
List * attlist
Definition: parsenodes.h:2013
List * fromClause
Definition: parsenodes.h:1598
#define ISOCTAL(c)
Definition: copy.c:62
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define OCTVALUE(c)
Definition: copy.c:63
#define ResetPerTupleExprContext(estate)
Definition: executor.h:516
#define RelationGetDescr(relation)
Definition: rel.h:482
int LOCKMODE
Definition: lockdefs.h:26
char * name
Definition: parsenodes.h:441
static Datum CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
Definition: copy.c:4709
Oid GetUserId(void)
Definition: miscinit.c:448
#define TABLE_INSERT_FROZEN
Definition: tableam.h:133
bool need_transcoding
Definition: copy.c:126
#define castNode(_type_, nodeptr)
Definition: nodes.h:598
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
FmgrInfo * in_functions
Definition: copy.c:179
AttrNumber num_defaults
Definition: copy.c:178
List * attnumlist
Definition: copy.c:132
#define VARSIZE(PTR)
Definition: postgres.h:303
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * filename
Definition: copy.c:133
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:215
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1174
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
struct CopyMultiInsertInfo CopyMultiInsertInfo
CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copy.c:3329
#define VARHDRSZ
Definition: c.h:561
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:211
struct PartitionRoutingInfo * ri_PartitionInfo
Definition: execnodes.h:490
List * relationOids
Definition: plannodes.h:86
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define pg_hton16(x)
Definition: pg_bswap.h:120
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:234
static void ReceiveCopyBegin(CopyState cstate)
Definition: copy.c:434
#define pg_ntoh16(x)
Definition: pg_bswap.h:124
Definition: copy.c:229
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, CmdType cmdtype)
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copy.c:2356
#define MAX_BUFFERED_TUPLES
Definition: copy.c:246
bool rd_islocaltemp
Definition: rel.h:60
CommandId mycid
Definition: copy.c:280
#define S_IWOTH
Definition: win32_port.h:287
Expr * expression_planner(Expr *expr)
Definition: planner.c:6156
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:143
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copy.c:264
Node * transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind)
Definition: parse_expr.c:145
DestReceiver pub
Definition: copy.c:231
void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options)
Definition: copy.c:1105
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1897
StringInfoData line_buf
Definition: copy.c:211
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen)
Definition: copy.c:313
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:841
#define AccessShareLock
Definition: lockdefs.h:36
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
struct CopyStateData CopyStateData
Definition: nodes.h:529
#define strVal(v)
Definition: value.h:54
struct cursor * cur
Definition: ecpg.c:28
int raw_buf_index
Definition: copy.c:224
static void CopyAttributeOutText(CopyState cstate, char *string)
Definition: copy.c:4767
bool line_buf_valid
Definition: copy.c:213
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1689
CopyState cstate
Definition: copy.c:232
int errcode(int sqlerrcode)
Definition: elog.c:610
#define PG_BINARY_W
Definition: c.h:1236
int namestrcmp(Name name, const char *str)
Definition: name.c:287
#define MemSet(start, val, len)
Definition: c.h:971
Node * eval_const_expressions(PlannerInfo *root, Node *node)
Definition: clauses.c:2255
uint64 CopyFrom(CopyState cstate)
Definition: copy.c:2648
CmdType operation
Definition: execnodes.h:1166
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:103
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1345
Datum * tts_values
Definition: tuptable.h:126
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static void ClosePipeToProgram(CopyState cstate)
Definition: copy.c:1763
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
static int CopyReadAttributesCSV(CopyState cstate)
Definition: copy.c:4540
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
AclMode requiredPerms
Definition: parsenodes.h:1119
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:726
#define SIGPIPE
Definition: win32_port.h:158
EState * state
Definition: execnodes.h:947
bool * force_quote_flags
Definition: copy.c:148
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:903
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
List * pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)
Definition: postgres.c:676
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:370
char * delim
Definition: copy.c:143
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
Node * utilityStmt
Definition: parsenodes.h:120
bool is_program
Definition: parsenodes.h:2016
#define linitial_node(type, l)
Definition: pg_list.h:198
bool volatile_defexprs
Definition: copy.c:183
void(* callback)(void *arg)
Definition: elog.h:229
struct ErrorContextCallback * previous
Definition: elog.h:228
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copy.c:2376
#define PG_BINARY_R
Definition: c.h:1235
ResultRelInfo * resultRelInfo
Definition: copy.c:261
static void copy_dest_destroy(DestReceiver *self)
Definition: copy.c:5131
bool * force_null_flags
Definition: copy.c:152
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
MemoryContext rowcontext
Definition: copy.c:173
bool line_buf_converted
Definition: copy.c:212
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1152
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:692
signed int int32
Definition: c.h:355
int ClosePipeStream(FILE *file)
Definition: fd.c:2731
int errdetail_internal(const char *fmt,...)
Definition: elog.c:984
bool * convert_select_flags
Definition: copy.c:155
static void CopySendChar(CopyState cstate, char c)
Definition: copy.c:511
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1577
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
int location
Definition: parsenodes.h:237
CopyDest copy_dest
Definition: copy.c:117
int location
Definition: parsenodes.h:444
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define REFILL_LINEBUF
Definition: copy.c:330
ErrorContextCallback * error_context_stack
Definition: elog.c:92
uint64 cur_lineno
Definition: copy.c:160
#define list_make1(x1)
Definition: pg_list.h:227
char * null_print
Definition: copy.c:140
const char * cur_attname
Definition: copy.c:161
void assign_expr_collations(ParseState *pstate, Node *expr)
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:462
Definition: copy.c:71
bool trig_insert_instead_row
Definition: reltrigger.h:58
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
Relation rel
Definition: copy.c:130
#define GetPerTupleExprContext(estate)
Definition: executor.h:507
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1778
MemoryContext copycontext
Definition: copy.c:167
#define pq_startcopyout()
Definition: libpq.h:46
copy_data_source_cb data_source_cb
Definition: copy.c:135
bool defGetBoolean(DefElem *def)
Definition: define.c:111
RangeTblEntry * p_rte
Definition: parse_node.h:257
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:128
bool trig_insert_new_table
Definition: reltrigger.h:75
bool has_generated_stored
Definition: tupdesc.h:45
Bitmapset * selectedCols
Definition: parsenodes.h:1121
TupleConversionMap * pi_RootToPartitionMap
Definition: execPartition.h:37
unsigned short uint16
Definition: c.h:366
void pfree(void *pointer)
Definition: mcxt.c:1056
#define pg_ntoh32(x)
Definition: pg_bswap.h:125
#define IS_HIGHBIT_SET(ch)
Definition: c.h:1119
bool wait_result_is_signal(int exit_status, int signum)
Definition: wait_error.c:92
#define linitial(l)
Definition: pg_list.h:195
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options)
Definition: copy.c:1821
static void CopySendInt16(CopyState cstate, int16 val)
Definition: copy.c:755
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1209
#define ERROR
Definition: elog.h:43