PostgreSQL Source Code  git master
copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  * Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/xact.h"
25 #include "access/xlog.h"
26 #include "catalog/dependency.h"
27 #include "catalog/pg_type.h"
28 #include "commands/copy.h"
29 #include "commands/defrem.h"
30 #include "commands/trigger.h"
31 #include "executor/execPartition.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "parser/parse_relation.h"
41 #include "port/pg_bswap.h"
42 #include "rewrite/rewriteHandler.h"
43 #include "storage/fd.h"
44 #include "tcop/tcopprot.h"
45 #include "utils/builtins.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/portal.h"
49 #include "utils/rel.h"
50 #include "utils/rls.h"
51 #include "utils/snapmgr.h"
52 
53 
/* Nonzero when character c is one of the octal digits '0' .. '7'. */
#define ISOCTAL(c)	((c) >= '0' && (c) <= '7')
/* Numeric value of digit character c (its distance from '0'). */
#define OCTVALUE(c)	((c) - '0')
56 
57 /*
58  * Represents the different source/dest cases we need to worry about at
59  * the bottom level
60  */
typedef enum CopyDest
{
	/* to/from file (or a piped program) */
	COPY_FILE = 0,
	/* to/from frontend (2.0 protocol) */
	COPY_OLD_FE = 1,
	/* to/from frontend (3.0 protocol) */
	COPY_NEW_FE = 2,
	/* to/from callback function */
	COPY_CALLBACK = 3
} CopyDest;
68 
69 /*
70  * Represents the end-of-line terminator type of the input
71  */
typedef enum EolType
{
	EOL_UNKNOWN,				/* terminator not yet determined */
	EOL_NL,						/* lines end with \n */
	EOL_CR,						/* lines end with \r */
	EOL_CRNL					/* lines end with \r\n */
} EolType;
79 
80 /*
81  * This struct contains all the state variables used throughout a COPY
82  * operation. For simplicity, we use the same struct for all variants of COPY,
83  * even though some fields are used in only some cases.
84  *
85  * Multi-byte encodings: all supported client-side encodings encode multi-byte
86  * characters by having the first byte's high bit set. Subsequent bytes of the
87  * character can have the high bit not set. When scanning data in such an
88  * encoding to look for a match to a single-byte (ie ASCII) character, we must
89  * use the full pg_encoding_mblen() machinery to skip over multibyte
90  * characters, else we might find a false match to a trailing byte. In
91  * supported server encodings, there is no possibility of a false match, and
92  * it's faster to make useless comparisons to trailing bytes than it is to
93  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
94  * when we have to do it the hard way.
95  */
96 typedef struct CopyStateData
97 {
98  /* low-level state data */
99  CopyDest copy_dest; /* type of copy source/destination */
100  FILE *copy_file; /* used if copy_dest == COPY_FILE */
101  StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
102  * dest == COPY_NEW_FE in COPY FROM */
103  bool fe_eof; /* true if detected end of copy data */
104  EolType eol_type; /* EOL type of input */
105  int file_encoding; /* file or remote side's character encoding */
106  bool need_transcoding; /* file encoding diff from server? */
107  bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
108 
109  /* parameters from the COPY command */
110  Relation rel; /* relation to copy to or from */
111  QueryDesc *queryDesc; /* executable query to copy from */
112  List *attnumlist; /* integer list of attnums to copy */
113  char *filename; /* filename, or NULL for STDIN/STDOUT */
114  bool is_program; /* is 'filename' a program to popen? */
115  copy_data_source_cb data_source_cb; /* function for reading data */
116  bool binary; /* binary format? */
117  bool oids; /* include OIDs? */
118  bool freeze; /* freeze rows on loading? */
119  bool csv_mode; /* Comma Separated Value format? */
120  bool header_line; /* CSV header line? */
121  char *null_print; /* NULL marker string (server encoding!) */
122  int null_print_len; /* length of same */
123  char *null_print_client; /* same converted to file encoding */
124  char *delim; /* column delimiter (must be 1 byte) */
125  char *quote; /* CSV quote char (must be 1 byte) */
126  char *escape; /* CSV escape char (must be 1 byte) */
127  List *force_quote; /* list of column names */
128  bool force_quote_all; /* FORCE_QUOTE *? */
129  bool *force_quote_flags; /* per-column CSV FQ flags */
130  List *force_notnull; /* list of column names */
131  bool *force_notnull_flags; /* per-column CSV FNN flags */
132  List *force_null; /* list of column names */
133  bool *force_null_flags; /* per-column CSV FN flags */
134  bool convert_selectively; /* do selective binary conversion? */
135  List *convert_select; /* list of column names (can be NIL) */
136  bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
137 
138  /* these are just for error messages, see CopyFromErrorCallback */
139  const char *cur_relname; /* table name for error messages */
140  int cur_lineno; /* line number for error messages */
141  const char *cur_attname; /* current att for error messages */
142  const char *cur_attval; /* current att value for error messages */
143 
144  /*
145  * Working state for COPY TO/FROM
146  */
147  MemoryContext copycontext; /* per-copy execution context */
148 
149  /*
150  * Working state for COPY TO
151  */
152  FmgrInfo *out_functions; /* lookup info for output functions */
153  MemoryContext rowcontext; /* per-row evaluation context */
154 
155  /*
156  * Working state for COPY FROM
157  */
162  FmgrInfo *in_functions; /* array of input functions for each attrs */
163  Oid *typioparams; /* array of element types for in_functions */
164  int *defmap; /* array of default att numbers */
165  ExprState **defexprs; /* array of default att expressions */
166  bool volatile_defexprs; /* is any of defexprs volatile? */
168 
170  int num_dispatch; /* Number of entries in the above array */
171  int num_partitions; /* Number of members in the following arrays */
172  ResultRelInfo **partitions; /* Per partition result relation pointers */
177 
178  /*
179  * These variables are used to reduce overhead in textual COPY FROM.
180  *
181  * attribute_buf holds the separated, de-escaped text for each field of
182  * the current line. The CopyReadAttributes functions return arrays of
183  * pointers into this buffer. We avoid palloc/pfree overhead by re-using
184  * the buffer on each cycle.
185  */
187 
188  /* field raw data pointers found by COPY FROM */
189 
191  char **raw_fields;
192 
193  /*
194  * Similarly, line_buf holds the whole input line being processed. The
195  * input cycle is first to read the whole line into line_buf, convert it
196  * to server encoding there, and then extract the individual attribute
197  * fields into attribute_buf. line_buf is preserved unmodified so that we
198  * can display it in error messages if appropriate.
199  */
201  bool line_buf_converted; /* converted to server encoding? */
202  bool line_buf_valid; /* contains the row being processed? */
203 
204  /*
205  * Finally, raw_buf holds raw data read from the data source (file or
206  * client connection). CopyReadLine parses this data sufficiently to
207  * locate line boundaries, then transfers the data to line_buf and
208  * converts it. Note: we guarantee that there is a \0 at
209  * raw_buf[raw_buf_len].
210  */
211 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
212  char *raw_buf;
213  int raw_buf_index; /* next byte to process */
214  int raw_buf_len; /* total # of bytes stored */
215 } CopyStateData;
216 
217 /* DestReceiver for COPY (query) TO */
218 typedef struct
219 {
220  DestReceiver pub; /* publicly-known function pointers */
221  CopyState cstate; /* CopyStateData for the command */
222  uint64 processed; /* # of tuples processed */
223 } DR_copy;
224 
225 
226 /*
227  * These macros centralize code used to process line_buf and raw_buf buffers.
228  * They are macros because they often do continue/break control and to avoid
229  * function call overhead in tight COPY loops.
230  *
231  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
232  * prevent the continue/break processing from working. We end the "if (1)"
233  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
234  * any "else" in the calling code, and to avoid any compiler warnings about
235  * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
236  */
237 
/*
 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen)
 *
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * When the byte at raw_buf_ptr + extralen is not yet in the buffer and EOF
 * has not been hit, rewind raw_buf_ptr to prev_raw_ptr, set need_data, and
 * "continue" the enclosing loop.  Otherwise it does nothing.
 *
 * The expansion refers to the caller's local variables raw_buf_ptr,
 * prev_raw_ptr, copy_buf_len, hit_eof and need_data, and must appear
 * directly inside the loop that the "continue" is meant for.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)
252 
/*
 * IF_NEED_REFILL_AND_EOF_BREAK(extralen)
 *
 * This consumes the remainder of the buffer and breaks.
 *
 * When the byte at raw_buf_ptr + extralen is past the end of the buffered
 * data and EOF has already been hit, set result = true and "break" out of
 * the enclosing loop or switch; if extralen is nonzero, raw_buf_ptr is first
 * advanced to copy_buf_len.  Otherwise it does nothing.
 *
 * The expansion refers to the caller's local variables raw_buf_ptr,
 * copy_buf_len, hit_eof and result.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)
266 
/*
 * REFILL_LINEBUF
 *
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Appends the bytes raw_buf[raw_buf_index .. raw_buf_ptr - 1] to
 * cstate->line_buf and advances cstate->raw_buf_index to raw_buf_ptr.
 * It does nothing when raw_buf_ptr has not moved past raw_buf_index.
 *
 * The expansion refers to the caller's "cstate" and local raw_buf_ptr.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							 cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
282 
/*
 * NO_END_OF_COPY_GOTO
 *
 * Undo any read-ahead and jump out of the block.
 *
 * Resets raw_buf_ptr to the byte just after prev_raw_ptr, then jumps to the
 * label "not_end_of_copy", which the calling function must define.  The
 * expansion refers to the caller's local raw_buf_ptr and prev_raw_ptr.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
290 
/*
 * Fixed 11-byte signature that begins a binary-format COPY file.  Spelled
 * out byte by byte so that the array length and the absence of an extra
 * terminator are explicit.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
292 
293 
294 /* non-export function prototypes */
295 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
296  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
297  List *options);
298 static void EndCopy(CopyState cstate);
299 static void ClosePipeToProgram(CopyState cstate);
300 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
301  Oid queryRelId, const char *filename, bool is_program,
302  List *attnamelist, List *options);
303 static void EndCopyTo(CopyState cstate);
304 static uint64 DoCopyTo(CopyState cstate);
305 static uint64 CopyTo(CopyState cstate);
306 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
307  Datum *values, bool *nulls);
308 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
309  CommandId mycid, int hi_options,
310  ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
311  BulkInsertState bistate,
312  int nBufferedTuples, HeapTuple *bufferedTuples,
313  int firstBufferedLineNo);
314 static bool CopyReadLine(CopyState cstate);
315 static bool CopyReadLineText(CopyState cstate);
316 static int CopyReadAttributesText(CopyState cstate);
317 static int CopyReadAttributesCSV(CopyState cstate);
319  int column_no, FmgrInfo *flinfo,
320  Oid typioparam, int32 typmod,
321  bool *isnull);
322 static void CopyAttributeOutText(CopyState cstate, char *string);
323 static void CopyAttributeOutCSV(CopyState cstate, char *string,
324  bool use_quote, bool single_attr);
325 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
326  List *attnamelist);
327 static char *limit_printout_length(const char *str);
328 
/* Low-level communications functions */

/* start/stop messages exchanged with the frontend */
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);

/* raw output: data accumulates in fe_msgbuf until CopySendEndOfRow */
static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static void CopySendEndOfRow(CopyState cstate);

/* raw input: returns the number of bytes actually read */
static int CopyGetData(CopyState cstate, void *databuf,
			int minread, int maxread);

/* fixed-width integers in network byte order */
static void CopySendInt32(CopyState cstate, int32 val);
static bool CopyGetInt32(CopyState cstate, int32 *val);
static void CopySendInt16(CopyState cstate, int16 val);
static bool CopyGetInt16(CopyState cstate, int16 *val);
343 
344 
345 /*
346  * Send copy start/stop messages for frontend copies. These have changed
347  * in past protocol redesigns.
348  */
349 static void
351 {
353  {
354  /* new way */
356  int natts = list_length(cstate->attnumlist);
357  int16 format = (cstate->binary ? 1 : 0);
358  int i;
359 
360  pq_beginmessage(&buf, 'H');
361  pq_sendbyte(&buf, format); /* overall format */
362  pq_sendint16(&buf, natts);
363  for (i = 0; i < natts; i++)
364  pq_sendint16(&buf, format); /* per-column formats */
365  pq_endmessage(&buf);
366  cstate->copy_dest = COPY_NEW_FE;
367  }
368  else
369  {
370  /* old way */
371  if (cstate->binary)
372  ereport(ERROR,
373  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374  errmsg("COPY BINARY is not supported to stdout or from stdin")));
375  pq_putemptymessage('H');
376  /* grottiness needed for old COPY OUT protocol */
377  pq_startcopyout();
378  cstate->copy_dest = COPY_OLD_FE;
379  }
380 }
381 
382 static void
384 {
386  {
387  /* new way */
389  int natts = list_length(cstate->attnumlist);
390  int16 format = (cstate->binary ? 1 : 0);
391  int i;
392 
393  pq_beginmessage(&buf, 'G');
394  pq_sendbyte(&buf, format); /* overall format */
395  pq_sendint16(&buf, natts);
396  for (i = 0; i < natts; i++)
397  pq_sendint16(&buf, format); /* per-column formats */
398  pq_endmessage(&buf);
399  cstate->copy_dest = COPY_NEW_FE;
400  cstate->fe_msgbuf = makeStringInfo();
401  }
402  else
403  {
404  /* old way */
405  if (cstate->binary)
406  ereport(ERROR,
407  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408  errmsg("COPY BINARY is not supported to stdout or from stdin")));
409  pq_putemptymessage('G');
410  /* any error in old protocol will make us lose sync */
411  pq_startmsgread();
412  cstate->copy_dest = COPY_OLD_FE;
413  }
414  /* We *must* flush here to ensure FE knows it can send. */
415  pq_flush();
416 }
417 
418 static void
420 {
421  if (cstate->copy_dest == COPY_NEW_FE)
422  {
423  /* Shouldn't have any unsent data */
424  Assert(cstate->fe_msgbuf->len == 0);
425  /* Send Copy Done message */
426  pq_putemptymessage('c');
427  }
428  else
429  {
430  CopySendData(cstate, "\\.", 2);
431  /* Need to flush out the trailer (this also appends a newline) */
432  CopySendEndOfRow(cstate);
433  pq_endcopyout(false);
434  }
435 }
436 
437 /*----------
438  * CopySendData sends output data to the destination (file or frontend)
439  * CopySendString does the same for null-terminated strings
440  * CopySendChar does the same for single characters
441  * CopySendEndOfRow does the appropriate thing at end of each data row
442  * (data is not actually flushed except by CopySendEndOfRow)
443  *
444  * NB: no data conversion is applied by these functions
445  *----------
446  */
447 static void
448 CopySendData(CopyState cstate, const void *databuf, int datasize)
449 {
450  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
451 }
452 
453 static void
454 CopySendString(CopyState cstate, const char *str)
455 {
456  appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
457 }
458 
459 static void
460 CopySendChar(CopyState cstate, char c)
461 {
463 }
464 
465 static void
467 {
468  StringInfo fe_msgbuf = cstate->fe_msgbuf;
469 
470  switch (cstate->copy_dest)
471  {
472  case COPY_FILE:
473  if (!cstate->binary)
474  {
475  /* Default line termination depends on platform */
476 #ifndef WIN32
477  CopySendChar(cstate, '\n');
478 #else
479  CopySendString(cstate, "\r\n");
480 #endif
481  }
482 
483  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
484  cstate->copy_file) != 1 ||
485  ferror(cstate->copy_file))
486  {
487  if (cstate->is_program)
488  {
489  if (errno == EPIPE)
490  {
491  /*
492  * The pipe will be closed automatically on error at
493  * the end of transaction, but we might get a better
494  * error message from the subprocess' exit code than
495  * just "Broken Pipe"
496  */
497  ClosePipeToProgram(cstate);
498 
499  /*
500  * If ClosePipeToProgram() didn't throw an error, the
501  * program terminated normally, but closed the pipe
502  * first. Restore errno, and throw an error.
503  */
504  errno = EPIPE;
505  }
506  ereport(ERROR,
508  errmsg("could not write to COPY program: %m")));
509  }
510  else
511  ereport(ERROR,
513  errmsg("could not write to COPY file: %m")));
514  }
515  break;
516  case COPY_OLD_FE:
517  /* The FE/BE protocol uses \n as newline for all platforms */
518  if (!cstate->binary)
519  CopySendChar(cstate, '\n');
520 
521  if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
522  {
523  /* no hope of recovering connection sync, so FATAL */
524  ereport(FATAL,
525  (errcode(ERRCODE_CONNECTION_FAILURE),
526  errmsg("connection lost during COPY to stdout")));
527  }
528  break;
529  case COPY_NEW_FE:
530  /* The FE/BE protocol uses \n as newline for all platforms */
531  if (!cstate->binary)
532  CopySendChar(cstate, '\n');
533 
534  /* Dump the accumulated row as one CopyData message */
535  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
536  break;
537  case COPY_CALLBACK:
538  Assert(false); /* Not yet supported. */
539  break;
540  }
541 
542  resetStringInfo(fe_msgbuf);
543 }
544 
545 /*
546  * CopyGetData reads data from the source (file or frontend)
547  *
548  * We attempt to read at least minread, and at most maxread, bytes from
549  * the source. The actual number of bytes read is returned; if this is
550  * less than minread, EOF was detected.
551  *
552  * Note: when copying from the frontend, we expect a proper EOF mark per
553  * protocol; if the frontend simply drops the connection, we raise error.
554  * It seems unwise to allow the COPY IN to complete normally in that case.
555  *
556  * NB: no data conversion is applied here.
557  */
558 static int
559 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
560 {
561  int bytesread = 0;
562 
563  switch (cstate->copy_dest)
564  {
565  case COPY_FILE:
566  bytesread = fread(databuf, 1, maxread, cstate->copy_file);
567  if (ferror(cstate->copy_file))
568  ereport(ERROR,
570  errmsg("could not read from COPY file: %m")));
571  break;
572  case COPY_OLD_FE:
573 
574  /*
575  * We cannot read more than minread bytes (which in practice is 1)
576  * because old protocol doesn't have any clear way of separating
577  * the COPY stream from following data. This is slow, but not any
578  * slower than the code path was originally, and we don't care
579  * much anymore about the performance of old protocol.
580  */
581  if (pq_getbytes((char *) databuf, minread))
582  {
583  /* Only a \. terminator is legal EOF in old protocol */
584  ereport(ERROR,
585  (errcode(ERRCODE_CONNECTION_FAILURE),
586  errmsg("unexpected EOF on client connection with an open transaction")));
587  }
588  bytesread = minread;
589  break;
590  case COPY_NEW_FE:
591  while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
592  {
593  int avail;
594 
595  while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
596  {
597  /* Try to receive another message */
598  int mtype;
599 
600  readmessage:
602  pq_startmsgread();
603  mtype = pq_getbyte();
604  if (mtype == EOF)
605  ereport(ERROR,
606  (errcode(ERRCODE_CONNECTION_FAILURE),
607  errmsg("unexpected EOF on client connection with an open transaction")));
608  if (pq_getmessage(cstate->fe_msgbuf, 0))
609  ereport(ERROR,
610  (errcode(ERRCODE_CONNECTION_FAILURE),
611  errmsg("unexpected EOF on client connection with an open transaction")));
613  switch (mtype)
614  {
615  case 'd': /* CopyData */
616  break;
617  case 'c': /* CopyDone */
618  /* COPY IN correctly terminated by frontend */
619  cstate->fe_eof = true;
620  return bytesread;
621  case 'f': /* CopyFail */
622  ereport(ERROR,
623  (errcode(ERRCODE_QUERY_CANCELED),
624  errmsg("COPY from stdin failed: %s",
625  pq_getmsgstring(cstate->fe_msgbuf))));
626  break;
627  case 'H': /* Flush */
628  case 'S': /* Sync */
629 
630  /*
631  * Ignore Flush/Sync for the convenience of client
632  * libraries (such as libpq) that may send those
633  * without noticing that the command they just
634  * sent was COPY.
635  */
636  goto readmessage;
637  default:
638  ereport(ERROR,
639  (errcode(ERRCODE_PROTOCOL_VIOLATION),
640  errmsg("unexpected message type 0x%02X during COPY from stdin",
641  mtype)));
642  break;
643  }
644  }
645  avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
646  if (avail > maxread)
647  avail = maxread;
648  pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
649  databuf = (void *) ((char *) databuf + avail);
650  maxread -= avail;
651  bytesread += avail;
652  }
653  break;
654  case COPY_CALLBACK:
655  bytesread = cstate->data_source_cb(databuf, minread, maxread);
656  break;
657  }
658 
659  return bytesread;
660 }
661 
662 
663 /*
664  * These functions do apply some data conversion
665  */
666 
667 /*
668  * CopySendInt32 sends an int32 in network byte order
669  */
670 static void
672 {
673  uint32 buf;
674 
675  buf = pg_hton32((uint32) val);
676  CopySendData(cstate, &buf, sizeof(buf));
677 }
678 
679 /*
680  * CopyGetInt32 reads an int32 that appears in network byte order
681  *
682  * Returns true if OK, false if EOF
683  */
684 static bool
686 {
687  uint32 buf;
688 
689  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
690  {
691  *val = 0; /* suppress compiler warning */
692  return false;
693  }
694  *val = (int32) pg_ntoh32(buf);
695  return true;
696 }
697 
698 /*
699  * CopySendInt16 sends an int16 in network byte order
700  */
701 static void
703 {
704  uint16 buf;
705 
706  buf = pg_hton16((uint16) val);
707  CopySendData(cstate, &buf, sizeof(buf));
708 }
709 
710 /*
711  * CopyGetInt16 reads an int16 that appears in network byte order
712  */
713 static bool
715 {
716  uint16 buf;
717 
718  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
719  {
720  *val = 0; /* suppress compiler warning */
721  return false;
722  }
723  *val = (int16) pg_ntoh16(buf);
724  return true;
725 }
726 
727 
728 /*
729  * CopyLoadRawBuf loads some more data into raw_buf
730  *
731  * Returns true if able to obtain at least one more byte, else false.
732  *
733  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
734  * down to the start of the buffer and then we load more data after that.
735  * This case is used only when a frontend multibyte character crosses a
736  * bufferload boundary.
737  */
738 static bool
740 {
741  int nbytes;
742  int inbytes;
743 
744  if (cstate->raw_buf_index < cstate->raw_buf_len)
745  {
746  /* Copy down the unprocessed data */
747  nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
748  memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
749  nbytes);
750  }
751  else
752  nbytes = 0; /* no data need be saved */
753 
754  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
755  1, RAW_BUF_SIZE - nbytes);
756  nbytes += inbytes;
757  cstate->raw_buf[nbytes] = '\0';
758  cstate->raw_buf_index = 0;
759  cstate->raw_buf_len = nbytes;
760  return (inbytes > 0);
761 }
762 
763 
764 /*
765  * DoCopy executes the SQL COPY statement
766  *
767  * Either unload or reload contents of table <relation>, depending on <from>.
768  * (<from> = true means we are inserting into the table.) In the "TO" case
769  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
770  * or DELETE query.
771  *
772  * If <pipe> is false, transfer is between the table and the file named
773  * <filename>. Otherwise, transfer is between the table and our regular
774  * input/output stream. The latter could be either stdin/stdout or a
775  * socket, depending on whether we're running under Postmaster control.
776  *
777  * Do not allow a Postgres user without superuser privilege to read from
778  * or write to a file.
779  *
780  * Do not allow the copy if user doesn't have proper permission to access
781  * the table or the specifically requested columns.
782  */
783 void
784 DoCopy(ParseState *pstate, const CopyStmt *stmt,
785  int stmt_location, int stmt_len,
786  uint64 *processed)
787 {
788  CopyState cstate;
789  bool is_from = stmt->is_from;
790  bool pipe = (stmt->filename == NULL);
791  Relation rel;
792  Oid relid;
793  RawStmt *query = NULL;
794 
795  /* Disallow COPY to/from file or program except to superusers. */
796  if (!pipe && !superuser())
797  {
798  if (stmt->is_program)
799  ereport(ERROR,
800  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
801  errmsg("must be superuser to COPY to or from an external program"),
802  errhint("Anyone can COPY to stdout or from stdin. "
803  "psql's \\copy command also works for anyone.")));
804  else
805  ereport(ERROR,
806  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807  errmsg("must be superuser to COPY to or from a file"),
808  errhint("Anyone can COPY to stdout or from stdin. "
809  "psql's \\copy command also works for anyone.")));
810  }
811 
812  if (stmt->relation)
813  {
814  TupleDesc tupDesc;
815  List *attnums;
816  ListCell *cur;
817  RangeTblEntry *rte;
818 
819  Assert(!stmt->query);
820 
821  /* Open and lock the relation, using the appropriate lock type. */
822  rel = heap_openrv(stmt->relation,
823  (is_from ? RowExclusiveLock : AccessShareLock));
824 
825  relid = RelationGetRelid(rel);
826 
827  rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
828  rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
829 
830  tupDesc = RelationGetDescr(rel);
831  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
832  foreach(cur, attnums)
833  {
834  int attno = lfirst_int(cur) -
836 
837  if (is_from)
838  rte->insertedCols = bms_add_member(rte->insertedCols, attno);
839  else
840  rte->selectedCols = bms_add_member(rte->selectedCols, attno);
841  }
842  ExecCheckRTPerms(pstate->p_rtable, true);
843 
844  /*
845  * Permission check for row security policies.
846  *
847  * check_enable_rls will ereport(ERROR) if the user has requested
848  * something invalid and will otherwise indicate if we should enable
849  * RLS (returns RLS_ENABLED) or not for this COPY statement.
850  *
851  * If the relation has a row security policy and we are to apply it
852  * then perform a "query" copy and allow the normal query processing
853  * to handle the policies.
854  *
855  * If RLS is not enabled for this, then just fall through to the
856  * normal non-filtering relation handling.
857  */
858  if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
859  {
861  ColumnRef *cr;
862  ResTarget *target;
863  RangeVar *from;
864  List *targetList = NIL;
865 
866  if (is_from)
867  ereport(ERROR,
868  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
869  errmsg("COPY FROM not supported with row-level security"),
870  errhint("Use INSERT statements instead.")));
871 
872  /*
873  * Build target list
874  *
875  * If no columns are specified in the attribute list of the COPY
876  * command, then the target list is 'all' columns. Therefore, '*'
877  * should be used as the target list for the resulting SELECT
878  * statement.
879  *
880  * In the case that columns are specified in the attribute list,
881  * create a ColumnRef and ResTarget for each column and add them
882  * to the target list for the resulting SELECT statement.
883  */
884  if (!stmt->attlist)
885  {
886  cr = makeNode(ColumnRef);
888  cr->location = -1;
889 
890  target = makeNode(ResTarget);
891  target->name = NULL;
892  target->indirection = NIL;
893  target->val = (Node *) cr;
894  target->location = -1;
895 
896  targetList = list_make1(target);
897  }
898  else
899  {
900  ListCell *lc;
901 
902  foreach(lc, stmt->attlist)
903  {
904  /*
905  * Build the ColumnRef for each column. The ColumnRef
906  * 'fields' property is a String 'Value' node (see
907  * nodes/value.h) that corresponds to the column name
908  * respectively.
909  */
910  cr = makeNode(ColumnRef);
911  cr->fields = list_make1(lfirst(lc));
912  cr->location = -1;
913 
914  /* Build the ResTarget and add the ColumnRef to it. */
915  target = makeNode(ResTarget);
916  target->name = NULL;
917  target->indirection = NIL;
918  target->val = (Node *) cr;
919  target->location = -1;
920 
921  /* Add each column to the SELECT statement's target list */
922  targetList = lappend(targetList, target);
923  }
924  }
925 
926  /*
927  * Build RangeVar for from clause, fully qualified based on the
928  * relation which we have opened and locked.
929  */
932  -1);
933 
934  /* Build query */
935  select = makeNode(SelectStmt);
936  select->targetList = targetList;
937  select->fromClause = list_make1(from);
938 
939  query = makeNode(RawStmt);
940  query->stmt = (Node *) select;
941  query->stmt_location = stmt_location;
942  query->stmt_len = stmt_len;
943 
944  /*
945  * Close the relation for now, but keep the lock on it to prevent
946  * changes between now and when we start the query-based COPY.
947  *
948  * We'll reopen it later as part of the query-based COPY.
949  */
950  heap_close(rel, NoLock);
951  rel = NULL;
952  }
953  }
954  else
955  {
956  Assert(stmt->query);
957 
958  query = makeNode(RawStmt);
959  query->stmt = stmt->query;
960  query->stmt_location = stmt_location;
961  query->stmt_len = stmt_len;
962 
963  relid = InvalidOid;
964  rel = NULL;
965  }
966 
967  if (is_from)
968  {
969  Assert(rel);
970 
971  /* check read-only transaction and parallel mode */
972  if (XactReadOnly && !rel->rd_islocaltemp)
973  PreventCommandIfReadOnly("COPY FROM");
974  PreventCommandIfParallelMode("COPY FROM");
975 
976  cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
977  NULL, stmt->attlist, stmt->options);
978  *processed = CopyFrom(cstate); /* copy from file to database */
979  EndCopyFrom(cstate);
980  }
981  else
982  {
983  cstate = BeginCopyTo(pstate, rel, query, relid,
984  stmt->filename, stmt->is_program,
985  stmt->attlist, stmt->options);
986  *processed = DoCopyTo(cstate); /* copy from database to file */
987  EndCopyTo(cstate);
988  }
989 
990  /*
991  * Close the relation. If reading, we can release the AccessShareLock we
992  * got; if writing, we should hold the lock until end of transaction to
993  * ensure that updates will be committed before lock is released.
994  */
995  if (rel != NULL)
996  heap_close(rel, (is_from ? NoLock : AccessShareLock));
997 }
998 
999 /*
1000  * Process the statement option list for COPY.
1001  *
1002  * Scan the options list (a list of DefElem) and transpose the information
1003  * into cstate, applying appropriate error checking.
1004  *
1005  * cstate is assumed to be filled with zeroes initially.
1006  *
1007  * This is exported so that external users of the COPY API can sanity-check
1008  * a list of options. In that usage, cstate should be passed as NULL
1009  * (since external users don't know sizeof(CopyStateData)) and the collected
1010  * data is just leaked until CurrentMemoryContext is reset.
1011  *
1012  * Note that additional checking, such as whether column names listed in FORCE
1013  * QUOTE actually exist, has to be applied later. This just checks for
1014  * self-consistency of the options list.
1015  */
1016 void
1018  CopyState cstate,
1019  bool is_from,
1020  List *options)
1021 {
1022  bool format_specified = false;
1023  ListCell *option;
1024 
1025  /* Support external use for option sanity checking */
1026  if (cstate == NULL)
1027  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1028 
1029  cstate->file_encoding = -1;
1030 
1031  /* Extract options from the statement node tree */
1032  foreach(option, options)
1033  {
1034  DefElem *defel = lfirst_node(DefElem, option);
1035 
1036  if (strcmp(defel->defname, "format") == 0)
1037  {
1038  char *fmt = defGetString(defel);
1039 
1040  if (format_specified)
1041  ereport(ERROR,
1042  (errcode(ERRCODE_SYNTAX_ERROR),
1043  errmsg("conflicting or redundant options"),
1044  parser_errposition(pstate, defel->location)));
1045  format_specified = true;
1046  if (strcmp(fmt, "text") == 0)
1047  /* default format */ ;
1048  else if (strcmp(fmt, "csv") == 0)
1049  cstate->csv_mode = true;
1050  else if (strcmp(fmt, "binary") == 0)
1051  cstate->binary = true;
1052  else
1053  ereport(ERROR,
1054  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1055  errmsg("COPY format \"%s\" not recognized", fmt),
1056  parser_errposition(pstate, defel->location)));
1057  }
1058  else if (strcmp(defel->defname, "oids") == 0)
1059  {
1060  if (cstate->oids)
1061  ereport(ERROR,
1062  (errcode(ERRCODE_SYNTAX_ERROR),
1063  errmsg("conflicting or redundant options"),
1064  parser_errposition(pstate, defel->location)));
1065  cstate->oids = defGetBoolean(defel);
1066  }
1067  else if (strcmp(defel->defname, "freeze") == 0)
1068  {
1069  if (cstate->freeze)
1070  ereport(ERROR,
1071  (errcode(ERRCODE_SYNTAX_ERROR),
1072  errmsg("conflicting or redundant options"),
1073  parser_errposition(pstate, defel->location)));
1074  cstate->freeze = defGetBoolean(defel);
1075  }
1076  else if (strcmp(defel->defname, "delimiter") == 0)
1077  {
1078  if (cstate->delim)
1079  ereport(ERROR,
1080  (errcode(ERRCODE_SYNTAX_ERROR),
1081  errmsg("conflicting or redundant options"),
1082  parser_errposition(pstate, defel->location)));
1083  cstate->delim = defGetString(defel);
1084  }
1085  else if (strcmp(defel->defname, "null") == 0)
1086  {
1087  if (cstate->null_print)
1088  ereport(ERROR,
1089  (errcode(ERRCODE_SYNTAX_ERROR),
1090  errmsg("conflicting or redundant options"),
1091  parser_errposition(pstate, defel->location)));
1092  cstate->null_print = defGetString(defel);
1093  }
1094  else if (strcmp(defel->defname, "header") == 0)
1095  {
1096  if (cstate->header_line)
1097  ereport(ERROR,
1098  (errcode(ERRCODE_SYNTAX_ERROR),
1099  errmsg("conflicting or redundant options"),
1100  parser_errposition(pstate, defel->location)));
1101  cstate->header_line = defGetBoolean(defel);
1102  }
1103  else if (strcmp(defel->defname, "quote") == 0)
1104  {
1105  if (cstate->quote)
1106  ereport(ERROR,
1107  (errcode(ERRCODE_SYNTAX_ERROR),
1108  errmsg("conflicting or redundant options"),
1109  parser_errposition(pstate, defel->location)));
1110  cstate->quote = defGetString(defel);
1111  }
1112  else if (strcmp(defel->defname, "escape") == 0)
1113  {
1114  if (cstate->escape)
1115  ereport(ERROR,
1116  (errcode(ERRCODE_SYNTAX_ERROR),
1117  errmsg("conflicting or redundant options"),
1118  parser_errposition(pstate, defel->location)));
1119  cstate->escape = defGetString(defel);
1120  }
1121  else if (strcmp(defel->defname, "force_quote") == 0)
1122  {
1123  if (cstate->force_quote || cstate->force_quote_all)
1124  ereport(ERROR,
1125  (errcode(ERRCODE_SYNTAX_ERROR),
1126  errmsg("conflicting or redundant options"),
1127  parser_errposition(pstate, defel->location)));
1128  if (defel->arg && IsA(defel->arg, A_Star))
1129  cstate->force_quote_all = true;
1130  else if (defel->arg && IsA(defel->arg, List))
1131  cstate->force_quote = castNode(List, defel->arg);
1132  else
1133  ereport(ERROR,
1134  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1135  errmsg("argument to option \"%s\" must be a list of column names",
1136  defel->defname),
1137  parser_errposition(pstate, defel->location)));
1138  }
1139  else if (strcmp(defel->defname, "force_not_null") == 0)
1140  {
1141  if (cstate->force_notnull)
1142  ereport(ERROR,
1143  (errcode(ERRCODE_SYNTAX_ERROR),
1144  errmsg("conflicting or redundant options"),
1145  parser_errposition(pstate, defel->location)));
1146  if (defel->arg && IsA(defel->arg, List))
1147  cstate->force_notnull = castNode(List, defel->arg);
1148  else
1149  ereport(ERROR,
1150  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1151  errmsg("argument to option \"%s\" must be a list of column names",
1152  defel->defname),
1153  parser_errposition(pstate, defel->location)));
1154  }
1155  else if (strcmp(defel->defname, "force_null") == 0)
1156  {
1157  if (cstate->force_null)
1158  ereport(ERROR,
1159  (errcode(ERRCODE_SYNTAX_ERROR),
1160  errmsg("conflicting or redundant options")));
1161  if (defel->arg && IsA(defel->arg, List))
1162  cstate->force_null = castNode(List, defel->arg);
1163  else
1164  ereport(ERROR,
1165  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1166  errmsg("argument to option \"%s\" must be a list of column names",
1167  defel->defname),
1168  parser_errposition(pstate, defel->location)));
1169  }
1170  else if (strcmp(defel->defname, "convert_selectively") == 0)
1171  {
1172  /*
1173  * Undocumented, not-accessible-from-SQL option: convert only the
1174  * named columns to binary form, storing the rest as NULLs. It's
1175  * allowed for the column list to be NIL.
1176  */
1177  if (cstate->convert_selectively)
1178  ereport(ERROR,
1179  (errcode(ERRCODE_SYNTAX_ERROR),
1180  errmsg("conflicting or redundant options"),
1181  parser_errposition(pstate, defel->location)));
1182  cstate->convert_selectively = true;
1183  if (defel->arg == NULL || IsA(defel->arg, List))
1184  cstate->convert_select = castNode(List, defel->arg);
1185  else
1186  ereport(ERROR,
1187  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1188  errmsg("argument to option \"%s\" must be a list of column names",
1189  defel->defname),
1190  parser_errposition(pstate, defel->location)));
1191  }
1192  else if (strcmp(defel->defname, "encoding") == 0)
1193  {
1194  if (cstate->file_encoding >= 0)
1195  ereport(ERROR,
1196  (errcode(ERRCODE_SYNTAX_ERROR),
1197  errmsg("conflicting or redundant options"),
1198  parser_errposition(pstate, defel->location)));
1200  if (cstate->file_encoding < 0)
1201  ereport(ERROR,
1202  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1203  errmsg("argument to option \"%s\" must be a valid encoding name",
1204  defel->defname),
1205  parser_errposition(pstate, defel->location)));
1206  }
1207  else
1208  ereport(ERROR,
1209  (errcode(ERRCODE_SYNTAX_ERROR),
1210  errmsg("option \"%s\" not recognized",
1211  defel->defname),
1212  parser_errposition(pstate, defel->location)));
1213  }
1214 
1215  /*
1216  * Check for incompatible options (must do these two before inserting
1217  * defaults)
1218  */
1219  if (cstate->binary && cstate->delim)
1220  ereport(ERROR,
1221  (errcode(ERRCODE_SYNTAX_ERROR),
1222  errmsg("cannot specify DELIMITER in BINARY mode")));
1223 
1224  if (cstate->binary && cstate->null_print)
1225  ereport(ERROR,
1226  (errcode(ERRCODE_SYNTAX_ERROR),
1227  errmsg("cannot specify NULL in BINARY mode")));
1228 
1229  /* Set defaults for omitted options */
1230  if (!cstate->delim)
1231  cstate->delim = cstate->csv_mode ? "," : "\t";
1232 
1233  if (!cstate->null_print)
1234  cstate->null_print = cstate->csv_mode ? "" : "\\N";
1235  cstate->null_print_len = strlen(cstate->null_print);
1236 
1237  if (cstate->csv_mode)
1238  {
1239  if (!cstate->quote)
1240  cstate->quote = "\"";
1241  if (!cstate->escape)
1242  cstate->escape = cstate->quote;
1243  }
1244 
1245  /* Only single-byte delimiter strings are supported. */
1246  if (strlen(cstate->delim) != 1)
1247  ereport(ERROR,
1248  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1249  errmsg("COPY delimiter must be a single one-byte character")));
1250 
1251  /* Disallow end-of-line characters */
1252  if (strchr(cstate->delim, '\r') != NULL ||
1253  strchr(cstate->delim, '\n') != NULL)
1254  ereport(ERROR,
1255  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1256  errmsg("COPY delimiter cannot be newline or carriage return")));
1257 
1258  if (strchr(cstate->null_print, '\r') != NULL ||
1259  strchr(cstate->null_print, '\n') != NULL)
1260  ereport(ERROR,
1261  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1262  errmsg("COPY null representation cannot use newline or carriage return")));
1263 
1264  /*
1265  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1266  * backslash because it would be ambiguous. We can't allow the other
1267  * cases because data characters matching the delimiter must be
1268  * backslashed, and certain backslash combinations are interpreted
1269  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1270  * more than strictly necessary, but seems best for consistency and
1271  * future-proofing. Likewise we disallow all digits though only octal
1272  * digits are actually dangerous.
1273  */
1274  if (!cstate->csv_mode &&
1275  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1276  cstate->delim[0]) != NULL)
1277  ereport(ERROR,
1278  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1279  errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1280 
1281  /* Check header */
1282  if (!cstate->csv_mode && cstate->header_line)
1283  ereport(ERROR,
1284  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1285  errmsg("COPY HEADER available only in CSV mode")));
1286 
1287  /* Check quote */
1288  if (!cstate->csv_mode && cstate->quote != NULL)
1289  ereport(ERROR,
1290  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1291  errmsg("COPY quote available only in CSV mode")));
1292 
1293  if (cstate->csv_mode && strlen(cstate->quote) != 1)
1294  ereport(ERROR,
1295  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1296  errmsg("COPY quote must be a single one-byte character")));
1297 
1298  if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1299  ereport(ERROR,
1300  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1301  errmsg("COPY delimiter and quote must be different")));
1302 
1303  /* Check escape */
1304  if (!cstate->csv_mode && cstate->escape != NULL)
1305  ereport(ERROR,
1306  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1307  errmsg("COPY escape available only in CSV mode")));
1308 
1309  if (cstate->csv_mode && strlen(cstate->escape) != 1)
1310  ereport(ERROR,
1311  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1312  errmsg("COPY escape must be a single one-byte character")));
1313 
1314  /* Check force_quote */
1315  if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1316  ereport(ERROR,
1317  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1318  errmsg("COPY force quote available only in CSV mode")));
1319  if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1320  ereport(ERROR,
1321  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1322  errmsg("COPY force quote only available using COPY TO")));
1323 
1324  /* Check force_notnull */
1325  if (!cstate->csv_mode && cstate->force_notnull != NIL)
1326  ereport(ERROR,
1327  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1328  errmsg("COPY force not null available only in CSV mode")));
1329  if (cstate->force_notnull != NIL && !is_from)
1330  ereport(ERROR,
1331  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1332  errmsg("COPY force not null only available using COPY FROM")));
1333 
1334  /* Check force_null */
1335  if (!cstate->csv_mode && cstate->force_null != NIL)
1336  ereport(ERROR,
1337  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338  errmsg("COPY force null available only in CSV mode")));
1339 
1340  if (cstate->force_null != NIL && !is_from)
1341  ereport(ERROR,
1342  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1343  errmsg("COPY force null only available using COPY FROM")));
1344 
1345  /* Don't allow the delimiter to appear in the null string. */
1346  if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1347  ereport(ERROR,
1348  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1349  errmsg("COPY delimiter must not appear in the NULL specification")));
1350 
1351  /* Don't allow the CSV quote char to appear in the null string. */
1352  if (cstate->csv_mode &&
1353  strchr(cstate->null_print, cstate->quote[0]) != NULL)
1354  ereport(ERROR,
1355  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1356  errmsg("CSV quote character must not appear in the NULL specification")));
1357 }
1358 
1359 /*
1360  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1361  *
1362  * Iff <binary>, unload or reload in the binary format, as opposed to the
1363  * more wasteful but more robust and portable text format.
1364  *
1365  * Iff <oids>, unload or reload the format that includes OID information.
1366  * On input, we accept OIDs whether or not the table has an OID column,
1367  * but silently drop them if it does not. On output, we report an error
1368  * if the user asks for OIDs in a table that has none (not providing an
1369  * OID column might seem friendlier, but could seriously confuse programs).
1370  *
1371  * If in the text format, delimit columns with delimiter <delim> and print
1372  * NULL values as <null_print>.
1373  */
/*
 * Shared first stage of setting up a COPY: builds and returns a CopyState.
 *
 * Steps visible in this listing:
 *   - allocate a zeroed CopyStateData and switch into cstate->copycontext;
 *   - parse the option list via ProcessCopyOptions();
 *   - obtain a tuple descriptor, either straight from "rel", or (when rel
 *     is NULL) by analyzing, rewriting and planning "raw_query" and calling
 *     ExecutorStart() on the resulting QueryDesc;
 *   - turn attnamelist and the FORCE_QUOTE / FORCE_NOT_NULL / FORCE_NULL /
 *     convert_selectively name lists into attribute-number lists and
 *     per-column boolean flag arrays of length tupDesc->natts;
 *   - fill in the encoding-related fields and default copy_dest to
 *     COPY_FILE, then switch back to the caller's memory context.
 *
 * is_from   - asserted false on the query path (COPY FROM needs a relation).
 * rel       - target/source relation, or NULL when raw_query is given.
 * raw_query - query to run for COPY (query) TO; asserted NULL when rel is set.
 * queryRelId - if not InvalidOid, must appear in the plan's relationOids.
 * attnamelist, options - column name list and DefElem option list.
 *
 * NOTE(review): this extract carries the listing's own line numbers and has
 * dropped several listing lines (gaps in the numbering).  Each gap is
 * flagged below; confirm the missing text against the full file.
 */
1374 static CopyState
/* NOTE(review): listing line 1375 is missing here; presumably it holds the
 * function name and the "pstate" parameter that the body uses. */
1376  bool is_from,
1377  Relation rel,
1378  RawStmt *raw_query,
1379  Oid queryRelId,
1380  List *attnamelist,
1381  List *options)
1382 {
1383  CopyState cstate;
1384  TupleDesc tupDesc;
1385  int num_phys_attrs;
1386  MemoryContext oldcontext;
1387 
1388  /* Allocate workspace and zero all fields */
1389  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1390 
1391  /*
1392  * We allocate everything used by a cstate in a new memory context. This
1393  * avoids memory leaks during repeated use of COPY in a query.
1394  */
/* NOTE(review): listing lines 1395 and 1397 are missing; only the context
 * name argument survives.  Presumably they create cstate->copycontext. */
1396  "COPY",
1398 
1399  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1400 
1401  /* Extract options from the statement node tree */
1402  ProcessCopyOptions(pstate, cstate, is_from, options);
1403 
1404  /* Process the source/target relation or query */
1405  if (rel)
1406  {
1407  Assert(!raw_query);
1408 
1409  cstate->rel = rel;
1410 
1411  tupDesc = RelationGetDescr(cstate->rel);
1412 
1413  /* Don't allow COPY w/ OIDs to or from a table without them */
1414  if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1415  ereport(ERROR,
1416  (errcode(ERRCODE_UNDEFINED_COLUMN),
1417  errmsg("table \"%s\" does not have OIDs",
1418  RelationGetRelationName(cstate->rel))));
1419  }
1420  else
1421  {
1422  List *rewritten;
1423  Query *query;
1424  PlannedStmt *plan;
1425  DestReceiver *dest;
1426 
1427  Assert(!is_from);
1428  cstate->rel = NULL;
1429 
1430  /* Don't allow COPY w/ OIDs from a query */
1431  if (cstate->oids)
1432  ereport(ERROR,
1433  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1434  errmsg("COPY (query) WITH OIDS is not supported")));
1435 
1436  /*
1437  * Run parse analysis and rewrite. Note this also acquires sufficient
1438  * locks on the source table(s).
1439  *
1440  * Because the parser and planner tend to scribble on their input, we
1441  * make a preliminary copy of the source querytree. This prevents
1442  * problems in the case that the COPY is in a portal or plpgsql
1443  * function and is executed repeatedly. (See also the same hack in
1444  * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1445  */
1446  rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1447  pstate->p_sourcetext, NULL, 0,
1448  NULL);
1449 
1450  /* check that we got back something we can work with */
1451  if (rewritten == NIL)
1452  {
1453  ereport(ERROR,
1454  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1455  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1456  }
1457  else if (list_length(rewritten) > 1)
1458  {
1459  ListCell *lc;
1460 
1461  /* examine queries to determine which error message to issue */
1462  foreach(lc, rewritten)
1463  {
1464  Query *q = lfirst_node(Query, lc);
1465 
/* NOTE(review): listing lines 1466 and 1470 are missing.  They presumably
 * hold the tests on "q" that select between the two reports below; as
 * extracted, the reports look unconditional, which they cannot be. */
1467  ereport(ERROR,
1468  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1469  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1471  ereport(ERROR,
1472  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1473  errmsg("DO ALSO rules are not supported for the COPY")));
1474  }
1475 
/* Fallback when no query in the list triggered one of the reports above */
1476  ereport(ERROR,
1477  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1478  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1479  }
1480 
/* Exactly one rewritten query remains at this point */
1481  query = linitial_node(Query, rewritten);
1482 
1483  /* The grammar allows SELECT INTO, but we don't support that */
/* NOTE(review): listing line 1485 (second half of this condition) is missing */
1484  if (query->utilityStmt != NULL &&
1486  ereport(ERROR,
1487  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1488  errmsg("COPY (SELECT INTO) is not supported")));
1489 
1490  Assert(query->utilityStmt == NULL);
1491 
1492  /*
1493  * Similarly the grammar doesn't enforce the presence of a RETURNING
1494  * clause, but this is required here.
1495  */
1496  if (query->commandType != CMD_SELECT &&
1497  query->returningList == NIL)
1498  {
1499  Assert(query->commandType == CMD_INSERT ||
1500  query->commandType == CMD_UPDATE ||
1501  query->commandType == CMD_DELETE);
1502 
1503  ereport(ERROR,
1504  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1505  errmsg("COPY query must have a RETURNING clause")));
1506  }
1507 
1508  /* plan the query */
1509  plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1510 
1511  /*
1512  * With row level security and a user using "COPY relation TO", we
1513  * have to convert the "COPY relation TO" to a query-based COPY (eg:
1514  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1515  * in any RLS clauses.
1516  *
1517  * When this happens, we are passed in the relid of the originally
1518  * found relation (which we have locked). As the planner will look up
1519  * the relation again, we double-check here to make sure it found the
1520  * same one that we have locked.
1521  */
1522  if (queryRelId != InvalidOid)
1523  {
1524  /*
1525  * Note that with RLS involved there may be multiple relations,
1526  * and while the one we need is almost certainly first, we don't
1527  * make any guarantees of that in the planner, so check the whole
1528  * list and make sure we find the original relation.
1529  */
1530  if (!list_member_oid(plan->relationOids, queryRelId))
1531  ereport(ERROR,
1532  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1533  errmsg("relation referenced by COPY statement has changed")));
1534  }
1535 
1536  /*
1537  * Use a snapshot with an updated command ID to ensure this query sees
1538  * results of any previously executed queries.
1539  */
/* NOTE(review): listing lines 1540-1541 (the snapshot calls this comment
 * describes) and 1544 (the assignment to "dest") are missing. */
1542 
1543  /* Create dest receiver for COPY OUT */
1545  ((DR_copy *) dest)->cstate = cstate;
1546 
1547  /* Create a QueryDesc requesting no output */
/* NOTE(review): listing lines 1549-1550 (two middle arguments) are missing */
1548  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1551  dest, NULL, NULL, 0);
1552 
1553  /*
1554  * Call ExecutorStart to prepare the plan for execution.
1555  *
1556  * ExecutorStart computes a result tupdesc for us
1557  */
1558  ExecutorStart(cstate->queryDesc, 0);
1559 
1560  tupDesc = cstate->queryDesc->tupDesc;
1561  }
1562 
1563  /* Generate or convert list of attributes to process */
1564  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1565 
1566  num_phys_attrs = tupDesc->natts;
1567 
1568  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
/* Flag arrays are indexed by attnum - 1 and zero-filled by palloc0 */
1569  cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1570  if (cstate->force_quote_all)
1571  {
1572  int i;
1573 
1574  for (i = 0; i < num_phys_attrs; i++)
1575  cstate->force_quote_flags[i] = true;
1576  }
1577  else if (cstate->force_quote)
1578  {
1579  List *attnums;
1580  ListCell *cur;
1581 
1582  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1583 
1584  foreach(cur, attnums)
1585  {
1586  int attnum = lfirst_int(cur);
1587  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1588 
/* A forced column must also be one of the columns being copied */
1589  if (!list_member_int(cstate->attnumlist, attnum))
1590  ereport(ERROR,
1591  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1592  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1593  NameStr(attr->attname))));
1594  cstate->force_quote_flags[attnum - 1] = true;
1595  }
1596  }
1597 
1598  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1599  cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1600  if (cstate->force_notnull)
1601  {
1602  List *attnums;
1603  ListCell *cur;
1604 
1605  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1606 
1607  foreach(cur, attnums)
1608  {
1609  int attnum = lfirst_int(cur);
1610  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1611 
1612  if (!list_member_int(cstate->attnumlist, attnum))
1613  ereport(ERROR,
1614  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1615  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1616  NameStr(attr->attname))));
1617  cstate->force_notnull_flags[attnum - 1] = true;
1618  }
1619  }
1620 
1621  /* Convert FORCE_NULL name list to per-column flags, check validity */
1622  cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1623  if (cstate->force_null)
1624  {
1625  List *attnums;
1626  ListCell *cur;
1627 
1628  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1629 
1630  foreach(cur, attnums)
1631  {
1632  int attnum = lfirst_int(cur);
1633  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1634 
1635  if (!list_member_int(cstate->attnumlist, attnum))
1636  ereport(ERROR,
1637  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1638  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1639  NameStr(attr->attname))));
1640  cstate->force_null_flags[attnum - 1] = true;
1641  }
1642  }
1643 
1644  /* Convert convert_selectively name list to per-column flags */
/* Unlike the arrays above, this one is allocated only when the option is set */
1645  if (cstate->convert_selectively)
1646  {
1647  List *attnums;
1648  ListCell *cur;
1649 
1650  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1651 
1652  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1653 
1654  foreach(cur, attnums)
1655  {
1656  int attnum = lfirst_int(cur);
1657  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1658 
1659  if (!list_member_int(cstate->attnumlist, attnum))
1660  ereport(ERROR,
1661  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1662  errmsg_internal("selected column \"%s\" not referenced by COPY",
1663  NameStr(attr->attname))));
1664  cstate->convert_select_flags[attnum - 1] = true;
1665  }
1666  }
1667 
1668  /* Use client encoding when ENCODING option is not specified. */
/* NOTE(review): listing line 1670 (the statement guarded by this "if") is missing */
1669  if (cstate->file_encoding < 0)
1671 
1672  /*
1673  * Set up encoding conversion info. Even if the file and server encodings
1674  * are the same, we must apply pg_any_to_server() to validate data in
1675  * multibyte encodings.
1676  */
/* NOTE(review): listing lines 1679 and 1681 are missing: the second operand
 * of this expression and the statement following the comment below. */
1677  cstate->need_transcoding =
1678  (cstate->file_encoding != GetDatabaseEncoding() ||
1680  /* See Multibyte encoding comment above */
1682 
1683  cstate->copy_dest = COPY_FILE; /* default */
1684 
1685  MemoryContextSwitchTo(oldcontext);
1686 
1687  return cstate;
1688 }
1689 
1690 /*
1691  * Closes the pipe to an external program, checking the pclose() return code.
1692  */
/*
 * Close the pipe stream in cstate->copy_file and turn a failure into an
 * ERROR.  Must only be called when cstate->is_program is set (asserted).
 *
 *   ClosePipeStream() == -1       -> the close itself failed (%m reports errno)
 *   ClosePipeStream() other != 0  -> reported as failure of the program named
 *                                    by cstate->filename, with
 *                                    wait_result_to_str() as the detail text
 */
1693 static void
/* NOTE(review): listing line 1694 (function name and parameter list) is
 * missing from this extract; the body uses a parameter named "cstate". */
1695 {
1696  int pclose_rc;
1697 
1698  Assert(cstate->is_program);
1699 
1700  pclose_rc = ClosePipeStream(cstate->copy_file);
1701  if (pclose_rc == -1)
1702  ereport(ERROR,
/* NOTE(review): listing line 1703 (the error-code argument) is missing */
1704  errmsg("could not close pipe to external command: %m")));
1705  else if (pclose_rc != 0)
1706  ereport(ERROR,
1707  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1708  errmsg("program \"%s\" failed",
1709  cstate->filename),
1710  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1711 }
1712 
1713 /*
1714  * Release resources allocated in a cstate for COPY TO/FROM.
1715  */
/*
 * Common teardown for a CopyState: close the underlying stream, then free
 * the state.
 *
 * A program pipe is closed through ClosePipeToProgram().  Otherwise, if a
 * filename was recorded, the file is closed with FreeFile() and a nonzero
 * result is raised as an ERROR.  When filename is NULL nothing is closed
 * here.  cstate itself is pfree'd last and must not be used afterwards.
 */
1716 static void
/* NOTE(review): listing line 1717 (function name and parameter list) is
 * missing from this extract; the body uses a parameter named "cstate". */
1718 {
1719  if (cstate->is_program)
1720  {
1721  ClosePipeToProgram(cstate);
1722  }
1723  else
1724  {
1725  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1726  ereport(ERROR,
/* NOTE(review): listing line 1727 (the error-code argument) is missing */
1728  errmsg("could not close file \"%s\": %m",
1729  cstate->filename)));
1730  }
1731 
/* NOTE(review): listing line 1732 is missing; presumably it releases
 * cstate->copycontext before the pfree below -- confirm. */
1733  pfree(cstate);
1734 }
1735 
1736 /*
1737  * Setup CopyState to read tuples from a table or a query for COPY TO.
1738  */
/*
 * Build a CopyState for COPY TO and open the output destination.
 *
 * When "rel" is given it must be a plain table (RELKIND_RELATION); every
 * other relkind is rejected with a kind-specific message.  The state is
 * then created via BeginCopy() with is_from = false.
 *
 * filename == NULL means output does not go to a server-side file or
 * program ("pipe" below).  Otherwise the name is copied into cstate and
 * either run as a program (OpenPipeStream) or opened as a file
 * (AllocateFile), which must be an absolute path and must not be a
 * directory.
 *
 * Work after BeginCopy() runs in cstate->copycontext; the caller's memory
 * context is restored before returning.
 *
 * NOTE(review): this extract has dropped several listing lines (gaps in
 * the embedded numbering); each gap is flagged below.
 */
1739 static CopyState
/* NOTE(review): listing line 1740 is missing; presumably it holds the
 * function name and the "pstate" parameter passed to BeginCopy below. */
1741  Relation rel,
1742  RawStmt *query,
1743  Oid queryRelId,
1744  const char *filename,
1745  bool is_program,
1746  List *attnamelist,
1747  List *options)
1748 {
1749  CopyState cstate;
1750  bool pipe = (filename == NULL);
1751  MemoryContext oldcontext;
1752 
1753  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1754  {
/* NOTE(review): in the branches below, listing lines 1759, 1765, 1771 and
 * 1782 are missing -- the argument for each "%s" before the errhint. */
1755  if (rel->rd_rel->relkind == RELKIND_VIEW)
1756  ereport(ERROR,
1757  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1758  errmsg("cannot copy from view \"%s\"",
1760  errhint("Try the COPY (SELECT ...) TO variant.")));
1761  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1762  ereport(ERROR,
1763  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1764  errmsg("cannot copy from materialized view \"%s\"",
1766  errhint("Try the COPY (SELECT ...) TO variant.")));
1767  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1768  ereport(ERROR,
1769  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1770  errmsg("cannot copy from foreign table \"%s\"",
1772  errhint("Try the COPY (SELECT ...) TO variant.")));
1773  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1774  ereport(ERROR,
1775  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1776  errmsg("cannot copy from sequence \"%s\"",
1777  RelationGetRelationName(rel))));
1778  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1779  ereport(ERROR,
1780  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1781  errmsg("cannot copy from partitioned table \"%s\"",
1783  errhint("Try the COPY (SELECT ...) TO variant.")));
1784  else
1785  ereport(ERROR,
1786  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1787  errmsg("cannot copy from non-table relation \"%s\"",
1788  RelationGetRelationName(rel))));
1789  }
1790 
1791  cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1792  options);
1793  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1794 
1795  if (pipe)
1796  {
1797  Assert(!is_program); /* the grammar does not allow this */
/* NOTE(review): listing line 1798 is missing; presumably a condition that
 * guards the stdout assignment below -- confirm against the full file. */
1799  cstate->copy_file = stdout;
1800  }
1801  else
1802  {
1803  cstate->filename = pstrdup(filename);
1804  cstate->is_program = is_program;
1805 
1806  if (is_program)
1807  {
1808  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1809  if (cstate->copy_file == NULL)
1810  ereport(ERROR,
/* NOTE(review): listing line 1811 (the error-code argument) is missing */
1812  errmsg("could not execute command \"%s\": %m",
1813  cstate->filename)));
1814  }
1815  else
1816  {
1817  mode_t oumask; /* Pre-existing umask value */
1818  struct stat st;
1819 
1820  /*
1821  * Prevent write to relative path ... too easy to shoot oneself in
1822  * the foot by overwriting a database file ...
1823  */
1824  if (!is_absolute_path(filename))
1825  ereport(ERROR,
1826  (errcode(ERRCODE_INVALID_NAME),
1827  errmsg("relative path not allowed for COPY to file")));
1828 
/*
 * Mask group/other write permission while the file is created.  The
 * previous umask is restored on both the normal path and the error path.
 */
1829  oumask = umask(S_IWGRP | S_IWOTH);
1830  PG_TRY();
1831  {
1832  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1833  }
1834  PG_CATCH();
1835  {
1836  umask(oumask);
1837  PG_RE_THROW();
1838  }
1839  PG_END_TRY();
1840  umask(oumask);
1841  if (cstate->copy_file == NULL)
1842  {
1843  /* copy errno because ereport subfunctions might change it */
1844  int save_errno = errno;
1845 
1846  ereport(ERROR,
/* NOTE(review): listing line 1847 (the error-code argument) is missing */
1848  errmsg("could not open file \"%s\" for writing: %m",
1849  cstate->filename),
/* The hint is attached only for ENOENT and EACCES */
1850  (save_errno == ENOENT || save_errno == EACCES) ?
1851  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1852  "You may want a client-side facility such as psql's \\copy.") : 0));
1853  }
1854 
1855  if (fstat(fileno(cstate->copy_file), &st))
1856  ereport(ERROR,
/* NOTE(review): listing line 1857 (the error-code argument) is missing */
1858  errmsg("could not stat file \"%s\": %m",
1859  cstate->filename)));
1860 
/* The directory check runs on the already-opened handle */
1861  if (S_ISDIR(st.st_mode))
1862  ereport(ERROR,
1863  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1864  errmsg("\"%s\" is a directory", cstate->filename)));
1865  }
1866  }
1867 
1868  MemoryContextSwitchTo(oldcontext);
1869 
1870  return cstate;
1871 }
1872 
1873 /*
1874  * This intermediate routine exists mainly to localize the effects of setjmp
1875  * so we don't need to plaster a lot of variables with "volatile".
1876  */
/*
 * Run CopyTo() inside a PG_TRY block and return the count it reports.
 *
 * When there is no filename and whereToSendOutput is DestRemote
 * ("fe_copy"), the copy is bracketed with SendCopyBegin() and
 * SendCopyEnd().  On any error pq_endcopyout(true) is called before the
 * error is re-thrown.
 */
1877 static uint64
/* NOTE(review): listing line 1878 (function name and parameter list) is
 * missing from this extract; the body uses a parameter named "cstate". */
1879 {
1880  bool pipe = (cstate->filename == NULL);
1881  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1882  uint64 processed;
1883 
1884  PG_TRY();
1885  {
1886  if (fe_copy)
1887  SendCopyBegin(cstate);
1888 
1889  processed = CopyTo(cstate);
1890 
1891  if (fe_copy)
1892  SendCopyEnd(cstate);
1893  }
1894  PG_CATCH();
1895  {
1896  /*
1897  * Make sure we turn off old-style COPY OUT mode upon error. It is
1898  * okay to do this in all cases, since it does nothing if the mode is
1899  * not on.
1900  */
1901  pq_endcopyout(true);
1902  PG_RE_THROW();
1903  }
1904  PG_END_TRY();
1905 
1906  return processed;
1907 }
1908 
1909 /*
1910  * Clean up storage and release resources for COPY TO.
1911  */
/*
 * Tear down a COPY TO state.  If a QueryDesc was set up, the executor is
 * shut down first (ExecutorFinish, ExecutorEnd, FreeQueryDesc); the
 * remaining cleanup is delegated to EndCopy(), which frees cstate.
 */
1912 static void
/* NOTE(review): listing line 1913 (function name and parameter list) is
 * missing from this extract; the body uses a parameter named "cstate". */
1914 {
1915  if (cstate->queryDesc != NULL)
1916  {
1917  /* Close down the query and free resources. */
1918  ExecutorFinish(cstate->queryDesc);
1919  ExecutorEnd(cstate->queryDesc);
1920  FreeQueryDesc(cstate->queryDesc);
/* NOTE(review): listing line 1921 is missing here; the statement it held
 * is not visible in this extract -- confirm against the full file. */
1922  }
1923 
1924  /* Clean up storage */
1925  EndCopy(cstate);
1926 }
1927 
1928 /*
1929  * Copy from relation or query TO file.
1930  */
/*
 * Write all rows of the relation or query described by cstate and return
 * how many were processed.
 *
 * Outline:
 *   1. pick the tuple descriptor (relation's, or the query's result);
 *   2. look up an output function for every attribute in attnumlist,
 *      binary or text depending on cstate->binary;
 *   3. emit the binary file header, or in text/CSV mode prepare the NULL
 *      string and optionally emit a header line of column names;
 *   4. produce the rows: a heap scan calling CopyOneRowTo() per tuple for
 *      a relation, or ExecutorRun() for a query;
 *   5. in binary mode, emit the trailer.
 *
 * NOTE(review): this extract has dropped several listing lines (gaps in
 * the embedded numbering); each gap is flagged below.
 */
1931 static uint64
/* NOTE(review): listing line 1932 (function name and parameter list) is
 * missing from this extract; the body uses a parameter named "cstate". */
1933 {
1934  TupleDesc tupDesc;
1935  int num_phys_attrs;
1936  ListCell *cur;
1937  uint64 processed;
1938 
1939  if (cstate->rel)
1940  tupDesc = RelationGetDescr(cstate->rel);
1941  else
1942  tupDesc = cstate->queryDesc->tupDesc;
1943  num_phys_attrs = tupDesc->natts;
1944  cstate->null_print_client = cstate->null_print; /* default */
1945 
1946  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1947  cstate->fe_msgbuf = makeStringInfo();
1948 
1949  /* Get info about the columns we need to process. */
/*
 * The array has one slot per attribute of the descriptor and is indexed by
 * attnum - 1.  Only slots for attributes in attnumlist are filled; palloc
 * leaves the rest uninitialized.
 */
1950  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1951  foreach(cur, cstate->attnumlist)
1952  {
1953  int attnum = lfirst_int(cur);
1954  Oid out_func_oid;
1955  bool isvarlena;
1956  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1957 
1958  if (cstate->binary)
1959  getTypeBinaryOutputInfo(attr->atttypid,
1960  &out_func_oid,
1961  &isvarlena);
1962  else
1963  getTypeOutputInfo(attr->atttypid,
1964  &out_func_oid,
1965  &isvarlena);
1966  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1967  }
1968 
1969  /*
1970  * Create a temporary memory context that we can reset once per row to
1971  * recover palloc'd memory. This avoids any problems with leaks inside
1972  * datatype output routines, and should be faster than retail pfree's
1973  * anyway. (We don't need a whole econtext as CopyFrom does.)
1974  */
/* NOTE(review): listing lines 1975 and 1977 are missing; only the context
 * name argument survives.  Presumably they create cstate->rowcontext. */
1976  "COPY TO",
1978 
1979  if (cstate->binary)
1980  {
1981  /* Generate header for a binary copy */
1982  int32 tmp;
1983 
1984  /* Signature */
1985  CopySendData(cstate, BinarySignature, 11);
1986  /* Flags field */
1987  tmp = 0;
/* Bit 16 of the flags word is set when OIDs are included */
1988  if (cstate->oids)
1989  tmp |= (1 << 16);
1990  CopySendInt32(cstate, tmp);
1991  /* No header extension */
1992  tmp = 0;
1993  CopySendInt32(cstate, tmp);
1994  }
1995  else
1996  {
1997  /*
1998  * For non-binary copy, we need to convert null_print to file
1999  * encoding, because it will be sent directly with CopySendString.
2000  */
2001  if (cstate->need_transcoding)
2002  cstate->null_print_client = pg_server_to_any(cstate->null_print,
2003  cstate->null_print_len,
2004  cstate->file_encoding);
2005 
2006  /* if a header has been requested send the line */
2007  if (cstate->header_line)
2008  {
2009  bool hdr_delim = false;
2010 
2011  foreach(cur, cstate->attnumlist)
2012  {
2013  int attnum = lfirst_int(cur);
2014  char *colname;
2015 
/* Delimiter goes between names, so it is skipped before the first one */
2016  if (hdr_delim)
2017  CopySendChar(cstate, cstate->delim[0]);
2018  hdr_delim = true;
2019 
2020  colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2021 
2022  CopyAttributeOutCSV(cstate, colname, false,
2023  list_length(cstate->attnumlist) == 1);
2024  }
2025 
2026  CopySendEndOfRow(cstate);
2027  }
2028  }
2029 
2030  if (cstate->rel)
2031  {
2032  Datum *values;
2033  bool *nulls;
2034  HeapScanDesc scandesc;
2035  HeapTuple tuple;
2036 
2037  values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2038  nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2039 
2040  scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2041 
2042  processed = 0;
2043  while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2044  {
/* NOTE(review): listing line 2045 (first statement of the loop body) is missing */
2046 
2047  /* Deconstruct the tuple ... faster than repeated heap_getattr */
2048  heap_deform_tuple(tuple, tupDesc, values, nulls);
2049 
2050  /* Format and send the data */
2051  CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2052  processed++;
2053  }
2054 
2055  heap_endscan(scandesc);
2056 
2057  pfree(values);
2058  pfree(nulls);
2059  }
2060  else
2061  {
2062  /* run the plan --- the dest receiver will send tuples */
2063  ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
/* The row count is read back from the DR_copy receiver */
2064  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2065  }
2066 
2067  if (cstate->binary)
2068  {
2069  /* Generate trailer for a binary copy */
2070  CopySendInt16(cstate, -1);
2071  /* Need to flush out the trailer */
2072  CopySendEndOfRow(cstate);
2073  }
2074 
/* NOTE(review): listing line 2075 is missing; presumably it releases
 * cstate->rowcontext -- confirm against the full file. */
2076 
2077  return processed;
2078 }
2079 
2080 /*
2081  * Emit one row during CopyTo().
      *
      * cstate:   COPY state; supplies the output format flags (binary, oids,
      *           csv_mode), the column list (attnumlist), the delimiter, the
      *           NULL marker and the per-row memory context.
      * tupleOid: OID of the row; only sent when cstate->oids is set.
      * values:   column values, indexed by attribute number minus one.
      * nulls:    matching null flags, indexed the same way.
      *
      * In binary mode the row is a 16-bit field count followed by one
      * length-prefixed field per column (length -1 for NULL).  Otherwise the
      * fields are written as text or CSV, separated by cstate->delim[0], with
      * cstate->null_print_client standing in for NULL.  The row is finished
      * with CopySendEndOfRow().
      *
      * NOTE(review): this listing shows no declaration for out_functions and
      * no first line for the statement that assigns "string" in the text-mode
      * OID branch; the embedded numbering skips those lines, so they were
      * presumably dropped when the listing was generated -- confirm against
      * the real source file.
2082  */
2083 static void
2084 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2085 {
2086  bool need_delim = false;
2088  MemoryContext oldcontext;
2089  ListCell *cur;
2090  char *string;
2091 
      /* Discard whatever the previous row allocated, then work in rowcontext */
2092  MemoryContextReset(cstate->rowcontext);
2093  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2094 
2095  if (cstate->binary)
2096  {
2097  /* Binary per-tuple header */
2098  CopySendInt16(cstate, list_length(cstate->attnumlist));
2099  /* Send OID if wanted --- note attnumlist doesn't include it */
2100  if (cstate->oids)
2101  {
2102  /* Hack --- assume Oid is same size as int32 */
2103  CopySendInt32(cstate, sizeof(int32));
2104  CopySendInt32(cstate, tupleOid);
2105  }
2106  }
2107  else
2108  {
2109  /* Text format has no per-tuple header, but send OID if wanted */
2110  /* Assume digits don't need any quoting or encoding conversion */
2111  if (cstate->oids)
2112  {
2114  ObjectIdGetDatum(tupleOid)));
2115  CopySendString(cstate, string);
      /* the OID counts as a field, so the first column needs a delimiter */
2116  need_delim = true;
2117  }
2118  }
2119 
2120  foreach(cur, cstate->attnumlist)
2121  {
2122  int attnum = lfirst_int(cur);
2123  Datum value = values[attnum - 1];
2124  bool isnull = nulls[attnum - 1];
2125 
      /* Text/CSV only: put a delimiter before every field except the first */
2126  if (!cstate->binary)
2127  {
2128  if (need_delim)
2129  CopySendChar(cstate, cstate->delim[0]);
2130  need_delim = true;
2131  }
2132 
2133  if (isnull)
2134  {
      /* NULL: the configured marker in text/CSV, a length word of -1 in binary */
2135  if (!cstate->binary)
2136  CopySendString(cstate, cstate->null_print_client);
2137  else
2138  CopySendInt32(cstate, -1);
2139  }
2140  else
2141  {
2142  if (!cstate->binary)
2143  {
      /* Convert to text with the column's output function, then escape it */
2144  string = OutputFunctionCall(&out_functions[attnum - 1],
2145  value);
2146  if (cstate->csv_mode)
2147  CopyAttributeOutCSV(cstate, string,
2148  cstate->force_quote_flags[attnum - 1],
2149  list_length(cstate->attnumlist) == 1);
2150  else
2151  CopyAttributeOutText(cstate, string);
2152  }
2153  else
2154  {
2155  bytea *outputbytes;
2156 
      /* Binary: send the payload length (without varlena header), then the payload */
2157  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2158  value);
2159  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2160  CopySendData(cstate, VARDATA(outputbytes),
2161  VARSIZE(outputbytes) - VARHDRSZ);
2162  }
2163  }
2164  }
2165 
2166  CopySendEndOfRow(cstate);
2167 
      /* Restore the caller's memory context */
2168  MemoryContextSwitchTo(oldcontext);
2169 }
2170 
2171 
2172 /*
2173  * error context callback for COPY FROM
2174  *
2175  * The argument for the error context must be CopyState.
      *
      * Adds one errcontext() line naming the relation (cur_relname) and the
      * input line number (cur_lineno).  When a column is being processed
      * (cur_attname is set) the column name is included; in non-binary mode
      * the offending column value or the whole input line is appended as well,
      * shortened by limit_printout_length().
      *
      * NOTE(review): the line carrying the function name and parameter list is
      * not visible in this listing (the embedded numbering skips it); the body
      * casts a parameter named "arg" to CopyState.
2176  */
2177 void
2179 {
2180  CopyState cstate = (CopyState) arg;
2181 
2182  if (cstate->binary)
2183  {
2184  /* can't usefully display the data */
2185  if (cstate->cur_attname)
2186  errcontext("COPY %s, line %d, column %s",
2187  cstate->cur_relname, cstate->cur_lineno,
2188  cstate->cur_attname);
2189  else
2190  errcontext("COPY %s, line %d",
2191  cstate->cur_relname, cstate->cur_lineno);
2192  }
2193  else
2194  {
2195  if (cstate->cur_attname && cstate->cur_attval)
2196  {
2197  /* error is relevant to a particular column */
2198  char *attval;
2199 
      /* report a length-limited copy of the value, then free that copy */
2200  attval = limit_printout_length(cstate->cur_attval);
2201  errcontext("COPY %s, line %d, column %s: \"%s\"",
2202  cstate->cur_relname, cstate->cur_lineno,
2203  cstate->cur_attname, attval);
2204  pfree(attval);
2205  }
2206  else if (cstate->cur_attname)
2207  {
2208  /* error is relevant to a particular column, value is NULL */
2209  errcontext("COPY %s, line %d, column %s: null input",
2210  cstate->cur_relname, cstate->cur_lineno,
2211  cstate->cur_attname);
2212  }
2213  else
2214  {
2215  /*
2216  * Error is relevant to a particular line.
2217  *
2218  * If line_buf still contains the correct line, and it's already
2219  * transcoded, print it. If it's still in a foreign encoding, it's
2220  * quite likely that the error is precisely a failure to do
2221  * encoding conversion (ie, bad data). We dare not try to convert
2222  * it, and at present there's no way to regurgitate it without
2223  * conversion. So we have to punt and just report the line number.
2224  */
2225  if (cstate->line_buf_valid &&
2226  (cstate->line_buf_converted || !cstate->need_transcoding))
2227  {
2228  char *lineval;
2229 
      /* report a length-limited copy of the line, then free that copy */
2230  lineval = limit_printout_length(cstate->line_buf.data);
2231  errcontext("COPY %s, line %d: \"%s\"",
2232  cstate->cur_relname, cstate->cur_lineno, lineval);
2233  pfree(lineval);
2234  }
2235  else
2236  {
2237  errcontext("COPY %s, line %d",
2238  cstate->cur_relname, cstate->cur_lineno);
2239  }
2240  }
2241  }
2242 }
2243 
2244 /*
2245  * Make sure we don't print an unreasonable amount of COPY data in a message.
2246  *
2247  * It would seem a lot easier to just use the sprintf "precision" limit to
2248  * truncate the string. However, some versions of glibc have a bug/misfeature
2249  * that vsnprintf will always fail (return -1) if it is asked to truncate
2250  * a string that contains invalid byte sequences for the current encoding.
2251  * So, do our own truncation. We return a pstrdup'd copy of the input.
      *
      * A string whose strlen() is at most MAX_COPY_DATA_DISPLAY is returned as
      * an unmodified copy.  A longer one is cut at the length chosen by
      * pg_mbcliplen() and gets "..." appended.  Either way the result is a
      * fresh allocation; the callers in this file pfree() it after use.
      *
      * NOTE(review): the line carrying the function name and parameter list is
      * not visible in this listing (the embedded numbering skips it); the body
      * reads a string parameter named "str".
2252  */
2253 static char *
2255 {
2256 #define MAX_COPY_DATA_DISPLAY 100
2257 
2258  int slen = strlen(str);
2259  int len;
2260  char *res;
2261 
2262  /* Fast path if definitely okay */
2263  if (slen <= MAX_COPY_DATA_DISPLAY)
2264  return pstrdup(str);
2265 
2266  /* Apply encoding-dependent truncation */
2267  len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
2268 
2269  /*
2270  * Truncate, and add "..." to show we truncated the input.
      * The len + 4 allocation leaves room for the three dots and the
      * terminating NUL that strcpy() writes.
2271  */
2272  res = (char *) palloc(len + 4);
2273  memcpy(res, str, len);
2274  strcpy(res + len, "...");
2275 
2276  return res;
2277 }
2278 
2279 /*
2280  * Copy FROM file to relation.
      *
      * Reads rows with NextCopyFrom() until it reports no more input, forms a
      * heap tuple from each one, and inserts it into cstate->rel or, when that
      * is a partitioned table, into the leaf partition selected by
      * ExecFindPartition().  BEFORE/INSTEAD OF/AFTER INSERT triggers are fired
      * and constraints are checked along the way.  When there are no
      * BEFORE/INSTEAD OF row triggers, no partitioning and no volatile default
      * expressions, tuples are buffered and inserted in batches through
      * CopyFromInsertBatch(); otherwise each tuple is inserted on its own.
      *
      * Returns the number of rows processed, not counting rows suppressed by a
      * BEFORE ROW INSERT trigger.
      *
      * NOTE(review): gaps in the embedded line numbering show that this
      * listing omits a number of source lines (among them the line with the
      * function name); those gaps are left exactly as found.
2281  */
2282 uint64
2284 {
2285  HeapTuple tuple;
2286  TupleDesc tupDesc;
2287  Datum *values;
2288  bool *nulls;
2289  ResultRelInfo *resultRelInfo;
2290  ResultRelInfo *saved_resultRelInfo = NULL;
2291  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2292  ExprContext *econtext;
2293  TupleTableSlot *myslot;
2294  MemoryContext oldcontext = CurrentMemoryContext;
2295 
2296  ErrorContextCallback errcallback;
2297  CommandId mycid = GetCurrentCommandId(true);
2298  int hi_options = 0; /* start with default heap_insert options */
2299  BulkInsertState bistate;
2300  uint64 processed = 0;
2301  bool useHeapMultiInsert;
2302  int nBufferedTuples = 0;
2303  int prev_leaf_part_index = -1;
2304 
2305 #define MAX_BUFFERED_TUPLES 1000
2306  HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2307  Size bufferedTuplesSize = 0;
2308  int firstBufferedLineNo = 0;
2309 
2310  Assert(cstate->rel);
2311 
2312  /*
2313  * The target must be a plain relation or have an INSTEAD OF INSERT row
2314  * trigger. (Currently, such triggers are only allowed on views, so we
2315  * only hint about them in the view case.)
2316  */
2317  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2318  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2319  !(cstate->rel->trigdesc &&
2321  {
2322  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2323  ereport(ERROR,
2324  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2325  errmsg("cannot copy to view \"%s\"",
2326  RelationGetRelationName(cstate->rel)),
2327  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2328  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2329  ereport(ERROR,
2330  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2331  errmsg("cannot copy to materialized view \"%s\"",
2332  RelationGetRelationName(cstate->rel))));
2333  else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2334  ereport(ERROR,
2335  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2336  errmsg("cannot copy to foreign table \"%s\"",
2337  RelationGetRelationName(cstate->rel))));
2338  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2339  ereport(ERROR,
2340  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2341  errmsg("cannot copy to sequence \"%s\"",
2342  RelationGetRelationName(cstate->rel))));
2343  else
2344  ereport(ERROR,
2345  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2346  errmsg("cannot copy to non-table relation \"%s\"",
2347  RelationGetRelationName(cstate->rel))));
2348  }
2349 
2350  tupDesc = RelationGetDescr(cstate->rel);
2351 
2352  /*----------
2353  * Check to see if we can avoid writing WAL
2354  *
2355  * If archive logging/streaming is not enabled *and* either
2356  * - table was created in same transaction as this COPY
2357  * - data is being written to relfilenode created in this transaction
2358  * then we can skip writing WAL. It's safe because if the transaction
2359  * doesn't commit, we'll discard the table (or the new relfilenode file).
2360  * If it does commit, we'll have done the heap_sync at the bottom of this
2361  * routine first.
2362  *
2363  * As mentioned in comments in utils/rel.h, the in-same-transaction test
2364  * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2365  * can be cleared before the end of the transaction. The exact case is
2366  * when a relation sets a new relfilenode twice in same transaction, yet
2367  * the second one fails in an aborted subtransaction, e.g.
2368  *
2369  * BEGIN;
2370  * TRUNCATE t;
2371  * SAVEPOINT save;
2372  * TRUNCATE t;
2373  * ROLLBACK TO save;
2374  * COPY ...
2375  *
2376  * Also, if the target file is new-in-transaction, we assume that checking
2377  * FSM for free space is a waste of time, even if we must use WAL because
2378  * of archiving. This could possibly be wrong, but it's unlikely.
2379  *
2380  * The comments for heap_insert and RelationGetBufferForTuple specify that
2381  * skipping WAL logging is only safe if we ensure that our tuples do not
2382  * go into pages containing tuples from any other transactions --- but this
2383  * must be the case if we have a new table or new relfilenode, so we need
2384  * no additional work to enforce that.
2385  *----------
2386  */
2387  /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2388  if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2390  {
2391  hi_options |= HEAP_INSERT_SKIP_FSM;
2392  if (!XLogIsNeeded())
2393  hi_options |= HEAP_INSERT_SKIP_WAL;
2394  }
2395 
2396  /*
2397  * Optimize if new relfilenode was created in this subxact or one of its
2398  * committed children and we won't see those rows later as part of an
2399  * earlier scan or command. The subxact test ensures that if this subxact
2400  * aborts then the frozen rows won't be visible after xact cleanup. Note
2401  * that the stronger test of exactly which subtransaction created it is
2402  * crucial for correctness of this optimization. The test for an earlier
2403  * scan or command tolerates false negatives. FREEZE causes other sessions
2404  * to see rows they would not see under MVCC, and a false negative merely
2405  * spreads that anomaly to the current session.
2406  */
2407  if (cstate->freeze)
2408  {
2409  /*
2410  * Tolerate one registration for the benefit of FirstXactSnapshot.
2411  * Scan-bearing queries generally create at least two registrations,
2412  * though relying on that is fragile, as is ignoring ActiveSnapshot.
2413  * Clear CatalogSnapshot to avoid counting its registration. We'll
2414  * still detect ongoing catalog scans, each of which separately
2415  * registers the snapshot it uses.
2416  */
2419  ereport(ERROR,
2420  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2421  errmsg("cannot perform FREEZE because of prior transaction activity")));
2422 
2423  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2425  ereport(ERROR,
2426  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2427  errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2428 
2429  hi_options |= HEAP_INSERT_FROZEN;
2430  }
2431 
2432  /*
2433  * We need a ResultRelInfo so we can use the regular executor's
2434  * index-entry-making machinery. (There used to be a huge amount of code
2435  * here that basically duplicated execUtils.c ...)
2436  */
2437  resultRelInfo = makeNode(ResultRelInfo);
2438  InitResultRelInfo(resultRelInfo,
2439  cstate->rel,
2440  1, /* dummy rangetable index */
2441  NULL,
2442  0);
2443 
2444  ExecOpenIndices(resultRelInfo, false);
2445 
2446  estate->es_result_relations = resultRelInfo;
2447  estate->es_num_result_relations = 1;
2448  estate->es_result_relation_info = resultRelInfo;
2449  estate->es_range_table = cstate->range_table;
2450 
2451  /* Set up a tuple slot too */
2452  myslot = ExecInitExtraTupleSlot(estate);
2453  ExecSetSlotDescriptor(myslot, tupDesc);
2454  /* Triggers might need a slot as well */
2455  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2456 
2457  /* Prepare to catch AFTER triggers. */
2459 
2460  /*
2461  * If there are any triggers with transition tables on the named relation,
2462  * we need to be prepared to capture transition tuples.
2463  */
2464  cstate->transition_capture =
2466  RelationGetRelid(cstate->rel),
2467  CMD_INSERT);
2468 
2469  /*
2470  * If the named relation is a partitioned table, initialize state for
2471  * CopyFrom tuple routing.
2472  */
2473  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2474  {
2479  int num_parted,
2481 
2483  cstate->rel,
2484  1,
2485  estate,
2486  &partition_dispatch_info,
2487  &partitions,
2488  &partition_tupconv_maps,
2489  &partition_tuple_slot,
2490  &num_parted, &num_partitions);
2492  cstate->num_dispatch = num_parted;
2493  cstate->partitions = partitions;
2494  cstate->num_partitions = num_partitions;
2497 
2498  /*
2499  * If we are capturing transition tuples, they may need to be
2500  * converted from partition format back to partitioned table format
2501  * (this is only ever necessary if a BEFORE trigger modifies the
2502  * tuple).
2503  */
2504  if (cstate->transition_capture != NULL)
2505  {
2506  int i;
2507 
2509  palloc0(sizeof(TupleConversionMap *) * cstate->num_partitions);
2510  for (i = 0; i < cstate->num_partitions; ++i)
2511  {
2512  cstate->transition_tupconv_maps[i] =
2514  RelationGetDescr(cstate->rel),
2515  gettext_noop("could not convert row type"));
2516  }
2517  }
2518  }
2519 
2520  /*
2521  * It's more efficient to prepare a bunch of tuples for insertion, and
2522  * insert them in one heap_multi_insert() call, than call heap_insert()
2523  * separately for every tuple. However, we can't do that if there are
2524  * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2525  * expressions. Such triggers or expressions might query the table we're
2526  * inserting to, and act differently if the tuples that have already been
2527  * processed and prepared for insertion are not there. We also can't do
2528  * it if the table is partitioned.
2529  */
2530  if ((resultRelInfo->ri_TrigDesc != NULL &&
2531  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2532  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2533  cstate->partition_dispatch_info != NULL ||
2534  cstate->volatile_defexprs)
2535  {
2536  useHeapMultiInsert = false;
2537  }
2538  else
2539  {
2540  useHeapMultiInsert = true;
2541  bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2542  }
2543 
2544  /*
2545  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2546  * should do this for COPY, since it's not really an "INSERT" statement as
2547  * such. However, executing these triggers maintains consistency with the
2548  * EACH ROW triggers that we already fire on COPY.
2549  */
2550  ExecBSInsertTriggers(estate, resultRelInfo);
2551 
2552  values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2553  nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2554 
2555  bistate = GetBulkInsertState();
2556  econtext = GetPerTupleExprContext(estate);
2557 
2558  /* Set up callback to identify error line number */
2559  errcallback.callback = CopyFromErrorCallback;
2560  errcallback.arg = (void *) cstate;
2561  errcallback.previous = error_context_stack;
2562  error_context_stack = &errcallback;
2563 
2564  for (;;)
2565  {
2566  TupleTableSlot *slot;
2567  bool skip_tuple;
2568  Oid loaded_oid = InvalidOid;
2569 
2571 
2572  if (nBufferedTuples == 0)
2573  {
2574  /*
2575  * Reset the per-tuple exprcontext. We can only do this if the
2576  * tuple buffer is empty. (Calling the context the per-tuple
2577  * memory context is a bit of a misnomer now.)
2578  */
2579  ResetPerTupleExprContext(estate);
2580  }
2581 
2582  /* Switch into its memory context */
2584 
2585  if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2586  break;
2587 
2588  /* And now we can form the input tuple. */
2589  tuple = heap_form_tuple(tupDesc, values, nulls);
2590 
2591  if (loaded_oid != InvalidOid)
2592  HeapTupleSetOid(tuple, loaded_oid);
2593 
2594  /*
2595  * Constraints might reference the tableoid column, so initialize
2596  * t_tableOid before evaluating them.
2597  */
2598  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2599 
2600  /* Triggers and stuff need to be invoked in query context. */
2601  MemoryContextSwitchTo(oldcontext);
2602 
2603  /* Place tuple in tuple slot --- but slot shouldn't free it */
2604  slot = myslot;
2605  ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2606 
2607  /* Determine the partition to heap_insert the tuple into */
2608  if (cstate->partition_dispatch_info)
2609  {
2610  int leaf_part_index;
2611  TupleConversionMap *map;
2612 
2613  /*
2614  * Away we go ... If we end up not finding a partition after all,
2615  * ExecFindPartition() does not return and errors out instead.
2616  * Otherwise, the returned value is to be used as an index into
2617  * arrays cstate->partitions[] and cstate->partition_tupconv_maps[]
2618  * that will get us the ResultRelInfo and TupleConversionMap for the
2619  * partition, respectively.
2620  */
2621  leaf_part_index = ExecFindPartition(resultRelInfo,
2622  cstate->partition_dispatch_info,
2623  slot,
2624  estate);
2625  Assert(leaf_part_index >= 0 &&
2626  leaf_part_index < cstate->num_partitions);
2627 
2628  /*
2629  * If this tuple is mapped to a partition that is not same as the
2630  * previous one, we'd better make sure the bulk insert mechanism
2631  * gets a new buffer.
2632  */
2633  if (prev_leaf_part_index != leaf_part_index)
2634  {
2635  ReleaseBulkInsertStatePin(bistate);
2636  prev_leaf_part_index = leaf_part_index;
2637  }
2638 
2639  /*
2640  * Save the old ResultRelInfo and switch to the one corresponding
2641  * to the selected partition.
2642  */
2643  saved_resultRelInfo = resultRelInfo;
2644  resultRelInfo = cstate->partitions[leaf_part_index];
2645 
2646  /* We do not yet have a way to insert into a foreign partition */
2647  if (resultRelInfo->ri_FdwRoutine)
2648  ereport(ERROR,
2649  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2650  errmsg("cannot route inserted tuples to a foreign table")));
2651 
2652  /*
2653  * For ExecInsertIndexTuples() to work on the partition's indexes
2654  */
2655  estate->es_result_relation_info = resultRelInfo;
2656 
2657  /*
2658  * If we're capturing transition tuples, we might need to convert
2659  * from the partition rowtype to parent rowtype.
2660  */
2661  if (cstate->transition_capture != NULL)
2662  {
2663  if (resultRelInfo->ri_TrigDesc &&
2664  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2665  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2666  {
2667  /*
2668  * If there are any BEFORE or INSTEAD triggers on the
2669  * partition, we'll have to be ready to convert their
2670  * result back to tuplestore format.
2671  */
2673  cstate->transition_capture->tcs_map =
2674  cstate->transition_tupconv_maps[leaf_part_index];
2675  }
2676  else
2677  {
2678  /*
2679  * Otherwise, just remember the original unconverted
2680  * tuple, to avoid a needless round trip conversion.
2681  */
2683  cstate->transition_capture->tcs_map = NULL;
2684  }
2685  }
2686 
2687  /*
2688  * We might need to convert from the parent rowtype to the
2689  * partition rowtype.
2690  */
2691  map = cstate->partition_tupconv_maps[leaf_part_index];
2692  if (map)
2693  {
2694  Relation partrel = resultRelInfo->ri_RelationDesc;
2695 
2696  tuple = do_convert_tuple(tuple, map);
2697 
2698  /*
2699  * We must use the partition's tuple descriptor from this
2700  * point on. Use a dedicated slot from this point on until
2701  * we're finished dealing with the partition.
2702  */
2703  slot = cstate->partition_tuple_slot;
2704  Assert(slot != NULL);
2705  ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2706  ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2707  }
2708 
2709  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2710  }
2711 
2712  skip_tuple = false;
2713 
2714  /* BEFORE ROW INSERT Triggers */
2715  if (resultRelInfo->ri_TrigDesc &&
2716  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2717  {
2718  slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2719 
2720  if (slot == NULL) /* "do nothing" */
2721  skip_tuple = true;
2722  else /* trigger might have changed tuple */
2723  tuple = ExecMaterializeSlot(slot);
2724  }
2725 
2726  if (!skip_tuple)
2727  {
2728  if (resultRelInfo->ri_TrigDesc &&
2729  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2730  {
2731  /* Pass the data to the INSTEAD ROW INSERT trigger */
2732  ExecIRInsertTriggers(estate, resultRelInfo, slot);
2733  }
2734  else
2735  {
2736  /*
2737  * We always check the partition constraint, including when
2738  * the tuple got here via tuple-routing. However we don't
2739  * need to in the latter case if no BR trigger is defined on
2740  * the partition. Note that a BR trigger might modify the
2741  * tuple such that the partition constraint is no longer
2742  * satisfied, so we need to check in that case.
2743  */
2744  bool check_partition_constr =
2745  (resultRelInfo->ri_PartitionCheck != NIL);
2746 
2747  if (saved_resultRelInfo != NULL &&
2748  !(resultRelInfo->ri_TrigDesc &&
2749  resultRelInfo->ri_TrigDesc->trig_insert_before_row))
2750  check_partition_constr = false;
2751 
2752  /*
      * Check the constraints of the tuple. Test the relation we are
      * actually inserting into (the leaf partition after tuple
      * routing), not cstate->rel: a partition can have constraints of
      * its own that the root table lacks, and those would be skipped
      * if only the root's constraint list were consulted. Without
      * routing, resultRelInfo still refers to cstate->rel.
      */
2753  if (resultRelInfo->ri_RelationDesc->rd_att->constr || check_partition_constr)
2754  ExecConstraints(resultRelInfo, slot, estate);
2755 
2756  if (useHeapMultiInsert)
2757  {
2758  /* Add this tuple to the tuple buffer */
2759  if (nBufferedTuples == 0)
2760  firstBufferedLineNo = cstate->cur_lineno;
2761  bufferedTuples[nBufferedTuples++] = tuple;
2762  bufferedTuplesSize += tuple->t_len;
2763 
2764  /*
2765  * If the buffer filled up, flush it. Also flush if the
2766  * total size of all the tuples in the buffer becomes
2767  * large, to avoid using large amounts of memory for the
2768  * buffer when the tuples are exceptionally wide.
2769  */
2770  if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2771  bufferedTuplesSize > 65535)
2772  {
2773  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2774  resultRelInfo, myslot, bistate,
2775  nBufferedTuples, bufferedTuples,
2776  firstBufferedLineNo);
2777  nBufferedTuples = 0;
2778  bufferedTuplesSize = 0;
2779  }
2780  }
2781  else
2782  {
2783  List *recheckIndexes = NIL;
2784 
2785  /* OK, store the tuple and create index entries for it */
2786  heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2787  hi_options, bistate);
2788 
2789  if (resultRelInfo->ri_NumIndices > 0)
2790  recheckIndexes = ExecInsertIndexTuples(slot,
2791  &(tuple->t_self),
2792  estate,
2793  false,
2794  NULL,
2795  NIL);
2796 
2797  /* AFTER ROW INSERT Triggers */
2798  ExecARInsertTriggers(estate, resultRelInfo, tuple,
2799  recheckIndexes, cstate->transition_capture);
2800 
2801  list_free(recheckIndexes);
2802  }
2803  }
2804 
2805  /*
2806  * We count only tuples not suppressed by a BEFORE INSERT trigger;
2807  * this is the same definition used by execMain.c for counting
2808  * tuples inserted by an INSERT command.
2809  */
2810  processed++;
2811 
      /* Tuple routing switched resultRelInfo to the partition; switch back */
2812  if (saved_resultRelInfo)
2813  {
2814  resultRelInfo = saved_resultRelInfo;
2815  estate->es_result_relation_info = resultRelInfo;
2816  }
2817  }
2818  }
2819 
2820  /* Flush any remaining buffered tuples */
2821  if (nBufferedTuples > 0)
2822  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2823  resultRelInfo, myslot, bistate,
2824  nBufferedTuples, bufferedTuples,
2825  firstBufferedLineNo);
2826 
2827  /* Done, clean up */
2828  error_context_stack = errcallback.previous;
2829 
2830  FreeBulkInsertState(bistate);
2831 
2832  MemoryContextSwitchTo(oldcontext);
2833 
2834  /*
2835  * In the old protocol, tell pqcomm that we can process normal protocol
2836  * messages again.
2837  */
2838  if (cstate->copy_dest == COPY_OLD_FE)
2839  pq_endmsgread();
2840 
2841  /* Execute AFTER STATEMENT insertion triggers */
2842  ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2843 
2844  /* Handle queued AFTER triggers */
2845  AfterTriggerEndQuery(estate);
2846 
2847  pfree(values);
2848  pfree(nulls);
2849 
2850  ExecResetTupleTable(estate->es_tupleTable, false);
2851 
2852  ExecCloseIndices(resultRelInfo);
2853 
2854  /* Close all the partitioned tables, leaf partitions, and their indices */
2855  if (cstate->partition_dispatch_info)
2856  {
2857  int i;
2858 
2859  /*
2860  * Remember cstate->partition_dispatch_info[0] corresponds to the root
2861  * partitioned table, which we must not try to close, because it is
2862  * the main target table of COPY that will be closed eventually by
2863  * DoCopy(). Also, tupslot is NULL for the root partitioned table.
2864  */
2865  for (i = 1; i < cstate->num_dispatch; i++)
2866  {
2868 
2869  heap_close(pd->reldesc, NoLock);
2871  }
2872  for (i = 0; i < cstate->num_partitions; i++)
2873  {
2874  ResultRelInfo *resultRelInfo = cstate->partitions[i];
2875 
2876  ExecCloseIndices(resultRelInfo);
2877  heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2878  }
2879 
2880  /* Release the standalone partition tuple descriptor */
2882  }
2883 
2884  /* Close any trigger target relations */
2885  ExecCleanUpTriggerState(estate);
2886 
2887  FreeExecutorState(estate);
2888 
2889  /*
2890  * If we skipped writing WAL, then we need to sync the heap (but not
2891  * indexes since those use WAL anyway)
2892  */
2893  if (hi_options & HEAP_INSERT_SKIP_WAL)
2894  heap_sync(cstate->rel);
2895 
2896  return processed;
2897 }
2898 
2899 /*
2900  * A subroutine of CopyFrom, to write the current batch of buffered heap
2901  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2902  * triggers.
      *
      * bufferedTuples[0 .. nBufferedTuples-1] are inserted into cstate->rel
      * with a single heap_multi_insert() call, using mycid, hi_options and
      * bistate.  firstBufferedLineNo is the value of cstate->cur_lineno that
      * was recorded for the first buffered tuple; while index entries and
      * triggers are processed for tuple i, cur_lineno is temporarily set to
      * firstBufferedLineNo + i, and it is restored before returning.  myslot
      * is the slot the tuples are stored into for index insertion.
      *
      * NOTE(review): the first line of the signature (function name and the
      * leading parameters) is not visible in this listing; the body uses
      * cstate, estate and mycid, which match the call sites in CopyFrom.
2903  */
2904 static void
2906  int hi_options, ResultRelInfo *resultRelInfo,
2907  TupleTableSlot *myslot, BulkInsertState bistate,
2908  int nBufferedTuples, HeapTuple *bufferedTuples,
2909  int firstBufferedLineNo)
2910 {
2911  MemoryContext oldcontext;
2912  int i;
2913  int save_cur_lineno;
2914 
2915  /*
2916  * Print error context information correctly, if one of the operations
2917  * below fails.
2918  */
2919  cstate->line_buf_valid = false;
2920  save_cur_lineno = cstate->cur_lineno;
2921 
2922  /*
2923  * heap_multi_insert leaks memory, so switch to short-lived memory context
2924  * before calling it.
2925  */
2926  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2927  heap_multi_insert(cstate->rel,
2928  bufferedTuples,
2929  nBufferedTuples,
2930  mycid,
2931  hi_options,
2932  bistate);
2933  MemoryContextSwitchTo(oldcontext);
2934 
2935  /*
2936  * If there are any indexes, update them for all the inserted tuples, and
2937  * run AFTER ROW INSERT triggers.
2938  */
2939  if (resultRelInfo->ri_NumIndices > 0)
2940  {
2941  for (i = 0; i < nBufferedTuples; i++)
2942  {
2943  List *recheckIndexes;
2944 
      /* make error context report the input line of this tuple */
2945  cstate->cur_lineno = firstBufferedLineNo + i;
2946  ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2947  recheckIndexes =
2948  ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2949  estate, false, NULL, NIL);
2950  ExecARInsertTriggers(estate, resultRelInfo,
2951  bufferedTuples[i],
2952  recheckIndexes, cstate->transition_capture);
2953  list_free(recheckIndexes);
2954  }
2955  }
2956 
2957  /*
2958  * There are no indexes, but see if we need to run AFTER ROW INSERT
2959  * triggers anyway.
2960  */
2961  else if (resultRelInfo->ri_TrigDesc != NULL &&
2962  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2963  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2964  {
2965  for (i = 0; i < nBufferedTuples; i++)
2966  {
2967  cstate->cur_lineno = firstBufferedLineNo + i;
2968  ExecARInsertTriggers(estate, resultRelInfo,
2969  bufferedTuples[i],
2970  NIL, cstate->transition_capture);
2971  }
2972  }
2973 
2974  /* reset cur_lineno to where we were */
2975  cstate->cur_lineno = save_cur_lineno;
2976 }
2977 
2978 /*
2979  * Set up to read tuples from a file for COPY FROM.
2980  *
2981  * 'rel': Used as a template for the tuples
2982  * 'filename': Name of server-local file to read
2983  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2984  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2985  *
2986  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2987  */
2988 CopyState
2990  Relation rel,
2991  const char *filename,
2992  bool is_program,
2994  List *attnamelist,
2995  List *options)
2996 {
2997  CopyState cstate;
2998  bool pipe = (filename == NULL);
2999  TupleDesc tupDesc;
3000  AttrNumber num_phys_attrs,
3001  num_defaults;
3003  Oid *typioparams;
3004  int attnum;
3005  Oid in_func_oid;
3006  int *defmap;
3007  ExprState **defexprs;
3008  MemoryContext oldcontext;
3009  bool volatile_defexprs;
3010 
3011  cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3012  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3013 
3014  /* Initialize state variables */
3015  cstate->fe_eof = false;
3016  cstate->eol_type = EOL_UNKNOWN;
3017  cstate->cur_relname = RelationGetRelationName(cstate->rel);
3018  cstate->cur_lineno = 0;
3019  cstate->cur_attname = NULL;
3020  cstate->cur_attval = NULL;
3021 
3022  /* Set up variables to avoid per-attribute overhead. */
3023  initStringInfo(&cstate->attribute_buf);
3024  initStringInfo(&cstate->line_buf);
3025  cstate->line_buf_converted = false;
3026  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3027  cstate->raw_buf_index = cstate->raw_buf_len = 0;
3028 
3029  /* Assign range table, we'll need it in CopyFrom. */
3030  if (pstate)
3031  cstate->range_table = pstate->p_rtable;
3032 
3033  tupDesc = RelationGetDescr(cstate->rel);
3034  num_phys_attrs = tupDesc->natts;
3035  num_defaults = 0;
3036  volatile_defexprs = false;
3037 
3038  /*
3039  * Pick up the required catalog information for each attribute in the
3040  * relation, including the input function, the element type (to pass to
3041  * the input function), and info about defaults and constraints. (Which
3042  * input function we use depends on text/binary format choice.)
3043  */
3044  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3045  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3046  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3047  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3048 
3049  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3050  {
3051  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3052 
3053  /* We don't need info for dropped attributes */
3054  if (att->attisdropped)
3055  continue;
3056 
3057  /* Fetch the input function and typioparam info */
3058  if (cstate->binary)
3059  getTypeBinaryInputInfo(att->atttypid,
3060  &in_func_oid, &typioparams[attnum - 1]);
3061  else
3062  getTypeInputInfo(att->atttypid,
3063  &in_func_oid, &typioparams[attnum - 1]);
3064  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3065 
3066  /* Get default info if needed */
3067  if (!list_member_int(cstate->attnumlist, attnum))
3068  {
3069  /* attribute is NOT to be copied from input */
3070  /* use default value if one exists */
3071  Expr *defexpr;
3072 
3073  if (att->attidentity)
3074  {
3076 
3077  nve->seqid = getOwnedSequence(RelationGetRelid(cstate->rel),
3078  attnum);
3079  nve->typeId = att->atttypid;
3080  defexpr = (Expr *) nve;
3081  }
3082  else
3083  defexpr = (Expr *) build_column_default(cstate->rel, attnum);
3084 
3085  if (defexpr != NULL)
3086  {
3087  /* Run the expression through planner */
3088  defexpr = expression_planner(defexpr);
3089 
3090  /* Initialize executable expression in copycontext */
3091  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3092  defmap[num_defaults] = attnum - 1;
3093  num_defaults++;
3094 
3095  /*
3096  * If a default expression looks at the table being loaded,
3097  * then it could give the wrong answer when using
3098  * multi-insert. Since database access can be dynamic this is
3099  * hard to test for exactly, so we use the much wider test of
3100  * whether the default expression is volatile. We allow for
3101  * the special case of when the default expression is the
3102  * nextval() of a sequence which in this specific case is
3103  * known to be safe for use with the multi-insert
3104  * optimization. Hence we use this special case function
3105  * checker rather than the standard check for
3106  * contain_volatile_functions().
3107  */
3108  if (!volatile_defexprs)
3109  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3110  }
3111  }
3112  }
3113 
3114  /* We keep those variables in cstate. */
3115  cstate->in_functions = in_functions;
3116  cstate->typioparams = typioparams;
3117  cstate->defmap = defmap;
3118  cstate->defexprs = defexprs;
3120  cstate->num_defaults = num_defaults;
3121  cstate->is_program = is_program;
3122 
3123  if (data_source_cb)
3124  {
3125  cstate->copy_dest = COPY_CALLBACK;
3126  cstate->data_source_cb = data_source_cb;
3127  }
3128  else if (pipe)
3129  {
3130  Assert(!is_program); /* the grammar does not allow this */
3132  ReceiveCopyBegin(cstate);
3133  else
3134  cstate->copy_file = stdin;
3135  }
3136  else
3137  {
3138  cstate->filename = pstrdup(filename);
3139 
3140  if (cstate->is_program)
3141  {
3142  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3143  if (cstate->copy_file == NULL)
3144  ereport(ERROR,
3146  errmsg("could not execute command \"%s\": %m",
3147  cstate->filename)));
3148  }
3149  else
3150  {
3151  struct stat st;
3152 
3153  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3154  if (cstate->copy_file == NULL)
3155  {
3156  /* copy errno because ereport subfunctions might change it */
3157  int save_errno = errno;
3158 
3159  ereport(ERROR,
3161  errmsg("could not open file \"%s\" for reading: %m",
3162  cstate->filename),
3163  (save_errno == ENOENT || save_errno == EACCES) ?
3164  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3165  "You may want a client-side facility such as psql's \\copy.") : 0));
3166  }
3167 
3168  if (fstat(fileno(cstate->copy_file), &st))
3169  ereport(ERROR,
3171  errmsg("could not stat file \"%s\": %m",
3172  cstate->filename)));
3173 
3174  if (S_ISDIR(st.st_mode))
3175  ereport(ERROR,
3176  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3177  errmsg("\"%s\" is a directory", cstate->filename)));
3178  }
3179  }
3180 
3181  if (!cstate->binary)
3182  {
3183  /* must rely on user to tell us... */
3184  cstate->file_has_oids = cstate->oids;
3185  }
3186  else
3187  {
3188  /* Read and verify binary header */
3189  char readSig[11];
3190  int32 tmp;
3191 
3192  /* Signature */
3193  if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3194  memcmp(readSig, BinarySignature, 11) != 0)
3195  ereport(ERROR,
3196  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3197  errmsg("COPY file signature not recognized")));
3198  /* Flags field */
3199  if (!CopyGetInt32(cstate, &tmp))
3200  ereport(ERROR,
3201  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3202  errmsg("invalid COPY file header (missing flags)")));
3203  cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3204  tmp &= ~(1 << 16);
3205  if ((tmp >> 16) != 0)
3206  ereport(ERROR,
3207  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3208  errmsg("unrecognized critical flags in COPY file header")));
3209  /* Header extension length */
3210  if (!CopyGetInt32(cstate, &tmp) ||
3211  tmp < 0)
3212  ereport(ERROR,
3213  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3214  errmsg("invalid COPY file header (missing length)")));
3215  /* Skip extension header, if present */
3216  while (tmp-- > 0)
3217  {
3218  if (CopyGetData(cstate, readSig, 1, 1) != 1)
3219  ereport(ERROR,
3220  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3221  errmsg("invalid COPY file header (wrong length)")));
3222  }
3223  }
3224 
3225  if (cstate->file_has_oids && cstate->binary)
3226  {
3228  &in_func_oid, &cstate->oid_typioparam);
3229  fmgr_info(in_func_oid, &cstate->oid_in_function);
3230  }
3231 
3232  /* create workspace for CopyReadAttributes results */
3233  if (!cstate->binary)
3234  {
3235  AttrNumber attr_count = list_length(cstate->attnumlist);
3236  int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3237 
3238  cstate->max_fields = nfields;
3239  cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3240  }
3241 
3242  MemoryContextSwitchTo(oldcontext);
3243 
3244  return cstate;
3245 }
3246 
3247 /*
3248  * Read raw fields in the next line for COPY FROM in text or csv mode.
3249  * Return false if no more lines.
3250  *
3251  * An internal temporary buffer is returned via 'fields'. It is valid until
3252  * the next call of the function. Since the function returns all raw fields
3253  * in the input file, 'nfields' could be different from the number of columns
3254  * in the relation.
3255  *
3256  * NOTE: force_not_null option are not applied to the returned fields.
3257  */
3258 bool
3259 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3260 {
3261  int fldct;
3262  bool done;
3263 
3264  /* only available for text or csv input */
3265  Assert(!cstate->binary);
3266 
3267  /* on input just throw the header line away */
3268  if (cstate->cur_lineno == 0 && cstate->header_line)
3269  {
3270  cstate->cur_lineno++;
3271  if (CopyReadLine(cstate))
3272  return false; /* done */
3273  }
3274 
3275  cstate->cur_lineno++;
3276 
3277  /* Actually read the line into memory here */
3278  done = CopyReadLine(cstate);
3279 
3280  /*
3281  * EOF at start of line means we're done. If we see EOF after some
3282  * characters, we act as though it was newline followed by EOF, ie,
3283  * process the line and then exit loop on next iteration.
3284  */
3285  if (done && cstate->line_buf.len == 0)
3286  return false;
3287 
3288  /* Parse the line into de-escaped field values */
3289  if (cstate->csv_mode)
3290  fldct = CopyReadAttributesCSV(cstate);
3291  else
3292  fldct = CopyReadAttributesText(cstate);
3293 
3294  *fields = cstate->raw_fields;
3295  *nfields = fldct;
3296  return true;
3297 }
3298 
3299 /*
3300  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3301  *
3302  * 'econtext' is used to evaluate default expression for each columns not
3303  * read from the file. It can be NULL when no default values are used, i.e.
3304  * when all columns are read from the file.
3305  *
3306  * 'values' and 'nulls' arrays must be the same length as columns of the
3307  * relation passed to BeginCopyFrom. This function fills the arrays.
3308  * Oid of the tuple is returned with 'tupleOid' separately.
3309  */
3310 bool
3312  Datum *values, bool *nulls, Oid *tupleOid)
3313 {
3314  TupleDesc tupDesc;
3315  AttrNumber num_phys_attrs,
3316  attr_count,
3317  num_defaults = cstate->num_defaults;
3318  FmgrInfo *in_functions = cstate->in_functions;
3319  Oid *typioparams = cstate->typioparams;
3320  int i;
3321  int nfields;
3322  bool isnull;
3323  bool file_has_oids = cstate->file_has_oids;
3324  int *defmap = cstate->defmap;
3325  ExprState **defexprs = cstate->defexprs;
3326 
3327  tupDesc = RelationGetDescr(cstate->rel);
3328  num_phys_attrs = tupDesc->natts;
3329  attr_count = list_length(cstate->attnumlist);
3330  nfields = file_has_oids ? (attr_count + 1) : attr_count;
3331 
3332  /* Initialize all values for row to NULL */
3333  MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3334  MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3335 
3336  if (!cstate->binary)
3337  {
3338  char **field_strings;
3339  ListCell *cur;
3340  int fldct;
3341  int fieldno;
3342  char *string;
3343 
3344  /* read raw fields in the next line */
3345  if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3346  return false;
3347 
3348  /* check for overflowing fields */
3349  if (nfields > 0 && fldct > nfields)
3350  ereport(ERROR,
3351  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3352  errmsg("extra data after last expected column")));
3353 
3354  fieldno = 0;
3355 
3356  /* Read the OID field if present */
3357  if (file_has_oids)
3358  {
3359  if (fieldno >= fldct)
3360  ereport(ERROR,
3361  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3362  errmsg("missing data for OID column")));
3363  string = field_strings[fieldno++];
3364 
3365  if (string == NULL)
3366  ereport(ERROR,
3367  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3368  errmsg("null OID in COPY data")));
3369  else if (cstate->oids && tupleOid != NULL)
3370  {
3371  cstate->cur_attname = "oid";
3372  cstate->cur_attval = string;
3374  CStringGetDatum(string)));
3375  if (*tupleOid == InvalidOid)
3376  ereport(ERROR,
3377  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3378  errmsg("invalid OID in COPY data")));
3379  cstate->cur_attname = NULL;
3380  cstate->cur_attval = NULL;
3381  }
3382  }
3383 
3384  /* Loop to read the user attributes on the line. */
3385  foreach(cur, cstate->attnumlist)
3386  {
3387  int attnum = lfirst_int(cur);
3388  int m = attnum - 1;
3389  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3390 
3391  if (fieldno >= fldct)
3392  ereport(ERROR,
3393  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3394  errmsg("missing data for column \"%s\"",
3395  NameStr(att->attname))));
3396  string = field_strings[fieldno++];
3397 
3398  if (cstate->convert_select_flags &&
3399  !cstate->convert_select_flags[m])
3400  {
3401  /* ignore input field, leaving column as NULL */
3402  continue;
3403  }
3404 
3405  if (cstate->csv_mode)
3406  {
3407  if (string == NULL &&
3408  cstate->force_notnull_flags[m])
3409  {
3410  /*
3411  * FORCE_NOT_NULL option is set and column is NULL -
3412  * convert it to the NULL string.
3413  */
3414  string = cstate->null_print;
3415  }
3416  else if (string != NULL && cstate->force_null_flags[m]
3417  && strcmp(string, cstate->null_print) == 0)
3418  {
3419  /*
3420  * FORCE_NULL option is set and column matches the NULL
3421  * string. It must have been quoted, or otherwise the
3422  * string would already have been set to NULL. Convert it
3423  * to NULL as specified.
3424  */
3425  string = NULL;
3426  }
3427  }
3428 
3429  cstate->cur_attname = NameStr(att->attname);
3430  cstate->cur_attval = string;
3431  values[m] = InputFunctionCall(&in_functions[m],
3432  string,
3433  typioparams[m],
3434  att->atttypmod);
3435  if (string != NULL)
3436  nulls[m] = false;
3437  cstate->cur_attname = NULL;
3438  cstate->cur_attval = NULL;
3439  }
3440 
3441  Assert(fieldno == nfields);
3442  }
3443  else
3444  {
3445  /* binary */
3446  int16 fld_count;
3447  ListCell *cur;
3448 
3449  cstate->cur_lineno++;
3450 
3451  if (!CopyGetInt16(cstate, &fld_count))
3452  {
3453  /* EOF detected (end of file, or protocol-level EOF) */
3454  return false;
3455  }
3456 
3457  if (fld_count == -1)
3458  {
3459  /*
3460  * Received EOF marker. In a V3-protocol copy, wait for the
3461  * protocol-level EOF, and complain if it doesn't come
3462  * immediately. This ensures that we correctly handle CopyFail,
3463  * if client chooses to send that now.
3464  *
3465  * Note that we MUST NOT try to read more data in an old-protocol
3466  * copy, since there is no protocol-level EOF marker then. We
3467  * could go either way for copy from file, but choose to throw
3468  * error if there's data after the EOF marker, for consistency
3469  * with the new-protocol case.
3470  */
3471  char dummy;
3472 
3473  if (cstate->copy_dest != COPY_OLD_FE &&
3474  CopyGetData(cstate, &dummy, 1, 1) > 0)
3475  ereport(ERROR,
3476  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3477  errmsg("received copy data after EOF marker")));
3478  return false;
3479  }
3480 
3481  if (fld_count != attr_count)
3482  ereport(ERROR,
3483  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3484  errmsg("row field count is %d, expected %d",
3485  (int) fld_count, attr_count)));
3486 
3487  if (file_has_oids)
3488  {
3489  Oid loaded_oid;
3490 
3491  cstate->cur_attname = "oid";
3492  loaded_oid =
3494  0,
3495  &cstate->oid_in_function,
3496  cstate->oid_typioparam,
3497  -1,
3498  &isnull));
3499  if (isnull || loaded_oid == InvalidOid)
3500  ereport(ERROR,
3501  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3502  errmsg("invalid OID in COPY data")));
3503  cstate->cur_attname = NULL;
3504  if (cstate->oids && tupleOid != NULL)
3505  *tupleOid = loaded_oid;
3506  }
3507 
3508  i = 0;
3509  foreach(cur, cstate->attnumlist)
3510  {
3511  int attnum = lfirst_int(cur);
3512  int m = attnum - 1;
3513  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3514 
3515  cstate->cur_attname = NameStr(att->attname);
3516  i++;
3517  values[m] = CopyReadBinaryAttribute(cstate,
3518  i,
3519  &in_functions[m],
3520  typioparams[m],
3521  att->atttypmod,
3522  &nulls[m]);
3523  cstate->cur_attname = NULL;
3524  }
3525  }
3526 
3527  /*
3528  * Now compute and insert any defaults available for the columns not
3529  * provided by the input data. Anything not processed here or above will
3530  * remain NULL.
3531  */
3532  for (i = 0; i < num_defaults; i++)
3533  {
3534  /*
3535  * The caller must supply econtext and have switched into the
3536  * per-tuple memory context in it.
3537  */
3538  Assert(econtext != NULL);
3540 
3541  values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3542  &nulls[defmap[i]]);
3543  }
3544 
3545  return true;
3546 }
3547 
3548 /*
3549  * Clean up storage and release resources for COPY FROM.
3550  */
3551 void
3553 {
3554  /* No COPY FROM related resources except memory. */
3555 
3556  EndCopy(cstate);
3557 }
3558 
3559 /*
3560  * Read the next input line and stash it in line_buf, with conversion to
3561  * server encoding.
3562  *
3563  * Result is true if read was terminated by EOF, false if terminated
3564  * by newline. The terminating newline or EOF marker is not included
3565  * in the final value of line_buf.
3566  */
3567 static bool
3569 {
3570  bool result;
3571 
3572  resetStringInfo(&cstate->line_buf);
3573  cstate->line_buf_valid = true;
3574 
3575  /* Mark that encoding conversion hasn't occurred yet */
3576  cstate->line_buf_converted = false;
3577 
3578  /* Parse data and transfer into line_buf */
3579  result = CopyReadLineText(cstate);
3580 
3581  if (result)
3582  {
3583  /*
3584  * Reached EOF. In protocol version 3, we should ignore anything
3585  * after \. up to the protocol end of copy data. (XXX maybe better
3586  * not to treat \. as special?)
3587  */
3588  if (cstate->copy_dest == COPY_NEW_FE)
3589  {
3590  do
3591  {
3592  cstate->raw_buf_index = cstate->raw_buf_len;
3593  } while (CopyLoadRawBuf(cstate));
3594  }
3595  }
3596  else
3597  {
3598  /*
3599  * If we didn't hit EOF, then we must have transferred the EOL marker
3600  * to line_buf along with the data. Get rid of it.
3601  */
3602  switch (cstate->eol_type)
3603  {
3604  case EOL_NL:
3605  Assert(cstate->line_buf.len >= 1);
3606  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3607  cstate->line_buf.len--;
3608  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3609  break;
3610  case EOL_CR:
3611  Assert(cstate->line_buf.len >= 1);
3612  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3613  cstate->line_buf.len--;
3614  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3615  break;
3616  case EOL_CRNL:
3617  Assert(cstate->line_buf.len >= 2);
3618  Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3619  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3620  cstate->line_buf.len -= 2;
3621  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3622  break;
3623  case EOL_UNKNOWN:
3624  /* shouldn't get here */
3625  Assert(false);
3626  break;
3627  }
3628  }
3629 
3630  /* Done reading the line. Convert it to server encoding. */
3631  if (cstate->need_transcoding)
3632  {
3633  char *cvt;
3634 
3635  cvt = pg_any_to_server(cstate->line_buf.data,
3636  cstate->line_buf.len,
3637  cstate->file_encoding);
3638  if (cvt != cstate->line_buf.data)
3639  {
3640  /* transfer converted data back to line_buf */
3641  resetStringInfo(&cstate->line_buf);
3642  appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3643  pfree(cvt);
3644  }
3645  }
3646 
3647  /* Now it's safe to use the buffer in error messages */
3648  cstate->line_buf_converted = true;
3649 
3650  return result;
3651 }
3652 
3653 /*
3654  * CopyReadLineText - inner loop of CopyReadLine for text mode
3655  */
3656 static bool
3658 {
3659  char *copy_raw_buf;
3660  int raw_buf_ptr;
3661  int copy_buf_len;
3662  bool need_data = false;
3663  bool hit_eof = false;
3664  bool result = false;
3665  char mblen_str[2];
3666 
3667  /* CSV variables */
3668  bool first_char_in_line = true;
3669  bool in_quote = false,
3670  last_was_esc = false;
3671  char quotec = '\0';
3672  char escapec = '\0';
3673 
3674  if (cstate->csv_mode)
3675  {
3676  quotec = cstate->quote[0];
3677  escapec = cstate->escape[0];
3678  /* ignore special escape processing if it's the same as quotec */
3679  if (quotec == escapec)
3680  escapec = '\0';
3681  }
3682 
3683  mblen_str[1] = '\0';
3684 
3685  /*
3686  * The objective of this loop is to transfer the entire next input line
3687  * into line_buf. Hence, we only care for detecting newlines (\r and/or
3688  * \n) and the end-of-copy marker (\.).
3689  *
3690  * In CSV mode, \r and \n inside a quoted field are just part of the data
3691  * value and are put in line_buf. We keep just enough state to know if we
3692  * are currently in a quoted field or not.
3693  *
3694  * These four characters, and the CSV escape and quote characters, are
3695  * assumed the same in frontend and backend encodings.
3696  *
3697  * For speed, we try to move data from raw_buf to line_buf in chunks
3698  * rather than one character at a time. raw_buf_ptr points to the next
3699  * character to examine; any characters from raw_buf_index to raw_buf_ptr
3700  * have been determined to be part of the line, but not yet transferred to
3701  * line_buf.
3702  *
3703  * For a little extra speed within the loop, we copy raw_buf and
3704  * raw_buf_len into local variables.
3705  */
3706  copy_raw_buf = cstate->raw_buf;
3707  raw_buf_ptr = cstate->raw_buf_index;
3708  copy_buf_len = cstate->raw_buf_len;
3709 
3710  for (;;)
3711  {
3712  int prev_raw_ptr;
3713  char c;
3714 
3715  /*
3716  * Load more data if needed. Ideally we would just force four bytes
3717  * of read-ahead and avoid the many calls to
3718  * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3719  * does not allow us to read too far ahead or we might read into the
3720  * next data, so we read-ahead only as far we know we can. One
3721  * optimization would be to read-ahead four byte here if
3722  * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3723  * considering the size of the buffer.
3724  */
3725  if (raw_buf_ptr >= copy_buf_len || need_data)
3726  {
3728 
3729  /*
3730  * Try to read some more data. This will certainly reset
3731  * raw_buf_index to zero, and raw_buf_ptr must go with it.
3732  */
3733  if (!CopyLoadRawBuf(cstate))
3734  hit_eof = true;
3735  raw_buf_ptr = 0;
3736  copy_buf_len = cstate->raw_buf_len;
3737 
3738  /*
3739  * If we are completely out of data, break out of the loop,
3740  * reporting EOF.
3741  */
3742  if (copy_buf_len <= 0)
3743  {
3744  result = true;
3745  break;
3746  }
3747  need_data = false;
3748  }
3749 
3750  /* OK to fetch a character */
3751  prev_raw_ptr = raw_buf_ptr;
3752  c = copy_raw_buf[raw_buf_ptr++];
3753 
3754  if (cstate->csv_mode)
3755  {
3756  /*
3757  * If character is '\\' or '\r', we may need to look ahead below.
3758  * Force fetch of the next character if we don't already have it.
3759  * We need to do this before changing CSV state, in case one of
3760  * these characters is also the quote or escape character.
3761  *
3762  * Note: old-protocol does not like forced prefetch, but it's OK
3763  * here since we cannot validly be at EOF.
3764  */
3765  if (c == '\\' || c == '\r')
3766  {
3768  }
3769 
3770  /*
3771  * Dealing with quotes and escapes here is mildly tricky. If the
3772  * quote char is also the escape char, there's no problem - we
3773  * just use the char as a toggle. If they are different, we need
3774  * to ensure that we only take account of an escape inside a
3775  * quoted field and immediately preceding a quote char, and not
3776  * the second in an escape-escape sequence.
3777  */
3778  if (in_quote && c == escapec)
3779  last_was_esc = !last_was_esc;
3780  if (c == quotec && !last_was_esc)
3781  in_quote = !in_quote;
3782  if (c != escapec)
3783  last_was_esc = false;
3784 
3785  /*
3786  * Updating the line count for embedded CR and/or LF chars is
3787  * necessarily a little fragile - this test is probably about the
3788  * best we can do. (XXX it's arguable whether we should do this
3789  * at all --- is cur_lineno a physical or logical count?)
3790  */
3791  if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3792  cstate->cur_lineno++;
3793  }
3794 
3795  /* Process \r */
3796  if (c == '\r' && (!cstate->csv_mode || !in_quote))
3797  {
3798  /* Check for \r\n on first line, _and_ handle \r\n. */
3799  if (cstate->eol_type == EOL_UNKNOWN ||
3800  cstate->eol_type == EOL_CRNL)
3801  {
3802  /*
3803  * If need more data, go back to loop top to load it.
3804  *
3805  * Note that if we are at EOF, c will wind up as '\0' because
3806  * of the guaranteed pad of raw_buf.
3807  */
3809 
3810  /* get next char */
3811  c = copy_raw_buf[raw_buf_ptr];
3812 
3813  if (c == '\n')
3814  {
3815  raw_buf_ptr++; /* eat newline */
3816  cstate->eol_type = EOL_CRNL; /* in case not set yet */
3817  }
3818  else
3819  {
3820  /* found \r, but no \n */
3821  if (cstate->eol_type == EOL_CRNL)
3822  ereport(ERROR,
3823  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3824  !cstate->csv_mode ?
3825  errmsg("literal carriage return found in data") :
3826  errmsg("unquoted carriage return found in data"),
3827  !cstate->csv_mode ?
3828  errhint("Use \"\\r\" to represent carriage return.") :
3829  errhint("Use quoted CSV field to represent carriage return.")));
3830 
3831  /*
3832  * if we got here, it is the first line and we didn't find
3833  * \n, so don't consume the peeked character
3834  */
3835  cstate->eol_type = EOL_CR;
3836  }
3837  }
3838  else if (cstate->eol_type == EOL_NL)
3839  ereport(ERROR,
3840  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3841  !cstate->csv_mode ?
3842  errmsg("literal carriage return found in data") :
3843  errmsg("unquoted carriage return found in data"),
3844  !cstate->csv_mode ?
3845  errhint("Use \"\\r\" to represent carriage return.") :
3846  errhint("Use quoted CSV field to represent carriage return.")));
3847  /* If reach here, we have found the line terminator */
3848  break;
3849  }
3850 
3851  /* Process \n */
3852  if (c == '\n' && (!cstate->csv_mode || !in_quote))
3853  {
3854  if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3855  ereport(ERROR,
3856  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3857  !cstate->csv_mode ?
3858  errmsg("literal newline found in data") :
3859  errmsg("unquoted newline found in data"),
3860  !cstate->csv_mode ?
3861  errhint("Use \"\\n\" to represent newline.") :
3862  errhint("Use quoted CSV field to represent newline.")));
3863  cstate->eol_type = EOL_NL; /* in case not set yet */
3864  /* If reach here, we have found the line terminator */
3865  break;
3866  }
3867 
3868  /*
3869  * In CSV mode, we only recognize \. alone on a line. This is because
3870  * \. is a valid CSV data value.
3871  */
3872  if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3873  {
3874  char c2;
3875 
3878 
3879  /* -----
3880  * get next character
3881  * Note: we do not change c so if it isn't \., we can fall
3882  * through and continue processing for file encoding.
3883  * -----
3884  */
3885  c2 = copy_raw_buf[raw_buf_ptr];
3886 
3887  if (c2 == '.')
3888  {
3889  raw_buf_ptr++; /* consume the '.' */
3890 
3891  /*
3892  * Note: if we loop back for more data here, it does not
3893  * matter that the CSV state change checks are re-executed; we
3894  * will come back here with no important state changed.
3895  */
3896  if (cstate->eol_type == EOL_CRNL)
3897  {
3898  /* Get the next character */
3900  /* if hit_eof, c2 will become '\0' */
3901  c2 = copy_raw_buf[raw_buf_ptr++];
3902 
3903  if (c2 == '\n')
3904  {
3905  if (!cstate->csv_mode)
3906  ereport(ERROR,
3907  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3908  errmsg("end-of-copy marker does not match previous newline style")));
3909  else
3911  }
3912  else if (c2 != '\r')
3913  {
3914  if (!cstate->csv_mode)
3915  ereport(ERROR,
3916  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3917  errmsg("end-of-copy marker corrupt")));
3918  else
3920  }
3921  }
3922 
3923  /* Get the next character */
3925  /* if hit_eof, c2 will become '\0' */
3926  c2 = copy_raw_buf[raw_buf_ptr++];
3927 
3928  if (c2 != '\r' && c2 != '\n')
3929  {
3930  if (!cstate->csv_mode)
3931  ereport(ERROR,
3932  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3933  errmsg("end-of-copy marker corrupt")));
3934  else
3936  }
3937 
3938  if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3939  (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3940  (cstate->eol_type == EOL_CR && c2 != '\r'))
3941  {
3942  ereport(ERROR,
3943  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3944  errmsg("end-of-copy marker does not match previous newline style")));
3945  }
3946 
3947  /*
3948  * Transfer only the data before the \. into line_buf, then
3949  * discard the data and the \. sequence.
3950  */
3951  if (prev_raw_ptr > cstate->raw_buf_index)
3953  cstate->raw_buf + cstate->raw_buf_index,
3954  prev_raw_ptr - cstate->raw_buf_index);
3955  cstate->raw_buf_index = raw_buf_ptr;
3956  result = true; /* report EOF */
3957  break;
3958  }
3959  else if (!cstate->csv_mode)
3960 
3961  /*
3962  * If we are here, it means we found a backslash followed by
3963  * something other than a period. In non-CSV mode, anything
3964  * after a backslash is special, so we skip over that second
3965  * character too. If we didn't do that \\. would be
3966  * considered an eof-of copy, while in non-CSV mode it is a
3967  * literal backslash followed by a period. In CSV mode,
3968  * backslashes are not special, so we want to process the
3969  * character after the backslash just like a normal character,
3970  * so we don't increment in those cases.
3971  */
3972  raw_buf_ptr++;
3973  }
3974 
3975  /*
3976  * This label is for CSV cases where \. appears at the start of a
3977  * line, but there is more text after it, meaning it was a data value.
3978  * We are more strict for \. in CSV mode because \. could be a data
3979  * value, while in non-CSV mode, \. cannot be a data value.
3980  */
3981 not_end_of_copy:
3982 
3983  /*
3984  * Process all bytes of a multi-byte character as a group.
3985  *
3986  * We only support multi-byte sequences where the first byte has the
3987  * high-bit set, so as an optimization we can avoid this block
3988  * entirely if it is not set.
3989  */
3990  if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3991  {
3992  int mblen;
3993 
3994  mblen_str[0] = c;
3995  /* All our encodings only read the first byte to get the length */
3996  mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3998  IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3999  raw_buf_ptr += mblen - 1;
4000  }
4001  first_char_in_line = false;
4002  } /* end of outer loop */
4003 
4004  /*
4005  * Transfer any still-uncopied data to line_buf.
4006  */
4008 
4009  return result;
4010 }
4011 
/*
 * Return decimal value for a hexadecimal digit
 *
 * 'hex' must be one of 0-9, a-f or A-F.  No validation is done here: any
 * other character yields a meaningless result, so callers have to check
 * the character first.
 */
static int
GetDecimalFromHex(char hex)
{
	/* cast to unsigned char before calling the <ctype.h> functions */
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
4023 
4024 /*
4025  * Parse the current line into separate attributes (fields),
4026  * performing de-escaping as needed.
4027  *
4028  * The input is in line_buf. We use attribute_buf to hold the result
4029  * strings. cstate->raw_fields[k] is set to point to the k'th attribute
4030  * string, or NULL when the input matches the null marker string.
4031  * This array is expanded as necessary.
4032  *
4033  * (Note that the caller cannot check for nulls since the returned
4034  * string would be the post-de-escaping equivalent, which may look
4035  * the same as some valid data string.)
4036  *
4037  * delim is the column delimiter string (must be just one byte for now).
4038  * null_print is the null marker string. Note that this is compared to
4039  * the pre-de-escaped input string.
4040  *
4041  * The return value is the number of fields actually read.
4042  */
4043 static int
4045 {
4046  char delimc = cstate->delim[0];
4047  int fieldno;
4048  char *output_ptr;
4049  char *cur_ptr;
4050  char *line_end_ptr;
4051 
4052  /*
4053  * We need a special case for zero-column tables: check that the input
4054  * line is empty, and return.
4055  */
4056  if (cstate->max_fields <= 0)
4057  {
4058  if (cstate->line_buf.len != 0)
4059  ereport(ERROR,
4060  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4061  errmsg("extra data after last expected column")));
4062  return 0;
4063  }
4064 
4065  resetStringInfo(&cstate->attribute_buf);
4066 
4067  /*
4068  * The de-escaped attributes will certainly not be longer than the input
4069  * data line, so we can just force attribute_buf to be large enough and
4070  * then transfer data without any checks for enough space. We need to do
4071  * it this way because enlarging attribute_buf mid-stream would invalidate
4072  * pointers already stored into cstate->raw_fields[].
4073  */
4074  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4075  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4076  output_ptr = cstate->attribute_buf.data;
4077 
4078  /* set pointer variables for loop */
4079  cur_ptr = cstate->line_buf.data;
4080  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4081 
4082  /* Outer loop iterates over fields */
4083  fieldno = 0;
4084  for (;;)
4085  {
4086  bool found_delim = false;
4087  char *start_ptr;
4088  char *end_ptr;
4089  int input_len;
4090  bool saw_non_ascii = false;
4091 
4092  /* Make sure there is enough space for the next value */
4093  if (fieldno >= cstate->max_fields)
4094  {
4095  cstate->max_fields *= 2;
4096  cstate->raw_fields =
4097  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4098  }
4099 
4100  /* Remember start of field on both input and output sides */
4101  start_ptr = cur_ptr;
4102  cstate->raw_fields[fieldno] = output_ptr;
4103 
4104  /*
4105  * Scan data for field.
4106  *
4107  * Note that in this loop, we are scanning to locate the end of field
4108  * and also speculatively performing de-escaping. Once we find the
4109  * end-of-field, we can match the raw field contents against the null
4110  * marker string. Only after that comparison fails do we know that
4111  * de-escaping is actually the right thing to do; therefore we *must
4112  * not* throw any syntax errors before we've done the null-marker
4113  * check.
4114  */
4115  for (;;)
4116  {
4117  char c;
4118 
4119  end_ptr = cur_ptr;
4120  if (cur_ptr >= line_end_ptr)
4121  break;
4122  c = *cur_ptr++;
4123  if (c == delimc)
4124  {
4125  found_delim = true;
4126  break;
4127  }
4128  if (c == '\\')
4129  {
4130  if (cur_ptr >= line_end_ptr)
4131  break;
4132  c = *cur_ptr++;
4133  switch (c)
4134  {
4135  case '0':
4136  case '1':
4137  case '2':
4138  case '3':
4139  case '4':
4140  case '5':
4141  case '6':
4142  case '7':
4143  {
4144  /* handle \013 */
4145  int val;
4146 
4147  val = OCTVALUE(c);
4148  if (cur_ptr < line_end_ptr)
4149  {
4150  c = *cur_ptr;
4151  if (ISOCTAL(c))
4152  {
4153  cur_ptr++;
4154  val = (val << 3) + OCTVALUE(c);
4155  if (cur_ptr < line_end_ptr)
4156  {
4157  c = *cur_ptr;
4158  if (ISOCTAL(c))
4159  {
4160  cur_ptr++;
4161  val = (val << 3) + OCTVALUE(c);
4162  }
4163  }
4164  }
4165  }
4166  c = val & 0377;
4167  if (c == '\0' || IS_HIGHBIT_SET(c))
4168  saw_non_ascii = true;
4169  }
4170  break;
4171  case 'x':
4172  /* Handle \x3F */
4173  if (cur_ptr < line_end_ptr)
4174  {
4175  char hexchar = *cur_ptr;
4176 
4177  if (isxdigit((unsigned char) hexchar))
4178  {
4179  int val = GetDecimalFromHex(hexchar);
4180 
4181  cur_ptr++;
4182  if (cur_ptr < line_end_ptr)
4183  {
4184  hexchar = *cur_ptr;
4185  if (isxdigit((unsigned char) hexchar))
4186  {
4187  cur_ptr++;
4188  val = (val << 4) + GetDecimalFromHex(hexchar);
4189  }
4190  }
4191  c = val & 0xff;
4192  if (c == '\0' || IS_HIGHBIT_SET(c))
4193  saw_non_ascii = true;
4194  }
4195  }
4196  break;
4197  case 'b':
4198  c = '\b';
4199  break;
4200  case 'f':
4201  c = '\f';
4202  break;
4203  case 'n':
4204  c = '\n';
4205  break;
4206  case 'r':
4207  c = '\r';
4208  break;
4209  case 't':
4210  c = '\t';
4211  break;
4212  case 'v':
4213  c = '\v';
4214  break;
4215 
4216  /*
4217  * in all other cases, take the char after '\'
4218  * literally
4219  */
4220  }
4221  }
4222 
4223  /* Add c to output string */
4224  *output_ptr++ = c;
4225  }
4226 
4227  /* Check whether raw input matched null marker */
4228  input_len = end_ptr - start_ptr;
4229  if (input_len == cstate->null_print_len &&
4230  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4231  cstate->raw_fields[fieldno] = NULL;
4232  else
4233  {
4234  /*
4235  * At this point we know the field is supposed to contain data.
4236  *
4237  * If we de-escaped any non-7-bit-ASCII chars, make sure the
4238  * resulting string is valid data for the db encoding.
4239  */
4240  if (saw_non_ascii)
4241  {
4242  char *fld = cstate->raw_fields[fieldno];
4243 
4244  pg_verifymbstr(fld, output_ptr - fld, false);
4245  }
4246  }
4247 
4248  /* Terminate attribute value in output area */
4249  *output_ptr++ = '\0';
4250 
4251  fieldno++;
4252  /* Done if we hit EOL instead of a delim */
4253  if (!found_delim)
4254  break;
4255  }
4256 
4257  /* Clean up state of attribute_buf */
4258  output_ptr--;
4259  Assert(*output_ptr == '\0');
4260  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4261 
4262  return fieldno;
4263 }
4264 
4265 /*
4266  * Parse the current line into separate attributes (fields),
4267  * performing de-escaping as needed. This has exactly the same API as
4268  * CopyReadAttributesText, except we parse the fields according to
4269  * "standard" (i.e. common) CSV usage.
4270  */
4271 static int
4273 {
4274  char delimc = cstate->delim[0];
4275  char quotec = cstate->quote[0];
4276  char escapec = cstate->escape[0];
4277  int fieldno;
4278  char *output_ptr;
4279  char *cur_ptr;
4280  char *line_end_ptr;
4281 
4282  /*
4283  * We need a special case for zero-column tables: check that the input
4284  * line is empty, and return.
4285  */
4286  if (cstate->max_fields <= 0)
4287  {
4288  if (cstate->line_buf.len != 0)
4289  ereport(ERROR,
4290  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4291  errmsg("extra data after last expected column")));
4292  return 0;
4293  }
4294 
4295  resetStringInfo(&cstate->attribute_buf);
4296 
4297  /*
4298  * The de-escaped attributes will certainly not be longer than the input
4299  * data line, so we can just force attribute_buf to be large enough and
4300  * then transfer data without any checks for enough space. We need to do
4301  * it this way because enlarging attribute_buf mid-stream would invalidate
4302  * pointers already stored into cstate->raw_fields[].
4303  */
4304  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4305  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4306  output_ptr = cstate->attribute_buf.data;
4307 
4308  /* set pointer variables for loop */
4309  cur_ptr = cstate->line_buf.data;
4310  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4311 
4312  /* Outer loop iterates over fields */
4313  fieldno = 0;
4314  for (;;)
4315  {
4316  bool found_delim = false;
4317  bool saw_quote = false;
4318  char *start_ptr;
4319  char *end_ptr;
4320  int input_len;
4321 
4322  /* Make sure there is enough space for the next value */
4323  if (fieldno >= cstate->max_fields)
4324  {
4325  cstate->max_fields *= 2;
4326  cstate->raw_fields =
4327  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4328  }
4329 
4330  /* Remember start of field on both input and output sides */
4331  start_ptr = cur_ptr;
4332  cstate->raw_fields[fieldno] = output_ptr;
4333 
4334  /*
4335  * Scan data for field,
4336  *
4337  * The loop starts in "not quote" mode and then toggles between that
4338  * and "in quote" mode. The loop exits normally if it is in "not
4339  * quote" mode and a delimiter or line end is seen.
4340  */
4341  for (;;)
4342  {
4343  char c;
4344 
4345  /* Not in quote */
4346  for (;;)
4347  {
4348  end_ptr = cur_ptr;
4349  if (cur_ptr >= line_end_ptr)
4350  goto endfield;
4351  c = *cur_ptr++;
4352  /* unquoted field delimiter */
4353  if (c == delimc)
4354  {
4355  found_delim = true;
4356  goto endfield;
4357  }
4358  /* start of quoted field (or part of field) */
4359  if (c == quotec)
4360  {
4361  saw_quote = true;
4362  break;
4363  }
4364  /* Add c to output string */
4365  *output_ptr++ = c;
4366  }
4367 
4368  /* In quote */
4369  for (;;)
4370  {
4371  end_ptr = cur_ptr;
4372  if (cur_ptr >= line_end_ptr)
4373  ereport(ERROR,
4374  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4375  errmsg("unterminated CSV quoted field")));
4376 
4377  c = *cur_ptr++;
4378 
4379  /* escape within a quoted field */
4380  if (c == escapec)
4381  {
4382  /*
4383  * peek at the next char if available, and escape it if it
4384  * is an escape char or a quote char
4385  */
4386  if (cur_ptr < line_end_ptr)
4387  {
4388  char nextc = *cur_ptr;
4389 
4390  if (nextc == escapec || nextc == quotec)
4391  {
4392  *output_ptr++ = nextc;
4393  cur_ptr++;
4394  continue;
4395  }
4396  }
4397  }
4398 
4399  /*
4400  * end of quoted field. Must do this test after testing for
4401  * escape in case quote char and escape char are the same
4402  * (which is the common case).
4403  */
4404  if (c == quotec)
4405  break;
4406 
4407  /* Add c to output string */
4408  *output_ptr++ = c;
4409  }
4410  }
4411 endfield:
4412 
4413  /* Terminate attribute value in output area */
4414  *output_ptr++ = '\0';
4415 
4416  /* Check whether raw input matched null marker */
4417  input_len = end_ptr - start_ptr;
4418  if (!saw_quote && input_len == cstate->null_print_len &&
4419  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4420  cstate->raw_fields[fieldno] = NULL;
4421 
4422  fieldno++;
4423  /* Done if we hit EOL instead of a delim */
4424  if (!found_delim)
4425  break;
4426  }
4427 
4428  /* Clean up state of attribute_buf */
4429  output_ptr--;
4430  Assert(*output_ptr == '\0');
4431  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4432 
4433  return fieldno;
4434 }
4435 
4436 
4437 /*
4438  * Read a binary attribute
4439  */
4440 static Datum
4442  int column_no, FmgrInfo *flinfo,
4443  Oid typioparam, int32 typmod,
4444  bool *isnull)
4445 {
4446  int32 fld_size;
4447  Datum result;
4448 
4449  if (!CopyGetInt32(cstate, &fld_size))
4450  ereport(ERROR,
4451  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4452  errmsg("unexpected EOF in COPY data")));
4453  if (fld_size == -1)
4454  {
4455  *isnull = true;
4456  return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4457  }
4458  if (fld_size < 0)
4459  ereport(ERROR,
4460  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4461  errmsg("invalid field size")));
4462 
4463  /* reset attribute_buf to empty, and load raw data in it */
4464  resetStringInfo(&cstate->attribute_buf);
4465 
4466  enlargeStringInfo(&cstate->attribute_buf, fld_size);
4467  if (CopyGetData(cstate, cstate->attribute_buf.data,
4468  fld_size, fld_size) != fld_size)
4469  ereport(ERROR,
4470  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4471  errmsg("unexpected EOF in COPY data")));
4472 
4473  cstate->attribute_buf.len = fld_size;
4474  cstate->attribute_buf.data[fld_size] = '\0';
4475 
4476  /* Call the column type's binary input converter */
4477  result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4478  typioparam, typmod);
4479 
4480  /* Trouble if it didn't eat the whole buffer */
4481  if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4482  ereport(ERROR,
4483  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4484  errmsg("incorrect binary data format")));
4485 
4486  *isnull = false;
4487  return result;
4488 }
4489 
4490 /*
4491  * Send text representation of one attribute, with conversion and escaping
4492  */
4493 #define DUMPSOFAR() \
4494  do { \
4495  if (ptr > start) \
4496  CopySendData(cstate, start, ptr - start); \
4497  } while (0)
4498 
4499 static void
4500 CopyAttributeOutText(CopyState cstate, char *string)
4501 {
4502  char *ptr;
4503  char *start;
4504  char c;
4505  char delimc = cstate->delim[0];
4506 
4507  if (cstate->need_transcoding)
4508  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4509  else
4510  ptr = string;
4511 
4512  /*
4513  * We have to grovel through the string searching for control characters
4514  * and instances of the delimiter character. In most cases, though, these
4515  * are infrequent. To avoid overhead from calling CopySendData once per
4516  * character, we dump out all characters between escaped characters in a
4517  * single call. The loop invariant is that the data from "start" to "ptr"
4518  * can be sent literally, but hasn't yet been.
4519  *
4520  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4521  * in valid backend encodings, extra bytes of a multibyte character never
4522  * look like ASCII. This loop is sufficiently performance-critical that
4523  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4524  * of the normal safe-encoding path.
4525  */
4526  if (cstate->encoding_embeds_ascii)
4527  {
4528  start = ptr;
4529  while ((c = *ptr) != '\0')
4530  {
4531  if ((unsigned char) c < (unsigned char) 0x20)
4532  {
4533  /*
4534  * \r and \n must be escaped, the others are traditional. We
4535  * prefer to dump these using the C-like notation, rather than
4536  * a backslash and the literal character, because it makes the
4537  * dump file a bit more proof against Microsoftish data
4538  * mangling.
4539  */
4540  switch (c)
4541  {
4542  case '\b':
4543  c = 'b';
4544  break;
4545  case '\f':
4546  c = 'f';
4547  break;
4548  case '\n':
4549  c = 'n';
4550  break;
4551  case '\r':
4552  c = 'r';
4553  break;
4554  case '\t':
4555  c = 't';
4556  break;
4557  case '\v':
4558  c = 'v';
4559  break;
4560  default:
4561  /* If it's the delimiter, must backslash it */
4562  if (c == delimc)
4563  break;
4564  /* All ASCII control chars are length 1 */
4565  ptr++;
4566  continue; /* fall to end of loop */
4567  }
4568  /* if we get here, we need to convert the control char */
4569  DUMPSOFAR();
4570  CopySendChar(cstate, '\\');
4571  CopySendChar(cstate, c);
4572  start = ++ptr; /* do not include char in next run */
4573  }
4574  else if (c == '\\' || c == delimc)
4575  {
4576  DUMPSOFAR();
4577  CopySendChar(cstate, '\\');
4578  start = ptr++; /* we include char in next run */
4579  }
4580  else if (IS_HIGHBIT_SET(c))
4581  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4582  else
4583  ptr++;
4584  }
4585  }
4586  else
4587  {
4588  start = ptr;
4589  while ((c = *ptr) != '\0')
4590  {
4591  if ((unsigned char) c < (unsigned char) 0x20)
4592  {
4593  /*
4594  * \r and \n must be escaped, the others are traditional. We
4595  * prefer to dump these using the C-like notation, rather than
4596  * a backslash and the literal character, because it makes the
4597  * dump file a bit more proof against Microsoftish data
4598  * mangling.
4599  */
4600  switch (c)
4601  {
4602  case '\b':
4603  c = 'b';
4604  break;
4605  case '\f':
4606  c = 'f';
4607  break;
4608  case '\n':
4609  c = 'n';
4610  break;
4611  case '\r':
4612  c = 'r';
4613  break;
4614  case '\t':
4615  c = 't';
4616  break;
4617  case '\v':
4618  c = 'v';
4619  break;
4620  default:
4621  /* If it's the delimiter, must backslash it */
4622  if (c == delimc)
4623  break;
4624  /* All ASCII control chars are length 1 */
4625  ptr++;
4626  continue; /* fall to end of loop */
4627  }
4628  /* if we get here, we need to convert the control char */
4629  DUMPSOFAR();
4630  CopySendChar(cstate, '\\');
4631  CopySendChar(cstate, c);
4632  start = ++ptr; /* do not include char in next run */
4633  }
4634  else if (c == '\\' || c == delimc)
4635  {
4636  DUMPSOFAR();
4637  CopySendChar(cstate, '\\');
4638  start = ptr++; /* we include char in next run */
4639  }
4640  else
4641  ptr++;
4642  }
4643  }
4644 
4645  DUMPSOFAR();
4646 }
4647 
4648 /*
4649  * Send text representation of one attribute, with conversion and
4650  * CSV-style escaping
4651  */
4652 static void
4653 CopyAttributeOutCSV(CopyState cstate, char *string,
4654  bool use_quote, bool single_attr)
4655 {
4656  char *ptr;
4657  char *start;
4658  char c;
4659  char delimc = cstate->delim[0];
4660  char quotec = cstate->quote[0];
4661  char escapec = cstate->escape[0];
4662 
4663  /* force quoting if it matches null_print (before conversion!) */
4664  if (!use_quote && strcmp(string, cstate->null_print) == 0)
4665  use_quote = true;
4666 
4667  if (cstate->need_transcoding)
4668  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4669  else
4670  ptr = string;
4671 
4672  /*
4673  * Make a preliminary pass to discover if it needs quoting
4674  */
4675  if (!use_quote)
4676  {
4677  /*
4678  * Because '\.' can be a data value, quote it if it appears alone on a
4679  * line so it is not interpreted as the end-of-data marker.
4680  */
4681  if (single_attr && strcmp(ptr, "\\.") == 0)
4682  use_quote = true;
4683  else
4684  {
4685  char *tptr = ptr;
4686 
4687  while ((c = *tptr) != '\0')
4688  {
4689  if (c == delimc || c == quotec || c == '\n' || c == '\r')
4690  {
4691  use_quote = true;
4692  break;
4693  }
4694  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4695  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4696  else
4697  tptr++;
4698  }
4699  }
4700  }
4701 
4702  if (use_quote)
4703  {
4704  CopySendChar(cstate, quotec);
4705 
4706  /*
4707  * We adopt the same optimization strategy as in CopyAttributeOutText
4708  */
4709  start = ptr;
4710  while ((c = *ptr) != '\0')
4711  {
4712  if (c == quotec || c == escapec)
4713  {
4714  DUMPSOFAR();
4715  CopySendChar(cstate, escapec);
4716  start = ptr; /* we include char in next run */
4717  }
4718  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4719  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4720  else
4721  ptr++;
4722  }
4723  DUMPSOFAR();
4724 
4725  CopySendChar(cstate, quotec);
4726  }
4727  else
4728  {
4729  /* If it doesn't need quoting, we can just dump it as-is */
4730  CopySendString(cstate, ptr);
4731  }
4732 }
4733 
4734 /*
4735  * CopyGetAttnums - build an integer list of attnums to be copied
4736  *
4737  * The input attnamelist is either the user-specified column list,
4738  * or NIL if there was none (in which case we want all the non-dropped
4739  * columns).
4740  *
4741  * rel can be NULL ... it's only used for error reports.
4742  */
4743 static List *
4744 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4745 {
4746  List *attnums = NIL;
4747 
4748  if (attnamelist == NIL)
4749  {
4750  /* Generate default column list */
4751  int attr_count = tupDesc->natts;
4752  int i;
4753 
4754  for (i = 0; i < attr_count; i++)
4755  {
4756  if (TupleDescAttr(tupDesc, i)->attisdropped)
4757  continue;
4758  attnums = lappend_int(attnums, i + 1);
4759  }
4760  }
4761  else
4762  {
4763  /* Validate the user-supplied list and extract attnums */
4764  ListCell *l;
4765 
4766  foreach(l, attnamelist)
4767  {
4768  char *name = strVal(lfirst(l));
4769  int attnum;
4770  int i;
4771 
4772  /* Lookup column name */
4773  attnum = InvalidAttrNumber;
4774  for (i = 0; i < tupDesc->natts; i++)
4775  {
4776  Form_pg_attribute att = TupleDescAttr(tupDesc, i);
4777 
4778  if (att->attisdropped)
4779  continue;
4780  if (namestrcmp(&(att->attname), name) == 0)
4781  {
4782  attnum = att->attnum;
4783  break;
4784  }
4785  }
4786  if (attnum == InvalidAttrNumber)
4787  {
4788  if (rel != NULL)
4789  ereport(ERROR,
4790  (errcode(ERRCODE_UNDEFINED_COLUMN),
4791  errmsg("column \"%s\" of relation \"%s\" does not exist",
4792  name, RelationGetRelationName(rel))));
4793  else
4794  ereport(ERROR,
4795  (errcode(ERRCODE_UNDEFINED_COLUMN),
4796  errmsg("column \"%s\" does not exist",
4797  name)));
4798  }
4799  /* Check for duplicates */
4800  if (list_member_int(attnums, attnum))
4801  ereport(ERROR,
4802  (errcode(ERRCODE_DUPLICATE_COLUMN),
4803  errmsg("column \"%s\" specified more than once",
4804  name)));
4805  attnums = lappend_int(attnums, attnum);
4806  }
4807  }
4808 
4809  return attnums;
4810 }
4811 
4812 
4813 /*
4814  * copy_dest_startup --- executor startup
4815  */
4816 static void
4817 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4818 {
4819  /* no-op */
4820 }
4821 
4822 /*
4823  * copy_dest_receive --- receive one tuple
4824  */
4825 static bool
4827 {
4828  DR_copy *myState = (DR_copy *) self;
4829  CopyState cstate = myState->cstate;
4830 
4831  /* Make sure the tuple is fully deconstructed */
4832  slot_getallattrs(slot);
4833 
4834  /* And send the data */
4835  CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4836  myState->processed++;
4837 
4838  return true;
4839 }
4840 
4841 /*
4842  * copy_dest_shutdown --- executor end
4843  */
4844 static void
4846 {
4847  /* no-op */
4848 }
4849 
4850 /*
4851  * copy_dest_destroy --- release DestReceiver object
4852  */
4853 static void
4855 {
4856  pfree(self);
4857 }
4858 
4859 /*
4860  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4861  */
4862 DestReceiver *
4864 {
4865  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4866 
4867  self->pub.receiveSlot = copy_dest_receive;
4868  self->pub.rStartup = copy_dest_startup;
4869  self->pub.rShutdown = copy_dest_shutdown;
4870  self->pub.rDestroy = copy_dest_destroy;
4871  self->pub.mydest = DestCopyOut;
4872 
4873  self->cstate = NULL; /* will be set later */
4874  self->processed = 0;
4875 
4876  return (DestReceiver *) self;
4877 }
signed short int16
Definition: c.h:293
List * indirection
Definition: parsenodes.h:440
bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
Definition: copy.c:3259
int ri_NumIndices
Definition: execnodes.h:358
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:469
int ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd, TupleTableSlot *slot, EState *estate)
static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
Definition: copy.c:4441
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static int GetDecimalFromHex(char hex)
Definition: copy.c:4016
Definition: fmgr.h:56
#define MAX_COPY_DATA_DISPLAY
bool csv_mode
Definition: copy.c:119
static void SendCopyEnd(CopyState cstate)
Definition: copy.c:419
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1306
Relation ri_RelationDesc
Definition: execnodes.h:355
Oid getOwnedSequence(Oid relid, AttrNumber attnum)
Definition: pg_depend.c:605
List * range_table
Definition: copy.c:167
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options)
Definition: copy.c:1375
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:781
#define IsA(nodeptr, _type_)
Definition: nodes.h:563
static bool CopyReadLineText(CopyState cstate)
Definition: copy.c:3657
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:123
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:1007
Node * val
Definition: parsenodes.h:441
static bool CopyGetInt32(CopyState cstate, int32 *val)
Definition: copy.c:685
int errhint(const char *fmt,...)
Definition: elog.c:987
int pg_char_to_encoding(const char *name)
Definition: encnames.c:551
char ** raw_fields
Definition: copy.c:191
Definition: copy.c:75
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2665
#define VARDATA(PTR)
Definition: postgres.h:303
static void EndCopy(CopyState cstate)
Definition: copy.c:1717
bool binary
Definition: copy.c:116
#define pq_flush()
Definition: libpq.h:39
void CopyFromErrorCallback(void *arg)
Definition: copy.c:2178
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1565
void PreventCommandIfParallelMode(const char *cmdname)
Definition: utility.c:254
List * ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:271
List * attlist
Definition: parsenodes.h:1956
List * fromClause
Definition: parsenodes.h:1521
#define ISOCTAL(c)
Definition: copy.c:54
#define OCTVALUE(c)
Definition: copy.c:55
#define ResetPerTupleExprContext(estate)
Definition: executor.h:477
#define RelationGetDescr(relation)
Definition: rel.h:437
#define HEAP_INSERT_FROZEN
Definition: heapam.h:30
char * name
Definition: parsenodes.h:439
bool need_transcoding
Definition: copy.c:106
#define castNode(_type_, nodeptr)
Definition: nodes.h:581
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
FmgrInfo * in_functions
Definition: copy.c:162
AttrNumber num_defaults
Definition: copy.c:158
List * attnumlist
Definition: copy.c:112
#define OIDOID
Definition: pg_type.h:328
#define VARSIZE(PTR)
Definition: postgres.h:304
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * filename
Definition: copy.c:113
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copy.c:2989
#define VARHDRSZ
Definition: c.h:503
bool file_has_oids
Definition: copy.c:159
#define DatumGetObjectId(X)
Definition: postgres.h:506
List * relationOids
Definition: plannodes.h:88
char * pstrdup(const char *in)
Definition: mcxt.c:1063
#define pg_hton16(x)
Definition: pg_bswap.h:120
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:204
static void ReceiveCopyBegin(CopyState cstate)
Definition: copy.c:383
#define XLogIsNeeded()
Definition: xlog.h:146
#define pg_ntoh16(x)
Definition: pg_bswap.h:124
Definition: copy.c:218
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define MAX_BUFFERED_TUPLES
bool rd_islocaltemp
Definition: rel.h:90
TupleTableSlot * ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2369
#define S_IWOTH
Definition: win32_port.h:298
Expr * expression_planner(Expr *expr)
Definition: planner.c:6026
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:140
DestReceiver pub
Definition: copy.c:220
void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options)
Definition: copy.c:1017
#define RELKIND_MATVIEW
Definition: pg_class.h:165
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1941
StringInfoData line_buf
Definition: copy.c:200
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen)
Definition: copy.c:254
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:839
#define AccessShareLock
Definition: lockdefs.h:36
#define InvalidBuffer
Definition: buf.h:25
#define gettext_noop(x)
Definition: c.h:991
struct CopyStateData CopyStateData
Definition: nodes.h:512
#define strVal(v)
Definition: value.h:54
struct cursor * cur
Definition: ecpg.c:28
int raw_buf_index
Definition: copy.c:213
static void CopyAttributeOutText(CopyState cstate, char *string)
Definition: copy.c:4500
bool line_buf_valid
Definition: copy.c:202
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1655
CopyState cstate
Definition: copy.c:221
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_BINARY_W
Definition: c.h:1038
bool superuser(void)
Definition: superuser.c:47
int namestrcmp(Name name, const char *str)
Definition: name.c:247
#define MemSet(start, val, len)
Definition: c.h:863
bool fe_eof
Definition: copy.c:103
uint64 CopyFrom(CopyState cstate)
Definition: copy.c:2283
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:111
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, HeapTuple trigtuple, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2354
void heap_sync(Relation rel)
Definition: heapam.c:9137
Datum * tts_values
Definition: tuptable.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
static void ClosePipeToProgram(CopyState cstate)
Definition: copy.c:1694
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:134
static int CopyReadAttributesCSV(CopyState cstate)
Definition: copy.c:4272
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
AclMode requiredPerms
Definition: parsenodes.h:1059
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
bool * force_quote_flags
Definition: copy.c:129
List * es_range_table
Definition: execnodes.h:432
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
List * pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)
Definition: postgres.c:649
char * delim
Definition: copy.c:124
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
Node * utilityStmt
Definition: parsenodes.h:118
bool is_program
Definition: parsenodes.h:1959
#define linitial_node(type, l)
Definition: pg_list.h:114
bool volatile_defexprs
Definition: copy.c:166
Datum oidout(PG_FUNCTION_ARGS)
Definition: oid.c:127
void(* callback)(void *arg)
Definition: elog.h:239
struct ErrorContextCallback * previous
Definition: elog.h:238
#define PG_BINARY_R
Definition: c.h:1037
static void copy_dest_destroy(DestReceiver *self)
Definition: copy.c:4854
bool * force_null_flags
Definition: copy.c:133
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
MemoryContext rowcontext
Definition: copy.c:153
int natts
Definition: tupdesc.h:79
bool line_buf_converted
Definition: copy.c:201
HeapTuple tcs_original_insert_tuple
Definition: trigger.h:82
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:634
signed int int32
Definition: c.h:294
int ClosePipeStream(FILE *file)
Definition: fd.c:2744
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
bool * convert_select_flags
Definition: copy.c:136
static void CopySendChar(CopyState cstate, char c)
Definition: copy.c:460
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1662
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
int location
Definition: parsenodes.h:235
CopyDest copy_dest
Definition: copy.c:99
int location
Definition: parsenodes.h:442
#define REFILL_LINEBUF
Definition: copy.c:271
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:703
ErrorContextCallback * error_context_stack
Definition: elog.c:88
#define list_make1(x1)
Definition: pg_list.h:139
char * null_print
Definition: copy.c:121
const char * cur_attname
Definition: copy.c:141
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:459
Definition: copy.c:63
bool trig_insert_instead_row
Definition: reltrigger.h:57
void FreeExecutorState(EState *estate)
Definition: execUtils.c:186
Relation rel
Definition: copy.c:110
#define GetPerTupleExprContext(estate)
Definition: executor.h:468
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2336
MemoryContext copycontext
Definition: copy.c:147
#define pq_startcopyout()
Definition: libpq.h:46
copy_data_source_cb data_source_cb
Definition: copy.c:115
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:127
bool trig_insert_new_table
Definition: reltrigger.h:74
Bitmapset * selectedCols
Definition: parsenodes.h:1061
unsigned short uint16
Definition: c.h:305
void pfree(void *pointer)
Definition: mcxt.c:936
#define pg_ntoh32(x)
Definition: pg_bswap.h:125
#define IS_HIGHBIT_SET(ch)
Definition: c.h:949
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options)
Definition: copy.c:1740
static void CopySendInt16(CopyState cstate, int16 val)
Definition: copy.c:702
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1141
TupleTableSlot * partition_tuple_slot
Definition: copy.c:174
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
void pq_startmsgread(void)
Definition: pqcomm.c:1210
#define DatumGetCString(X)
Definition: postgres.h:572
static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
Definition: copy.c:559
#define lfirst_int(lc)
Definition: pg_list.h:107
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:297
#define pg_hton32(x)
Definition: pg_bswap.h:121
static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr)
Definition: copy.c:4653
Datum ReceiveFunctionCall(FmgrInfo *flinfo, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1676
static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, int firstBufferedLineNo)
Definition: copy.c:2905
TupleConversionMap * tcs_map
Definition: trigger.h:73
#define FATAL
Definition: elog.h:52
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:122
char * defGetString(DefElem *def)
Definition: define.c:49
RangeVar * relation
Definition: parsenodes.h:1953
static struct @121 value
ItemPointerData t_self
Definition: htup.h:65
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:769
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
Definition: pquery.c:67
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:820
TriggerDesc * trigdesc
Definition: rel.h:120
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
static char * limit_printout_length(const char *str)
Definition: copy.c:2254
#define lfirst_node(type, lc)
Definition: pg_list.h:109
QueryDesc * queryDesc
Definition: copy.c:111
EolType
Definition: copy.c:72
bool list_member_int(const List *list, int datum)
Definition: list.c:485
static void copy_dest_shutdown(DestReceiver *self)
Definition: copy.c:4845
int num_dispatch
Definition: copy.c:170
bool encoding_embeds_ascii
Definition: copy.c:107
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3066
char * c
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2220
char * raw_buf
Definition: copy.c:212
static bool CopyLoadRawBuf(CopyState cstate)
Definition: copy.c:739
Node * stmt
Definition: parsenodes.h:1431
#define NoLock
Definition: lockdefs.h:34
int pq_getbytes(char *s, size_t len)
Definition: pqcomm.c:1094
char * quote
Definition: copy.c:125
static char * buf
Definition: pg_test_fsync.c:67
#define memmove(d, s, c)
Definition: c.h:1055
bool * tts_isnull
Definition: tuptable.h:126
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:277
int pg_database_encoding_max_length(void)
Definition: wchar.c:1833
List * targetList
Definition: parsenodes.h:1520
ResultRelInfo * es_result_relations
Definition: execnodes.h:442
static uint64 DoCopyTo(CopyState cstate)
Definition: copy.c:1878
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: copy.c:4817
int location
Definition: parsenodes.h:722
#define RowExclusiveLock
Definition: lockdefs.h:38
ExprState ** defexprs
Definition: copy.c:165
const char * cur_relname
Definition: copy.c:139
int errcode_for_file_access(void)
Definition: elog.c:598
int pg_encoding_mblen(int encoding, const char *mbstr)
Definition: wchar.c:1785
#define is_absolute_path(filename)
Definition: port.h:86
#define CStringGetDatum(X)
Definition: postgres.h:584
char string[11]
Definition: preproc-type.c:46
Definition: copy.c:76
List * options
Definition: parsenodes.h:1961
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2342
FmgrInfo oid_in_function
Definition: copy.c:160
#define select(n, r, w, e, timeout)
Definition: win32_port.h:447
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:264
#define RelationGetRelationName(relation)
Definition: rel.h:445
static const char BinarySignature[11]
Definition: copy.c:291
#define S_IWGRP
Definition: win32_port.h:286
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:379
unsigned int uint32
Definition: c.h:306
bytea * SendFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1723
int raw_buf_len
Definition: copy.c:214
Oid t_tableOid
Definition: htup.h:66
#define RELKIND_FOREIGN_TABLE
Definition: pg_class.h:167
bool trig_insert_after_row
Definition: reltrigger.h:56
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
int max_fields
Definition: copy.c:190
char * escape
Definition: copy.c:126
const char * p_sourcetext
Definition: parse_node.h:172
List * returningList
Definition: parsenodes.h:144
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:461
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2441
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2698
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2632
#define ereport(elevel, rest)
Definition: elog.h:122
Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2413
int null_print_len
Definition: copy.c:122
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1238
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:510
List * force_null
Definition: copy.c:132
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:367
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:399
EState * CreateExecutorState(void)
Definition: execUtils.c:81
List * lappend_int(List *list, int datum)
Definition: list.c:146
Node * arg
Definition: parsenodes.h:720
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc, const char *msg)
Definition: tupconvert.c:210
TupleConversionMap ** transition_tupconv_maps
Definition: copy.c:176
Definition: copy.c:77
List * lappend(List *list, void *datum)
Definition: list.c:128
int file_encoding
Definition: copy.c:105
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:4744
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
TupleDesc tupDesc
Definition: execdesc.h:47
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
#define stat(a, b)
Definition: win32_port.h:266
#define InvalidSnapshot
Definition: snapshot.h:25
SubTransactionId rd_createSubid
Definition: rel.h:110
Oid * typioparams
Definition: copy.c:163
bool is_program
Definition: copy.c:114
#define RELKIND_PARTITIONED_TABLE
Definition: pg_class.h:168
Node * build_column_default(Relation rel, int attrno)
bool trig_insert_before_row
Definition: reltrigger.h:55
void getTypeBinaryOutputInfo(Oid type, Oid *typSend, bool *typIsVarlena)
Definition: lsyscache.c:2731
List * es_tupleTable
Definition: execnodes.h:474
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void * palloc0(Size size)
Definition: mcxt.c:864
bool header_line
Definition: copy.c:120
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2277
uintptr_t Datum
Definition: postgres.h:372
int GetDatabaseEncoding(void)
Definition: mbutils.c:1004
int pg_get_client_encoding(void)
Definition: mbutils.c:306
#define ACL_SELECT
Definition: parsenodes.h:73
TupleTableSlot * tupslot
Definition: execPartition.h:45
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4404
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1808
int stmt_len
Definition: parsenodes.h:1433
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen)
Definition: copy.c:242
int stmt_location
Definition: parsenodes.h:1432
int es_num_result_relations
Definition: execnodes.h:443
List * ri_PartitionCheck
Definition: execnodes.h:409
#define RAW_BUF_SIZE
Definition: copy.c:211
TupleDesc rd_att
Definition: rel.h:115
bool freeze
Definition: copy.c:118
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1618
int pq_getbyte(void)
Definition: pqcomm.c:1000
void pq_endmsgread(void)
Definition: pqcomm.c:1234
Relation heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: heapam.c:1318
#define InvalidOid
Definition: postgres_ext.h:36