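/*
 * PostgreSQL Source Code (git master): copy.c
 * (Extracted from the Doxygen page for this file; its "Go to the
 * documentation of this file." navigation link is not part of the source.)
 */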
/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/commands/copy.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/xact.h"
25 #include "access/xlog.h"
26 #include "catalog/pg_type.h"
27 #include "commands/copy.h"
28 #include "commands/defrem.h"
29 #include "commands/trigger.h"
30 #include "executor/execPartition.h"
31 #include "executor/executor.h"
32 #include "libpq/libpq.h"
33 #include "libpq/pqformat.h"
34 #include "mb/pg_wchar.h"
35 #include "miscadmin.h"
36 #include "optimizer/clauses.h"
37 #include "optimizer/planner.h"
38 #include "nodes/makefuncs.h"
39 #include "parser/parse_relation.h"
40 #include "port/pg_bswap.h"
41 #include "rewrite/rewriteHandler.h"
42 #include "storage/fd.h"
43 #include "tcop/tcopprot.h"
44 #include "utils/builtins.h"
45 #include "utils/lsyscache.h"
46 #include "utils/memutils.h"
47 #include "utils/portal.h"
48 #include "utils/rel.h"
49 #include "utils/rls.h"
50 #include "utils/snapmgr.h"
51 
52 
/*
 * ISOCTAL: true if c is one of the ASCII digits '0'..'7'.
 * OCTVALUE: numeric value of such a digit (only meaningful when ISOCTAL(c)).
 * Both evaluate their argument more than once in ISOCTAL's case, so do not
 * pass expressions with side effects.
 */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
55 
/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file (or a piped program) */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE,				/* to/from frontend (3.0 protocol) */
	COPY_CALLBACK				/* to/from callback function */
} CopyDest;
67 
/*
 * Represents the end-of-line terminator type of the input.
 *
 * The member names suggest "not yet known", LF, CR and CRLF respectively;
 * the line reader that sets CopyStateData.eol_type is outside this view.
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;
78 
79 /*
80  * This struct contains all the state variables used throughout a COPY
81  * operation. For simplicity, we use the same struct for all variants of COPY,
82  * even though some fields are used in only some cases.
83  *
84  * Multi-byte encodings: all supported client-side encodings encode multi-byte
85  * characters by having the first byte's high bit set. Subsequent bytes of the
86  * character can have the high bit not set. When scanning data in such an
87  * encoding to look for a match to a single-byte (ie ASCII) character, we must
88  * use the full pg_encoding_mblen() machinery to skip over multibyte
89  * characters, else we might find a false match to a trailing byte. In
90  * supported server encodings, there is no possibility of a false match, and
91  * it's faster to make useless comparisons to trailing bytes than it is to
92  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
93  * when we have to do it the hard way.
94  */
95 typedef struct CopyStateData
96 {
97  /* low-level state data */
98  CopyDest copy_dest; /* type of copy source/destination */
99  FILE *copy_file; /* used if copy_dest == COPY_FILE */
100  StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
101  * dest == COPY_NEW_FE in COPY FROM */
102  bool fe_eof; /* true if detected end of copy data */
103  EolType eol_type; /* EOL type of input */
104  int file_encoding; /* file or remote side's character encoding */
105  bool need_transcoding; /* file encoding diff from server? */
106  bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
107 
108  /* parameters from the COPY command */
109  Relation rel; /* relation to copy to or from */
110  QueryDesc *queryDesc; /* executable query to copy from */
111  List *attnumlist; /* integer list of attnums to copy */
112  char *filename; /* filename, or NULL for STDIN/STDOUT */
113  bool is_program; /* is 'filename' a program to popen? */
114  copy_data_source_cb data_source_cb; /* function for reading data */
115  bool binary; /* binary format? */
116  bool oids; /* include OIDs? */
117  bool freeze; /* freeze rows on loading? */
118  bool csv_mode; /* Comma Separated Value format? */
119  bool header_line; /* CSV header line? */
120  char *null_print; /* NULL marker string (server encoding!) */
121  int null_print_len; /* length of same */
122  char *null_print_client; /* same converted to file encoding */
123  char *delim; /* column delimiter (must be 1 byte) */
124  char *quote; /* CSV quote char (must be 1 byte) */
125  char *escape; /* CSV escape char (must be 1 byte) */
126  List *force_quote; /* list of column names */
127  bool force_quote_all; /* FORCE_QUOTE *? */
128  bool *force_quote_flags; /* per-column CSV FQ flags */
129  List *force_notnull; /* list of column names */
130  bool *force_notnull_flags; /* per-column CSV FNN flags */
131  List *force_null; /* list of column names */
132  bool *force_null_flags; /* per-column CSV FN flags */
133  bool convert_selectively; /* do selective binary conversion? */
134  List *convert_select; /* list of column names (can be NIL) */
135  bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
136 
137  /* these are just for error messages, see CopyFromErrorCallback */
138  const char *cur_relname; /* table name for error messages */
139  int cur_lineno; /* line number for error messages */
140  const char *cur_attname; /* current att for error messages */
141  const char *cur_attval; /* current att value for error messages */
142 
143  /*
144  * Working state for COPY TO/FROM
145  */
146  MemoryContext copycontext; /* per-copy execution context */
147 
148  /*
149  * Working state for COPY TO
150  */
151  FmgrInfo *out_functions; /* lookup info for output functions */
152  MemoryContext rowcontext; /* per-row evaluation context */
153 
154  /*
155  * Working state for COPY FROM
156  */
161  FmgrInfo *in_functions; /* array of input functions for each attrs */
162  Oid *typioparams; /* array of element types for in_functions */
163  int *defmap; /* array of default att numbers */
164  ExprState **defexprs; /* array of default att expressions */
165  bool volatile_defexprs; /* is any of defexprs volatile? */
167 
168  /* Tuple-routing support info */
170 
172 
173  /*
174  * These variables are used to reduce overhead in textual COPY FROM.
175  *
176  * attribute_buf holds the separated, de-escaped text for each field of
177  * the current line. The CopyReadAttributes functions return arrays of
178  * pointers into this buffer. We avoid palloc/pfree overhead by re-using
179  * the buffer on each cycle.
180  */
182 
183  /* field raw data pointers found by COPY FROM */
184 
186  char **raw_fields;
187 
188  /*
189  * Similarly, line_buf holds the whole input line being processed. The
190  * input cycle is first to read the whole line into line_buf, convert it
191  * to server encoding there, and then extract the individual attribute
192  * fields into attribute_buf. line_buf is preserved unmodified so that we
193  * can display it in error messages if appropriate.
194  */
196  bool line_buf_converted; /* converted to server encoding? */
197  bool line_buf_valid; /* contains the row being processed? */
198 
199  /*
200  * Finally, raw_buf holds raw data read from the data source (file or
201  * client connection). CopyReadLine parses this data sufficiently to
202  * locate line boundaries, then transfers the data to line_buf and
203  * converts it. Note: we guarantee that there is a \0 at
204  * raw_buf[raw_buf_len].
205  */
206 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
207  char *raw_buf;
208  int raw_buf_index; /* next byte to process */
209  int raw_buf_len; /* total # of bytes stored */
210 } CopyStateData;
211 
212 /* DestReceiver for COPY (query) TO */
213 typedef struct
214 {
215  DestReceiver pub; /* publicly-known function pointers */
216  CopyState cstate; /* CopyStateData for the command */
217  uint64 processed; /* # of tuples processed */
218 } DR_copy;
219 
220 
/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
 * prevent the continue/break processing from working.  We end the "if (1)"
 * with "else ((void) 0)" to ensure the "if" does not unintentionally match
 * any "else" in the calling code, and to avoid any compiler warnings about
 * empty statements.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 *
 * They reference the caller's locals by name: raw_buf_ptr, prev_raw_ptr,
 * copy_buf_len, hit_eof, need_data, result and cstate, and must therefore
 * be expanded inside a loop (and, for NO_END_OF_COPY_GOTO, a function with a
 * "not_end_of_copy" label) that declares them.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else ((void) 0)

/* This consumes the remainder of the buffer and breaks */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else ((void) 0)

/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							 cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)

/* Undo any read-ahead and jump out of the block. */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
285 
/*
 * Signature bytes for binary-format COPY.  The array is sized exactly 11,
 * so the literal's explicit trailing \0 is the last element and the
 * implicit string terminator is dropped (legal in C for an exact fit).
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
287 
288 
289 /* non-export function prototypes */
290 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
291  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
292  List *options);
293 static void EndCopy(CopyState cstate);
294 static void ClosePipeToProgram(CopyState cstate);
295 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
296  Oid queryRelId, const char *filename, bool is_program,
297  List *attnamelist, List *options);
298 static void EndCopyTo(CopyState cstate);
299 static uint64 DoCopyTo(CopyState cstate);
300 static uint64 CopyTo(CopyState cstate);
301 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
302  Datum *values, bool *nulls);
303 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
304  CommandId mycid, int hi_options,
305  ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
306  BulkInsertState bistate,
307  int nBufferedTuples, HeapTuple *bufferedTuples,
308  int firstBufferedLineNo);
309 static bool CopyReadLine(CopyState cstate);
310 static bool CopyReadLineText(CopyState cstate);
311 static int CopyReadAttributesText(CopyState cstate);
312 static int CopyReadAttributesCSV(CopyState cstate);
314  int column_no, FmgrInfo *flinfo,
315  Oid typioparam, int32 typmod,
316  bool *isnull);
317 static void CopyAttributeOutText(CopyState cstate, char *string);
318 static void CopyAttributeOutCSV(CopyState cstate, char *string,
319  bool use_quote, bool single_attr);
320 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
321  List *attnamelist);
322 static char *limit_printout_length(const char *str);
323 
324 /* Low-level communications functions */
325 static void SendCopyBegin(CopyState cstate);
326 static void ReceiveCopyBegin(CopyState cstate);
327 static void SendCopyEnd(CopyState cstate);
328 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
329 static void CopySendString(CopyState cstate, const char *str);
330 static void CopySendChar(CopyState cstate, char c);
331 static void CopySendEndOfRow(CopyState cstate);
332 static int CopyGetData(CopyState cstate, void *databuf,
333  int minread, int maxread);
334 static void CopySendInt32(CopyState cstate, int32 val);
335 static bool CopyGetInt32(CopyState cstate, int32 *val);
336 static void CopySendInt16(CopyState cstate, int16 val);
337 static bool CopyGetInt16(CopyState cstate, int16 *val);
338 
339 
340 /*
341  * Send copy start/stop messages for frontend copies. These have changed
342  * in past protocol redesigns.
343  */
344 static void
346 {
348  {
349  /* new way */
351  int natts = list_length(cstate->attnumlist);
352  int16 format = (cstate->binary ? 1 : 0);
353  int i;
354 
355  pq_beginmessage(&buf, 'H');
356  pq_sendbyte(&buf, format); /* overall format */
357  pq_sendint16(&buf, natts);
358  for (i = 0; i < natts; i++)
359  pq_sendint16(&buf, format); /* per-column formats */
360  pq_endmessage(&buf);
361  cstate->copy_dest = COPY_NEW_FE;
362  }
363  else
364  {
365  /* old way */
366  if (cstate->binary)
367  ereport(ERROR,
368  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
369  errmsg("COPY BINARY is not supported to stdout or from stdin")));
370  pq_putemptymessage('H');
371  /* grottiness needed for old COPY OUT protocol */
372  pq_startcopyout();
373  cstate->copy_dest = COPY_OLD_FE;
374  }
375 }
376 
377 static void
379 {
381  {
382  /* new way */
384  int natts = list_length(cstate->attnumlist);
385  int16 format = (cstate->binary ? 1 : 0);
386  int i;
387 
388  pq_beginmessage(&buf, 'G');
389  pq_sendbyte(&buf, format); /* overall format */
390  pq_sendint16(&buf, natts);
391  for (i = 0; i < natts; i++)
392  pq_sendint16(&buf, format); /* per-column formats */
393  pq_endmessage(&buf);
394  cstate->copy_dest = COPY_NEW_FE;
395  cstate->fe_msgbuf = makeStringInfo();
396  }
397  else
398  {
399  /* old way */
400  if (cstate->binary)
401  ereport(ERROR,
402  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
403  errmsg("COPY BINARY is not supported to stdout or from stdin")));
404  pq_putemptymessage('G');
405  /* any error in old protocol will make us lose sync */
406  pq_startmsgread();
407  cstate->copy_dest = COPY_OLD_FE;
408  }
409  /* We *must* flush here to ensure FE knows it can send. */
410  pq_flush();
411 }
412 
413 static void
415 {
416  if (cstate->copy_dest == COPY_NEW_FE)
417  {
418  /* Shouldn't have any unsent data */
419  Assert(cstate->fe_msgbuf->len == 0);
420  /* Send Copy Done message */
421  pq_putemptymessage('c');
422  }
423  else
424  {
425  CopySendData(cstate, "\\.", 2);
426  /* Need to flush out the trailer (this also appends a newline) */
427  CopySendEndOfRow(cstate);
428  pq_endcopyout(false);
429  }
430 }
431 
432 /*----------
433  * CopySendData sends output data to the destination (file or frontend)
434  * CopySendString does the same for null-terminated strings
435  * CopySendChar does the same for single characters
436  * CopySendEndOfRow does the appropriate thing at end of each data row
437  * (data is not actually flushed except by CopySendEndOfRow)
438  *
439  * NB: no data conversion is applied by these functions
440  *----------
441  */
442 static void
443 CopySendData(CopyState cstate, const void *databuf, int datasize)
444 {
445  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
446 }
447 
448 static void
449 CopySendString(CopyState cstate, const char *str)
450 {
451  appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
452 }
453 
454 static void
455 CopySendChar(CopyState cstate, char c)
456 {
458 }
459 
460 static void
462 {
463  StringInfo fe_msgbuf = cstate->fe_msgbuf;
464 
465  switch (cstate->copy_dest)
466  {
467  case COPY_FILE:
468  if (!cstate->binary)
469  {
470  /* Default line termination depends on platform */
471 #ifndef WIN32
472  CopySendChar(cstate, '\n');
473 #else
474  CopySendString(cstate, "\r\n");
475 #endif
476  }
477 
478  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
479  cstate->copy_file) != 1 ||
480  ferror(cstate->copy_file))
481  {
482  if (cstate->is_program)
483  {
484  if (errno == EPIPE)
485  {
486  /*
487  * The pipe will be closed automatically on error at
488  * the end of transaction, but we might get a better
489  * error message from the subprocess' exit code than
490  * just "Broken Pipe"
491  */
492  ClosePipeToProgram(cstate);
493 
494  /*
495  * If ClosePipeToProgram() didn't throw an error, the
496  * program terminated normally, but closed the pipe
497  * first. Restore errno, and throw an error.
498  */
499  errno = EPIPE;
500  }
501  ereport(ERROR,
503  errmsg("could not write to COPY program: %m")));
504  }
505  else
506  ereport(ERROR,
508  errmsg("could not write to COPY file: %m")));
509  }
510  break;
511  case COPY_OLD_FE:
512  /* The FE/BE protocol uses \n as newline for all platforms */
513  if (!cstate->binary)
514  CopySendChar(cstate, '\n');
515 
516  if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
517  {
518  /* no hope of recovering connection sync, so FATAL */
519  ereport(FATAL,
520  (errcode(ERRCODE_CONNECTION_FAILURE),
521  errmsg("connection lost during COPY to stdout")));
522  }
523  break;
524  case COPY_NEW_FE:
525  /* The FE/BE protocol uses \n as newline for all platforms */
526  if (!cstate->binary)
527  CopySendChar(cstate, '\n');
528 
529  /* Dump the accumulated row as one CopyData message */
530  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
531  break;
532  case COPY_CALLBACK:
533  Assert(false); /* Not yet supported. */
534  break;
535  }
536 
537  resetStringInfo(fe_msgbuf);
538 }
539 
540 /*
541  * CopyGetData reads data from the source (file or frontend)
542  *
543  * We attempt to read at least minread, and at most maxread, bytes from
544  * the source. The actual number of bytes read is returned; if this is
545  * less than minread, EOF was detected.
546  *
547  * Note: when copying from the frontend, we expect a proper EOF mark per
548  * protocol; if the frontend simply drops the connection, we raise error.
549  * It seems unwise to allow the COPY IN to complete normally in that case.
550  *
551  * NB: no data conversion is applied here.
552  */
553 static int
554 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
555 {
556  int bytesread = 0;
557 
558  switch (cstate->copy_dest)
559  {
560  case COPY_FILE:
561  bytesread = fread(databuf, 1, maxread, cstate->copy_file);
562  if (ferror(cstate->copy_file))
563  ereport(ERROR,
565  errmsg("could not read from COPY file: %m")));
566  break;
567  case COPY_OLD_FE:
568 
569  /*
570  * We cannot read more than minread bytes (which in practice is 1)
571  * because old protocol doesn't have any clear way of separating
572  * the COPY stream from following data. This is slow, but not any
573  * slower than the code path was originally, and we don't care
574  * much anymore about the performance of old protocol.
575  */
576  if (pq_getbytes((char *) databuf, minread))
577  {
578  /* Only a \. terminator is legal EOF in old protocol */
579  ereport(ERROR,
580  (errcode(ERRCODE_CONNECTION_FAILURE),
581  errmsg("unexpected EOF on client connection with an open transaction")));
582  }
583  bytesread = minread;
584  break;
585  case COPY_NEW_FE:
586  while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
587  {
588  int avail;
589 
590  while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
591  {
592  /* Try to receive another message */
593  int mtype;
594 
595  readmessage:
597  pq_startmsgread();
598  mtype = pq_getbyte();
599  if (mtype == EOF)
600  ereport(ERROR,
601  (errcode(ERRCODE_CONNECTION_FAILURE),
602  errmsg("unexpected EOF on client connection with an open transaction")));
603  if (pq_getmessage(cstate->fe_msgbuf, 0))
604  ereport(ERROR,
605  (errcode(ERRCODE_CONNECTION_FAILURE),
606  errmsg("unexpected EOF on client connection with an open transaction")));
608  switch (mtype)
609  {
610  case 'd': /* CopyData */
611  break;
612  case 'c': /* CopyDone */
613  /* COPY IN correctly terminated by frontend */
614  cstate->fe_eof = true;
615  return bytesread;
616  case 'f': /* CopyFail */
617  ereport(ERROR,
618  (errcode(ERRCODE_QUERY_CANCELED),
619  errmsg("COPY from stdin failed: %s",
620  pq_getmsgstring(cstate->fe_msgbuf))));
621  break;
622  case 'H': /* Flush */
623  case 'S': /* Sync */
624 
625  /*
626  * Ignore Flush/Sync for the convenience of client
627  * libraries (such as libpq) that may send those
628  * without noticing that the command they just
629  * sent was COPY.
630  */
631  goto readmessage;
632  default:
633  ereport(ERROR,
634  (errcode(ERRCODE_PROTOCOL_VIOLATION),
635  errmsg("unexpected message type 0x%02X during COPY from stdin",
636  mtype)));
637  break;
638  }
639  }
640  avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
641  if (avail > maxread)
642  avail = maxread;
643  pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
644  databuf = (void *) ((char *) databuf + avail);
645  maxread -= avail;
646  bytesread += avail;
647  }
648  break;
649  case COPY_CALLBACK:
650  bytesread = cstate->data_source_cb(databuf, minread, maxread);
651  break;
652  }
653 
654  return bytesread;
655 }
656 
657 
658 /*
659  * These functions do apply some data conversion
660  */
661 
662 /*
663  * CopySendInt32 sends an int32 in network byte order
664  */
665 static void
667 {
668  uint32 buf;
669 
670  buf = pg_hton32((uint32) val);
671  CopySendData(cstate, &buf, sizeof(buf));
672 }
673 
674 /*
675  * CopyGetInt32 reads an int32 that appears in network byte order
676  *
677  * Returns true if OK, false if EOF
678  */
679 static bool
681 {
682  uint32 buf;
683 
684  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
685  {
686  *val = 0; /* suppress compiler warning */
687  return false;
688  }
689  *val = (int32) pg_ntoh32(buf);
690  return true;
691 }
692 
693 /*
694  * CopySendInt16 sends an int16 in network byte order
695  */
696 static void
698 {
699  uint16 buf;
700 
701  buf = pg_hton16((uint16) val);
702  CopySendData(cstate, &buf, sizeof(buf));
703 }
704 
705 /*
706  * CopyGetInt16 reads an int16 that appears in network byte order
707  */
708 static bool
710 {
711  uint16 buf;
712 
713  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
714  {
715  *val = 0; /* suppress compiler warning */
716  return false;
717  }
718  *val = (int16) pg_ntoh16(buf);
719  return true;
720 }
721 
722 
723 /*
724  * CopyLoadRawBuf loads some more data into raw_buf
725  *
726  * Returns true if able to obtain at least one more byte, else false.
727  *
728  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
729  * down to the start of the buffer and then we load more data after that.
730  * This case is used only when a frontend multibyte character crosses a
731  * bufferload boundary.
732  */
733 static bool
735 {
736  int nbytes;
737  int inbytes;
738 
739  if (cstate->raw_buf_index < cstate->raw_buf_len)
740  {
741  /* Copy down the unprocessed data */
742  nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
743  memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
744  nbytes);
745  }
746  else
747  nbytes = 0; /* no data need be saved */
748 
749  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
750  1, RAW_BUF_SIZE - nbytes);
751  nbytes += inbytes;
752  cstate->raw_buf[nbytes] = '\0';
753  cstate->raw_buf_index = 0;
754  cstate->raw_buf_len = nbytes;
755  return (inbytes > 0);
756 }
757 
758 
759 /*
760  * DoCopy executes the SQL COPY statement
761  *
762  * Either unload or reload contents of table <relation>, depending on <from>.
763  * (<from> = true means we are inserting into the table.) In the "TO" case
764  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
765  * or DELETE query.
766  *
767  * If <pipe> is false, transfer is between the table and the file named
768  * <filename>. Otherwise, transfer is between the table and our regular
769  * input/output stream. The latter could be either stdin/stdout or a
770  * socket, depending on whether we're running under Postmaster control.
771  *
772  * Do not allow a Postgres user without superuser privilege to read from
773  * or write to a file.
774  *
775  * Do not allow the copy if user doesn't have proper permission to access
776  * the table or the specifically requested columns.
777  */
778 void
779 DoCopy(ParseState *pstate, const CopyStmt *stmt,
780  int stmt_location, int stmt_len,
781  uint64 *processed)
782 {
783  CopyState cstate;
784  bool is_from = stmt->is_from;
785  bool pipe = (stmt->filename == NULL);
786  Relation rel;
787  Oid relid;
788  RawStmt *query = NULL;
789 
790  /* Disallow COPY to/from file or program except to superusers. */
791  if (!pipe && !superuser())
792  {
793  if (stmt->is_program)
794  ereport(ERROR,
795  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
796  errmsg("must be superuser to COPY to or from an external program"),
797  errhint("Anyone can COPY to stdout or from stdin. "
798  "psql's \\copy command also works for anyone.")));
799  else
800  ereport(ERROR,
801  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
802  errmsg("must be superuser to COPY to or from a file"),
803  errhint("Anyone can COPY to stdout or from stdin. "
804  "psql's \\copy command also works for anyone.")));
805  }
806 
807  if (stmt->relation)
808  {
809  TupleDesc tupDesc;
810  List *attnums;
811  ListCell *cur;
812  RangeTblEntry *rte;
813 
814  Assert(!stmt->query);
815 
816  /* Open and lock the relation, using the appropriate lock type. */
817  rel = heap_openrv(stmt->relation,
818  (is_from ? RowExclusiveLock : AccessShareLock));
819 
820  relid = RelationGetRelid(rel);
821 
822  rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
823  rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
824 
825  tupDesc = RelationGetDescr(rel);
826  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
827  foreach(cur, attnums)
828  {
829  int attno = lfirst_int(cur) -
831 
832  if (is_from)
833  rte->insertedCols = bms_add_member(rte->insertedCols, attno);
834  else
835  rte->selectedCols = bms_add_member(rte->selectedCols, attno);
836  }
837  ExecCheckRTPerms(pstate->p_rtable, true);
838 
839  /*
840  * Permission check for row security policies.
841  *
842  * check_enable_rls will ereport(ERROR) if the user has requested
843  * something invalid and will otherwise indicate if we should enable
844  * RLS (returns RLS_ENABLED) or not for this COPY statement.
845  *
846  * If the relation has a row security policy and we are to apply it
847  * then perform a "query" copy and allow the normal query processing
848  * to handle the policies.
849  *
850  * If RLS is not enabled for this, then just fall through to the
851  * normal non-filtering relation handling.
852  */
853  if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
854  {
856  ColumnRef *cr;
857  ResTarget *target;
858  RangeVar *from;
859  List *targetList = NIL;
860 
861  if (is_from)
862  ereport(ERROR,
863  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
864  errmsg("COPY FROM not supported with row-level security"),
865  errhint("Use INSERT statements instead.")));
866 
867  /*
868  * Build target list
869  *
870  * If no columns are specified in the attribute list of the COPY
871  * command, then the target list is 'all' columns. Therefore, '*'
872  * should be used as the target list for the resulting SELECT
873  * statement.
874  *
875  * In the case that columns are specified in the attribute list,
876  * create a ColumnRef and ResTarget for each column and add them
877  * to the target list for the resulting SELECT statement.
878  */
879  if (!stmt->attlist)
880  {
881  cr = makeNode(ColumnRef);
883  cr->location = -1;
884 
885  target = makeNode(ResTarget);
886  target->name = NULL;
887  target->indirection = NIL;
888  target->val = (Node *) cr;
889  target->location = -1;
890 
891  targetList = list_make1(target);
892  }
893  else
894  {
895  ListCell *lc;
896 
897  foreach(lc, stmt->attlist)
898  {
899  /*
900  * Build the ColumnRef for each column. The ColumnRef
901  * 'fields' property is a String 'Value' node (see
902  * nodes/value.h) that corresponds to the column name
903  * respectively.
904  */
905  cr = makeNode(ColumnRef);
906  cr->fields = list_make1(lfirst(lc));
907  cr->location = -1;
908 
909  /* Build the ResTarget and add the ColumnRef to it. */
910  target = makeNode(ResTarget);
911  target->name = NULL;
912  target->indirection = NIL;
913  target->val = (Node *) cr;
914  target->location = -1;
915 
916  /* Add each column to the SELECT statement's target list */
917  targetList = lappend(targetList, target);
918  }
919  }
920 
921  /*
922  * Build RangeVar for from clause, fully qualified based on the
923  * relation which we have opened and locked.
924  */
927  -1);
928 
929  /* Build query */
930  select = makeNode(SelectStmt);
931  select->targetList = targetList;
932  select->fromClause = list_make1(from);
933 
934  query = makeNode(RawStmt);
935  query->stmt = (Node *) select;
936  query->stmt_location = stmt_location;
937  query->stmt_len = stmt_len;
938 
939  /*
940  * Close the relation for now, but keep the lock on it to prevent
941  * changes between now and when we start the query-based COPY.
942  *
943  * We'll reopen it later as part of the query-based COPY.
944  */
945  heap_close(rel, NoLock);
946  rel = NULL;
947  }
948  }
949  else
950  {
951  Assert(stmt->query);
952 
953  query = makeNode(RawStmt);
954  query->stmt = stmt->query;
955  query->stmt_location = stmt_location;
956  query->stmt_len = stmt_len;
957 
958  relid = InvalidOid;
959  rel = NULL;
960  }
961 
962  if (is_from)
963  {
964  Assert(rel);
965 
966  /* check read-only transaction and parallel mode */
967  if (XactReadOnly && !rel->rd_islocaltemp)
968  PreventCommandIfReadOnly("COPY FROM");
969  PreventCommandIfParallelMode("COPY FROM");
970 
971  cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
972  NULL, stmt->attlist, stmt->options);
973  *processed = CopyFrom(cstate); /* copy from file to database */
974  EndCopyFrom(cstate);
975  }
976  else
977  {
978  cstate = BeginCopyTo(pstate, rel, query, relid,
979  stmt->filename, stmt->is_program,
980  stmt->attlist, stmt->options);
981  *processed = DoCopyTo(cstate); /* copy from database to file */
982  EndCopyTo(cstate);
983  }
984 
985  /*
986  * Close the relation. If reading, we can release the AccessShareLock we
987  * got; if writing, we should hold the lock until end of transaction to
988  * ensure that updates will be committed before lock is released.
989  */
990  if (rel != NULL)
991  heap_close(rel, (is_from ? NoLock : AccessShareLock));
992 }
993 
994 /*
995  * Process the statement option list for COPY.
996  *
997  * Scan the options list (a list of DefElem) and transpose the information
998  * into cstate, applying appropriate error checking.
999  *
1000  * cstate is assumed to be filled with zeroes initially.
1001  *
1002  * This is exported so that external users of the COPY API can sanity-check
1003  * a list of options. In that usage, cstate should be passed as NULL
1004  * (since external users don't know sizeof(CopyStateData)) and the collected
1005  * data is just leaked until CurrentMemoryContext is reset.
1006  *
1007  * Note that additional checking, such as whether column names listed in FORCE
1008  * QUOTE actually exist, has to be applied later. This just checks for
1009  * self-consistency of the options list.
1010  */
1011 void
1013  CopyState cstate,
1014  bool is_from,
1015  List *options)
1016 {
1017  bool format_specified = false;
1018  ListCell *option;
1019 
1020  /* Support external use for option sanity checking */
1021  if (cstate == NULL)
1022  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1023 
1024  cstate->file_encoding = -1;
1025 
1026  /* Extract options from the statement node tree */
1027  foreach(option, options)
1028  {
1029  DefElem *defel = lfirst_node(DefElem, option);
1030 
1031  if (strcmp(defel->defname, "format") == 0)
1032  {
1033  char *fmt = defGetString(defel);
1034 
1035  if (format_specified)
1036  ereport(ERROR,
1037  (errcode(ERRCODE_SYNTAX_ERROR),
1038  errmsg("conflicting or redundant options"),
1039  parser_errposition(pstate, defel->location)));
1040  format_specified = true;
1041  if (strcmp(fmt, "text") == 0)
1042  /* default format */ ;
1043  else if (strcmp(fmt, "csv") == 0)
1044  cstate->csv_mode = true;
1045  else if (strcmp(fmt, "binary") == 0)
1046  cstate->binary = true;
1047  else
1048  ereport(ERROR,
1049  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050  errmsg("COPY format \"%s\" not recognized", fmt),
1051  parser_errposition(pstate, defel->location)));
1052  }
1053  else if (strcmp(defel->defname, "oids") == 0)
1054  {
1055  if (cstate->oids)
1056  ereport(ERROR,
1057  (errcode(ERRCODE_SYNTAX_ERROR),
1058  errmsg("conflicting or redundant options"),
1059  parser_errposition(pstate, defel->location)));
1060  cstate->oids = defGetBoolean(defel);
1061  }
1062  else if (strcmp(defel->defname, "freeze") == 0)
1063  {
1064  if (cstate->freeze)
1065  ereport(ERROR,
1066  (errcode(ERRCODE_SYNTAX_ERROR),
1067  errmsg("conflicting or redundant options"),
1068  parser_errposition(pstate, defel->location)));
1069  cstate->freeze = defGetBoolean(defel);
1070  }
1071  else if (strcmp(defel->defname, "delimiter") == 0)
1072  {
1073  if (cstate->delim)
1074  ereport(ERROR,
1075  (errcode(ERRCODE_SYNTAX_ERROR),
1076  errmsg("conflicting or redundant options"),
1077  parser_errposition(pstate, defel->location)));
1078  cstate->delim = defGetString(defel);
1079  }
1080  else if (strcmp(defel->defname, "null") == 0)
1081  {
1082  if (cstate->null_print)
1083  ereport(ERROR,
1084  (errcode(ERRCODE_SYNTAX_ERROR),
1085  errmsg("conflicting or redundant options"),
1086  parser_errposition(pstate, defel->location)));
1087  cstate->null_print = defGetString(defel);
1088  }
1089  else if (strcmp(defel->defname, "header") == 0)
1090  {
1091  if (cstate->header_line)
1092  ereport(ERROR,
1093  (errcode(ERRCODE_SYNTAX_ERROR),
1094  errmsg("conflicting or redundant options"),
1095  parser_errposition(pstate, defel->location)));
1096  cstate->header_line = defGetBoolean(defel);
1097  }
1098  else if (strcmp(defel->defname, "quote") == 0)
1099  {
1100  if (cstate->quote)
1101  ereport(ERROR,
1102  (errcode(ERRCODE_SYNTAX_ERROR),
1103  errmsg("conflicting or redundant options"),
1104  parser_errposition(pstate, defel->location)));
1105  cstate->quote = defGetString(defel);
1106  }
1107  else if (strcmp(defel->defname, "escape") == 0)
1108  {
1109  if (cstate->escape)
1110  ereport(ERROR,
1111  (errcode(ERRCODE_SYNTAX_ERROR),
1112  errmsg("conflicting or redundant options"),
1113  parser_errposition(pstate, defel->location)));
1114  cstate->escape = defGetString(defel);
1115  }
1116  else if (strcmp(defel->defname, "force_quote") == 0)
1117  {
1118  if (cstate->force_quote || cstate->force_quote_all)
1119  ereport(ERROR,
1120  (errcode(ERRCODE_SYNTAX_ERROR),
1121  errmsg("conflicting or redundant options"),
1122  parser_errposition(pstate, defel->location)));
1123  if (defel->arg && IsA(defel->arg, A_Star))
1124  cstate->force_quote_all = true;
1125  else if (defel->arg && IsA(defel->arg, List))
1126  cstate->force_quote = castNode(List, defel->arg);
1127  else
1128  ereport(ERROR,
1129  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1130  errmsg("argument to option \"%s\" must be a list of column names",
1131  defel->defname),
1132  parser_errposition(pstate, defel->location)));
1133  }
1134  else if (strcmp(defel->defname, "force_not_null") == 0)
1135  {
1136  if (cstate->force_notnull)
1137  ereport(ERROR,
1138  (errcode(ERRCODE_SYNTAX_ERROR),
1139  errmsg("conflicting or redundant options"),
1140  parser_errposition(pstate, defel->location)));
1141  if (defel->arg && IsA(defel->arg, List))
1142  cstate->force_notnull = castNode(List, defel->arg);
1143  else
1144  ereport(ERROR,
1145  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146  errmsg("argument to option \"%s\" must be a list of column names",
1147  defel->defname),
1148  parser_errposition(pstate, defel->location)));
1149  }
1150  else if (strcmp(defel->defname, "force_null") == 0)
1151  {
1152  if (cstate->force_null)
1153  ereport(ERROR,
1154  (errcode(ERRCODE_SYNTAX_ERROR),
1155  errmsg("conflicting or redundant options")));
1156  if (defel->arg && IsA(defel->arg, List))
1157  cstate->force_null = castNode(List, defel->arg);
1158  else
1159  ereport(ERROR,
1160  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1161  errmsg("argument to option \"%s\" must be a list of column names",
1162  defel->defname),
1163  parser_errposition(pstate, defel->location)));
1164  }
1165  else if (strcmp(defel->defname, "convert_selectively") == 0)
1166  {
1167  /*
1168  * Undocumented, not-accessible-from-SQL option: convert only the
1169  * named columns to binary form, storing the rest as NULLs. It's
1170  * allowed for the column list to be NIL.
1171  */
1172  if (cstate->convert_selectively)
1173  ereport(ERROR,
1174  (errcode(ERRCODE_SYNTAX_ERROR),
1175  errmsg("conflicting or redundant options"),
1176  parser_errposition(pstate, defel->location)));
1177  cstate->convert_selectively = true;
1178  if (defel->arg == NULL || IsA(defel->arg, List))
1179  cstate->convert_select = castNode(List, defel->arg);
1180  else
1181  ereport(ERROR,
1182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1183  errmsg("argument to option \"%s\" must be a list of column names",
1184  defel->defname),
1185  parser_errposition(pstate, defel->location)));
1186  }
1187  else if (strcmp(defel->defname, "encoding") == 0)
1188  {
1189  if (cstate->file_encoding >= 0)
1190  ereport(ERROR,
1191  (errcode(ERRCODE_SYNTAX_ERROR),
1192  errmsg("conflicting or redundant options"),
1193  parser_errposition(pstate, defel->location)));
1195  if (cstate->file_encoding < 0)
1196  ereport(ERROR,
1197  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1198  errmsg("argument to option \"%s\" must be a valid encoding name",
1199  defel->defname),
1200  parser_errposition(pstate, defel->location)));
1201  }
1202  else
1203  ereport(ERROR,
1204  (errcode(ERRCODE_SYNTAX_ERROR),
1205  errmsg("option \"%s\" not recognized",
1206  defel->defname),
1207  parser_errposition(pstate, defel->location)));
1208  }
1209 
1210  /*
1211  * Check for incompatible options (must do these two before inserting
1212  * defaults)
1213  */
1214  if (cstate->binary && cstate->delim)
1215  ereport(ERROR,
1216  (errcode(ERRCODE_SYNTAX_ERROR),
1217  errmsg("cannot specify DELIMITER in BINARY mode")));
1218 
1219  if (cstate->binary && cstate->null_print)
1220  ereport(ERROR,
1221  (errcode(ERRCODE_SYNTAX_ERROR),
1222  errmsg("cannot specify NULL in BINARY mode")));
1223 
1224  /* Set defaults for omitted options */
1225  if (!cstate->delim)
1226  cstate->delim = cstate->csv_mode ? "," : "\t";
1227 
1228  if (!cstate->null_print)
1229  cstate->null_print = cstate->csv_mode ? "" : "\\N";
1230  cstate->null_print_len = strlen(cstate->null_print);
1231 
1232  if (cstate->csv_mode)
1233  {
1234  if (!cstate->quote)
1235  cstate->quote = "\"";
1236  if (!cstate->escape)
1237  cstate->escape = cstate->quote;
1238  }
1239 
1240  /* Only single-byte delimiter strings are supported. */
1241  if (strlen(cstate->delim) != 1)
1242  ereport(ERROR,
1243  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1244  errmsg("COPY delimiter must be a single one-byte character")));
1245 
1246  /* Disallow end-of-line characters */
1247  if (strchr(cstate->delim, '\r') != NULL ||
1248  strchr(cstate->delim, '\n') != NULL)
1249  ereport(ERROR,
1250  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1251  errmsg("COPY delimiter cannot be newline or carriage return")));
1252 
1253  if (strchr(cstate->null_print, '\r') != NULL ||
1254  strchr(cstate->null_print, '\n') != NULL)
1255  ereport(ERROR,
1256  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1257  errmsg("COPY null representation cannot use newline or carriage return")));
1258 
1259  /*
1260  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1261  * backslash because it would be ambiguous. We can't allow the other
1262  * cases because data characters matching the delimiter must be
1263  * backslashed, and certain backslash combinations are interpreted
1264  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1265  * more than strictly necessary, but seems best for consistency and
1266  * future-proofing. Likewise we disallow all digits though only octal
1267  * digits are actually dangerous.
1268  */
1269  if (!cstate->csv_mode &&
1270  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1271  cstate->delim[0]) != NULL)
1272  ereport(ERROR,
1273  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1274  errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1275 
1276  /* Check header */
1277  if (!cstate->csv_mode && cstate->header_line)
1278  ereport(ERROR,
1279  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1280  errmsg("COPY HEADER available only in CSV mode")));
1281 
1282  /* Check quote */
1283  if (!cstate->csv_mode && cstate->quote != NULL)
1284  ereport(ERROR,
1285  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1286  errmsg("COPY quote available only in CSV mode")));
1287 
1288  if (cstate->csv_mode && strlen(cstate->quote) != 1)
1289  ereport(ERROR,
1290  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1291  errmsg("COPY quote must be a single one-byte character")));
1292 
1293  if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1294  ereport(ERROR,
1295  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1296  errmsg("COPY delimiter and quote must be different")));
1297 
1298  /* Check escape */
1299  if (!cstate->csv_mode && cstate->escape != NULL)
1300  ereport(ERROR,
1301  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1302  errmsg("COPY escape available only in CSV mode")));
1303 
1304  if (cstate->csv_mode && strlen(cstate->escape) != 1)
1305  ereport(ERROR,
1306  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1307  errmsg("COPY escape must be a single one-byte character")));
1308 
1309  /* Check force_quote */
1310  if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1311  ereport(ERROR,
1312  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1313  errmsg("COPY force quote available only in CSV mode")));
1314  if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1315  ereport(ERROR,
1316  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317  errmsg("COPY force quote only available using COPY TO")));
1318 
1319  /* Check force_notnull */
1320  if (!cstate->csv_mode && cstate->force_notnull != NIL)
1321  ereport(ERROR,
1322  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1323  errmsg("COPY force not null available only in CSV mode")));
1324  if (cstate->force_notnull != NIL && !is_from)
1325  ereport(ERROR,
1326  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327  errmsg("COPY force not null only available using COPY FROM")));
1328 
1329  /* Check force_null */
1330  if (!cstate->csv_mode && cstate->force_null != NIL)
1331  ereport(ERROR,
1332  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1333  errmsg("COPY force null available only in CSV mode")));
1334 
1335  if (cstate->force_null != NIL && !is_from)
1336  ereport(ERROR,
1337  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338  errmsg("COPY force null only available using COPY FROM")));
1339 
1340  /* Don't allow the delimiter to appear in the null string. */
1341  if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1342  ereport(ERROR,
1343  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1344  errmsg("COPY delimiter must not appear in the NULL specification")));
1345 
1346  /* Don't allow the CSV quote char to appear in the null string. */
1347  if (cstate->csv_mode &&
1348  strchr(cstate->null_print, cstate->quote[0]) != NULL)
1349  ereport(ERROR,
1350  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1351  errmsg("CSV quote character must not appear in the NULL specification")));
1352 }
1353 
1354 /*
1355  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1356  *
1357  * Iff <binary>, unload or reload in the binary format, as opposed to the
1358  * more wasteful but more robust and portable text format.
1359  *
1360  * Iff <oids>, unload or reload the format that includes OID information.
1361  * On input, we accept OIDs whether or not the table has an OID column,
1362  * but silently drop them if it does not. On output, we report an error
1363  * if the user asks for OIDs in a table that has none (not providing an
1364  * OID column might seem friendlier, but could seriously confuse programs).
1365  *
1366  * If in the text format, delimit columns with delimiter <delim> and print
1367  * NULL values as <null_print>.
1368  */
1369 static CopyState
1371  bool is_from,
1372  Relation rel,
1373  RawStmt *raw_query,
1374  Oid queryRelId,
1375  List *attnamelist,
1376  List *options)
1377 {
1378  CopyState cstate;
1379  TupleDesc tupDesc;
1380  int num_phys_attrs;
1381  MemoryContext oldcontext;
1382 
1383  /* Allocate workspace and zero all fields */
1384  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1385 
1386  /*
1387  * We allocate everything used by a cstate in a new memory context. This
1388  * avoids memory leaks during repeated use of COPY in a query.
1389  */
1391  "COPY",
1393 
1394  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1395 
1396  /* Extract options from the statement node tree */
1397  ProcessCopyOptions(pstate, cstate, is_from, options);
1398 
1399  /* Process the source/target relation or query */
1400  if (rel)
1401  {
1402  Assert(!raw_query);
1403 
1404  cstate->rel = rel;
1405 
1406  tupDesc = RelationGetDescr(cstate->rel);
1407 
1408  /* Don't allow COPY w/ OIDs to or from a table without them */
1409  if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1410  ereport(ERROR,
1411  (errcode(ERRCODE_UNDEFINED_COLUMN),
1412  errmsg("table \"%s\" does not have OIDs",
1413  RelationGetRelationName(cstate->rel))));
1414  }
1415  else
1416  {
1417  List *rewritten;
1418  Query *query;
1419  PlannedStmt *plan;
1420  DestReceiver *dest;
1421 
1422  Assert(!is_from);
1423  cstate->rel = NULL;
1424 
1425  /* Don't allow COPY w/ OIDs from a query */
1426  if (cstate->oids)
1427  ereport(ERROR,
1428  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1429  errmsg("COPY (query) WITH OIDS is not supported")));
1430 
1431  /*
1432  * Run parse analysis and rewrite. Note this also acquires sufficient
1433  * locks on the source table(s).
1434  *
1435  * Because the parser and planner tend to scribble on their input, we
1436  * make a preliminary copy of the source querytree. This prevents
1437  * problems in the case that the COPY is in a portal or plpgsql
1438  * function and is executed repeatedly. (See also the same hack in
1439  * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1440  */
1441  rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1442  pstate->p_sourcetext, NULL, 0,
1443  NULL);
1444 
1445  /* check that we got back something we can work with */
1446  if (rewritten == NIL)
1447  {
1448  ereport(ERROR,
1449  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1450  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1451  }
1452  else if (list_length(rewritten) > 1)
1453  {
1454  ListCell *lc;
1455 
1456  /* examine queries to determine which error message to issue */
1457  foreach(lc, rewritten)
1458  {
1459  Query *q = lfirst_node(Query, lc);
1460 
1462  ereport(ERROR,
1463  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1464  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1466  ereport(ERROR,
1467  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1468  errmsg("DO ALSO rules are not supported for the COPY")));
1469  }
1470 
1471  ereport(ERROR,
1472  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1473  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1474  }
1475 
1476  query = linitial_node(Query, rewritten);
1477 
1478  /* The grammar allows SELECT INTO, but we don't support that */
1479  if (query->utilityStmt != NULL &&
1481  ereport(ERROR,
1482  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1483  errmsg("COPY (SELECT INTO) is not supported")));
1484 
1485  Assert(query->utilityStmt == NULL);
1486 
1487  /*
1488  * Similarly the grammar doesn't enforce the presence of a RETURNING
1489  * clause, but this is required here.
1490  */
1491  if (query->commandType != CMD_SELECT &&
1492  query->returningList == NIL)
1493  {
1494  Assert(query->commandType == CMD_INSERT ||
1495  query->commandType == CMD_UPDATE ||
1496  query->commandType == CMD_DELETE);
1497 
1498  ereport(ERROR,
1499  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1500  errmsg("COPY query must have a RETURNING clause")));
1501  }
1502 
1503  /* plan the query */
1504  plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1505 
1506  /*
1507  * With row level security and a user using "COPY relation TO", we
1508  * have to convert the "COPY relation TO" to a query-based COPY (eg:
1509  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1510  * in any RLS clauses.
1511  *
1512  * When this happens, we are passed in the relid of the originally
1513  * found relation (which we have locked). As the planner will look up
1514  * the relation again, we double-check here to make sure it found the
1515  * same one that we have locked.
1516  */
1517  if (queryRelId != InvalidOid)
1518  {
1519  /*
1520  * Note that with RLS involved there may be multiple relations,
1521  * and while the one we need is almost certainly first, we don't
1522  * make any guarantees of that in the planner, so check the whole
1523  * list and make sure we find the original relation.
1524  */
1525  if (!list_member_oid(plan->relationOids, queryRelId))
1526  ereport(ERROR,
1527  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1528  errmsg("relation referenced by COPY statement has changed")));
1529  }
1530 
1531  /*
1532  * Use a snapshot with an updated command ID to ensure this query sees
1533  * results of any previously executed queries.
1534  */
1537 
1538  /* Create dest receiver for COPY OUT */
1540  ((DR_copy *) dest)->cstate = cstate;
1541 
1542  /* Create a QueryDesc requesting no output */
1543  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1546  dest, NULL, NULL, 0);
1547 
1548  /*
1549  * Call ExecutorStart to prepare the plan for execution.
1550  *
1551  * ExecutorStart computes a result tupdesc for us
1552  */
1553  ExecutorStart(cstate->queryDesc, 0);
1554 
1555  tupDesc = cstate->queryDesc->tupDesc;
1556  }
1557 
1558  /* Generate or convert list of attributes to process */
1559  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1560 
1561  num_phys_attrs = tupDesc->natts;
1562 
1563  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1564  cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1565  if (cstate->force_quote_all)
1566  {
1567  int i;
1568 
1569  for (i = 0; i < num_phys_attrs; i++)
1570  cstate->force_quote_flags[i] = true;
1571  }
1572  else if (cstate->force_quote)
1573  {
1574  List *attnums;
1575  ListCell *cur;
1576 
1577  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1578 
1579  foreach(cur, attnums)
1580  {
1581  int attnum = lfirst_int(cur);
1582  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1583 
1584  if (!list_member_int(cstate->attnumlist, attnum))
1585  ereport(ERROR,
1586  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1587  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1588  NameStr(attr->attname))));
1589  cstate->force_quote_flags[attnum - 1] = true;
1590  }
1591  }
1592 
1593  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1594  cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1595  if (cstate->force_notnull)
1596  {
1597  List *attnums;
1598  ListCell *cur;
1599 
1600  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1601 
1602  foreach(cur, attnums)
1603  {
1604  int attnum = lfirst_int(cur);
1605  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1606 
1607  if (!list_member_int(cstate->attnumlist, attnum))
1608  ereport(ERROR,
1609  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1610  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1611  NameStr(attr->attname))));
1612  cstate->force_notnull_flags[attnum - 1] = true;
1613  }
1614  }
1615 
1616  /* Convert FORCE_NULL name list to per-column flags, check validity */
1617  cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1618  if (cstate->force_null)
1619  {
1620  List *attnums;
1621  ListCell *cur;
1622 
1623  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1624 
1625  foreach(cur, attnums)
1626  {
1627  int attnum = lfirst_int(cur);
1628  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1629 
1630  if (!list_member_int(cstate->attnumlist, attnum))
1631  ereport(ERROR,
1632  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1633  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1634  NameStr(attr->attname))));
1635  cstate->force_null_flags[attnum - 1] = true;
1636  }
1637  }
1638 
1639  /* Convert convert_selectively name list to per-column flags */
1640  if (cstate->convert_selectively)
1641  {
1642  List *attnums;
1643  ListCell *cur;
1644 
1645  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1646 
1647  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1648 
1649  foreach(cur, attnums)
1650  {
1651  int attnum = lfirst_int(cur);
1652  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1653 
1654  if (!list_member_int(cstate->attnumlist, attnum))
1655  ereport(ERROR,
1656  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1657  errmsg_internal("selected column \"%s\" not referenced by COPY",
1658  NameStr(attr->attname))));
1659  cstate->convert_select_flags[attnum - 1] = true;
1660  }
1661  }
1662 
1663  /* Use client encoding when ENCODING option is not specified. */
1664  if (cstate->file_encoding < 0)
1666 
1667  /*
1668  * Set up encoding conversion info. Even if the file and server encodings
1669  * are the same, we must apply pg_any_to_server() to validate data in
1670  * multibyte encodings.
1671  */
1672  cstate->need_transcoding =
1673  (cstate->file_encoding != GetDatabaseEncoding() ||
1675  /* See Multibyte encoding comment above */
1677 
1678  cstate->copy_dest = COPY_FILE; /* default */
1679 
1680  MemoryContextSwitchTo(oldcontext);
1681 
1682  return cstate;
1683 }
1684 
1685 /*
1686  * Closes the pipe to an external program, checking the pclose() return code.
1687  */
1688 static void
1690 {
1691  int pclose_rc;
1692 
1693  Assert(cstate->is_program);
1694 
1695  pclose_rc = ClosePipeStream(cstate->copy_file);
1696  if (pclose_rc == -1)
1697  ereport(ERROR,
1699  errmsg("could not close pipe to external command: %m")));
1700  else if (pclose_rc != 0)
1701  ereport(ERROR,
1702  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1703  errmsg("program \"%s\" failed",
1704  cstate->filename),
1705  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1706 }
1707 
1708 /*
1709  * Release resources allocated in a cstate for COPY TO/FROM.
1710  */
1711 static void
1713 {
1714  if (cstate->is_program)
1715  {
1716  ClosePipeToProgram(cstate);
1717  }
1718  else
1719  {
1720  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1721  ereport(ERROR,
1723  errmsg("could not close file \"%s\": %m",
1724  cstate->filename)));
1725  }
1726 
1728  pfree(cstate);
1729 }
1730 
1731 /*
1732  * Setup CopyState to read tuples from a table or a query for COPY TO.
1733  */
1734 static CopyState
1736  Relation rel,
1737  RawStmt *query,
1738  Oid queryRelId,
1739  const char *filename,
1740  bool is_program,
1741  List *attnamelist,
1742  List *options)
1743 {
1744  CopyState cstate;
1745  bool pipe = (filename == NULL);
1746  MemoryContext oldcontext;
1747 
1748  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1749  {
1750  if (rel->rd_rel->relkind == RELKIND_VIEW)
1751  ereport(ERROR,
1752  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1753  errmsg("cannot copy from view \"%s\"",
1755  errhint("Try the COPY (SELECT ...) TO variant.")));
1756  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1757  ereport(ERROR,
1758  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1759  errmsg("cannot copy from materialized view \"%s\"",
1761  errhint("Try the COPY (SELECT ...) TO variant.")));
1762  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1763  ereport(ERROR,
1764  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1765  errmsg("cannot copy from foreign table \"%s\"",
1767  errhint("Try the COPY (SELECT ...) TO variant.")));
1768  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1769  ereport(ERROR,
1770  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1771  errmsg("cannot copy from sequence \"%s\"",
1772  RelationGetRelationName(rel))));
1773  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1774  ereport(ERROR,
1775  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1776  errmsg("cannot copy from partitioned table \"%s\"",
1778  errhint("Try the COPY (SELECT ...) TO variant.")));
1779  else
1780  ereport(ERROR,
1781  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1782  errmsg("cannot copy from non-table relation \"%s\"",
1783  RelationGetRelationName(rel))));
1784  }
1785 
1786  cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1787  options);
1788  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1789 
1790  if (pipe)
1791  {
1792  Assert(!is_program); /* the grammar does not allow this */
1794  cstate->copy_file = stdout;
1795  }
1796  else
1797  {
1798  cstate->filename = pstrdup(filename);
1799  cstate->is_program = is_program;
1800 
1801  if (is_program)
1802  {
1803  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1804  if (cstate->copy_file == NULL)
1805  ereport(ERROR,
1807  errmsg("could not execute command \"%s\": %m",
1808  cstate->filename)));
1809  }
1810  else
1811  {
1812  mode_t oumask; /* Pre-existing umask value */
1813  struct stat st;
1814 
1815  /*
1816  * Prevent write to relative path ... too easy to shoot oneself in
1817  * the foot by overwriting a database file ...
1818  */
1819  if (!is_absolute_path(filename))
1820  ereport(ERROR,
1821  (errcode(ERRCODE_INVALID_NAME),
1822  errmsg("relative path not allowed for COPY to file")));
1823 
1824  oumask = umask(S_IWGRP | S_IWOTH);
1825  PG_TRY();
1826  {
1827  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1828  }
1829  PG_CATCH();
1830  {
1831  umask(oumask);
1832  PG_RE_THROW();
1833  }
1834  PG_END_TRY();
1835  umask(oumask);
1836  if (cstate->copy_file == NULL)
1837  {
1838  /* copy errno because ereport subfunctions might change it */
1839  int save_errno = errno;
1840 
1841  ereport(ERROR,
1843  errmsg("could not open file \"%s\" for writing: %m",
1844  cstate->filename),
1845  (save_errno == ENOENT || save_errno == EACCES) ?
1846  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1847  "You may want a client-side facility such as psql's \\copy.") : 0));
1848  }
1849 
1850  if (fstat(fileno(cstate->copy_file), &st))
1851  ereport(ERROR,
1853  errmsg("could not stat file \"%s\": %m",
1854  cstate->filename)));
1855 
1856  if (S_ISDIR(st.st_mode))
1857  ereport(ERROR,
1858  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1859  errmsg("\"%s\" is a directory", cstate->filename)));
1860  }
1861  }
1862 
1863  MemoryContextSwitchTo(oldcontext);
1864 
1865  return cstate;
1866 }
1867 
1868 /*
1869  * This intermediate routine exists mainly to localize the effects of setjmp
1870  * so we don't need to plaster a lot of variables with "volatile".
1871  */
1872 static uint64
1874 {
1875  bool pipe = (cstate->filename == NULL);
1876  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1877  uint64 processed;
1878 
1879  PG_TRY();
1880  {
1881  if (fe_copy)
1882  SendCopyBegin(cstate);
1883 
1884  processed = CopyTo(cstate);
1885 
1886  if (fe_copy)
1887  SendCopyEnd(cstate);
1888  }
1889  PG_CATCH();
1890  {
1891  /*
1892  * Make sure we turn off old-style COPY OUT mode upon error. It is
1893  * okay to do this in all cases, since it does nothing if the mode is
1894  * not on.
1895  */
1896  pq_endcopyout(true);
1897  PG_RE_THROW();
1898  }
1899  PG_END_TRY();
1900 
1901  return processed;
1902 }
1903 
1904 /*
1905  * Clean up storage and release resources for COPY TO.
1906  */
1907 static void
1909 {
1910  if (cstate->queryDesc != NULL)
1911  {
1912  /* Close down the query and free resources. */
1913  ExecutorFinish(cstate->queryDesc);
1914  ExecutorEnd(cstate->queryDesc);
1915  FreeQueryDesc(cstate->queryDesc);
1917  }
1918 
1919  /* Clean up storage */
1920  EndCopy(cstate);
1921 }
1922 
1923 /*
1924  * Copy from relation or query TO file.
1925  */
1926 static uint64
1928 {
1929  TupleDesc tupDesc;
1930  int num_phys_attrs;
1931  ListCell *cur;
1932  uint64 processed;
1933 
1934  if (cstate->rel)
1935  tupDesc = RelationGetDescr(cstate->rel);
1936  else
1937  tupDesc = cstate->queryDesc->tupDesc;
1938  num_phys_attrs = tupDesc->natts;
1939  cstate->null_print_client = cstate->null_print; /* default */
1940 
1941  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1942  cstate->fe_msgbuf = makeStringInfo();
1943 
1944  /* Get info about the columns we need to process. */
1945  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1946  foreach(cur, cstate->attnumlist)
1947  {
1948  int attnum = lfirst_int(cur);
1949  Oid out_func_oid;
1950  bool isvarlena;
1951  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1952 
1953  if (cstate->binary)
1954  getTypeBinaryOutputInfo(attr->atttypid,
1955  &out_func_oid,
1956  &isvarlena);
1957  else
1958  getTypeOutputInfo(attr->atttypid,
1959  &out_func_oid,
1960  &isvarlena);
1961  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1962  }
1963 
1964  /*
1965  * Create a temporary memory context that we can reset once per row to
1966  * recover palloc'd memory. This avoids any problems with leaks inside
1967  * datatype output routines, and should be faster than retail pfree's
1968  * anyway. (We don't need a whole econtext as CopyFrom does.)
1969  */
1971  "COPY TO",
1973 
1974  if (cstate->binary)
1975  {
1976  /* Generate header for a binary copy */
1977  int32 tmp;
1978 
1979  /* Signature */
1980  CopySendData(cstate, BinarySignature, 11);
1981  /* Flags field */
1982  tmp = 0;
1983  if (cstate->oids)
1984  tmp |= (1 << 16);
1985  CopySendInt32(cstate, tmp);
1986  /* No header extension */
1987  tmp = 0;
1988  CopySendInt32(cstate, tmp);
1989  }
1990  else
1991  {
1992  /*
1993  * For non-binary copy, we need to convert null_print to file
1994  * encoding, because it will be sent directly with CopySendString.
1995  */
1996  if (cstate->need_transcoding)
1997  cstate->null_print_client = pg_server_to_any(cstate->null_print,
1998  cstate->null_print_len,
1999  cstate->file_encoding);
2000 
2001  /* if a header has been requested send the line */
2002  if (cstate->header_line)
2003  {
2004  bool hdr_delim = false;
2005 
2006  foreach(cur, cstate->attnumlist)
2007  {
2008  int attnum = lfirst_int(cur);
2009  char *colname;
2010 
2011  if (hdr_delim)
2012  CopySendChar(cstate, cstate->delim[0]);
2013  hdr_delim = true;
2014 
2015  colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2016 
2017  CopyAttributeOutCSV(cstate, colname, false,
2018  list_length(cstate->attnumlist) == 1);
2019  }
2020 
2021  CopySendEndOfRow(cstate);
2022  }
2023  }
2024 
2025  if (cstate->rel)
2026  {
2027  Datum *values;
2028  bool *nulls;
2029  HeapScanDesc scandesc;
2030  HeapTuple tuple;
2031 
2032  values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2033  nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2034 
2035  scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2036 
2037  processed = 0;
2038  while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2039  {
2041 
2042  /* Deconstruct the tuple ... faster than repeated heap_getattr */
2043  heap_deform_tuple(tuple, tupDesc, values, nulls);
2044 
2045  /* Format and send the data */
2046  CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2047  processed++;
2048  }
2049 
2050  heap_endscan(scandesc);
2051 
2052  pfree(values);
2053  pfree(nulls);
2054  }
2055  else
2056  {
2057  /* run the plan --- the dest receiver will send tuples */
2058  ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2059  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2060  }
2061 
2062  if (cstate->binary)
2063  {
2064  /* Generate trailer for a binary copy */
2065  CopySendInt16(cstate, -1);
2066  /* Need to flush out the trailer */
2067  CopySendEndOfRow(cstate);
2068  }
2069 
2071 
2072  return processed;
2073 }
2074 
2075 /*
2076  * Emit one row during CopyTo().
2077  */
2078 static void
2079 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2080 {
2081  bool need_delim = false;
2083  MemoryContext oldcontext;
2084  ListCell *cur;
2085  char *string;
2086 
2087  MemoryContextReset(cstate->rowcontext);
2088  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2089 
2090  if (cstate->binary)
2091  {
2092  /* Binary per-tuple header */
2093  CopySendInt16(cstate, list_length(cstate->attnumlist));
2094  /* Send OID if wanted --- note attnumlist doesn't include it */
2095  if (cstate->oids)
2096  {
2097  /* Hack --- assume Oid is same size as int32 */
2098  CopySendInt32(cstate, sizeof(int32));
2099  CopySendInt32(cstate, tupleOid);
2100  }
2101  }
2102  else
2103  {
2104  /* Text format has no per-tuple header, but send OID if wanted */
2105  /* Assume digits don't need any quoting or encoding conversion */
2106  if (cstate->oids)
2107  {
2109  ObjectIdGetDatum(tupleOid)));
2110  CopySendString(cstate, string);
2111  need_delim = true;
2112  }
2113  }
2114 
2115  foreach(cur, cstate->attnumlist)
2116  {
2117  int attnum = lfirst_int(cur);
2118  Datum value = values[attnum - 1];
2119  bool isnull = nulls[attnum - 1];
2120 
2121  if (!cstate->binary)
2122  {
2123  if (need_delim)
2124  CopySendChar(cstate, cstate->delim[0]);
2125  need_delim = true;
2126  }
2127 
2128  if (isnull)
2129  {
2130  if (!cstate->binary)
2131  CopySendString(cstate, cstate->null_print_client);
2132  else
2133  CopySendInt32(cstate, -1);
2134  }
2135  else
2136  {
2137  if (!cstate->binary)
2138  {
2139  string = OutputFunctionCall(&out_functions[attnum - 1],
2140  value);
2141  if (cstate->csv_mode)
2142  CopyAttributeOutCSV(cstate, string,
2143  cstate->force_quote_flags[attnum - 1],
2144  list_length(cstate->attnumlist) == 1);
2145  else
2146  CopyAttributeOutText(cstate, string);
2147  }
2148  else
2149  {
2150  bytea *outputbytes;
2151 
2152  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2153  value);
2154  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2155  CopySendData(cstate, VARDATA(outputbytes),
2156  VARSIZE(outputbytes) - VARHDRSZ);
2157  }
2158  }
2159  }
2160 
2161  CopySendEndOfRow(cstate);
2162 
2163  MemoryContextSwitchTo(oldcontext);
2164 }
2165 
2166 
2167 /*
2168  * error context callback for COPY FROM
2169  *
2170  * The argument for the error context must be CopyState.
2171  */
2172 void
2174 {
2175  CopyState cstate = (CopyState) arg;
2176 
2177  if (cstate->binary)
2178  {
2179  /* can't usefully display the data */
2180  if (cstate->cur_attname)
2181  errcontext("COPY %s, line %d, column %s",
2182  cstate->cur_relname, cstate->cur_lineno,
2183  cstate->cur_attname);
2184  else
2185  errcontext("COPY %s, line %d",
2186  cstate->cur_relname, cstate->cur_lineno);
2187  }
2188  else
2189  {
2190  if (cstate->cur_attname && cstate->cur_attval)
2191  {
2192  /* error is relevant to a particular column */
2193  char *attval;
2194 
2195  attval = limit_printout_length(cstate->cur_attval);
2196  errcontext("COPY %s, line %d, column %s: \"%s\"",
2197  cstate->cur_relname, cstate->cur_lineno,
2198  cstate->cur_attname, attval);
2199  pfree(attval);
2200  }
2201  else if (cstate->cur_attname)
2202  {
2203  /* error is relevant to a particular column, value is NULL */
2204  errcontext("COPY %s, line %d, column %s: null input",
2205  cstate->cur_relname, cstate->cur_lineno,
2206  cstate->cur_attname);
2207  }
2208  else
2209  {
2210  /*
2211  * Error is relevant to a particular line.
2212  *
2213  * If line_buf still contains the correct line, and it's already
2214  * transcoded, print it. If it's still in a foreign encoding, it's
2215  * quite likely that the error is precisely a failure to do
2216  * encoding conversion (ie, bad data). We dare not try to convert
2217  * it, and at present there's no way to regurgitate it without
2218  * conversion. So we have to punt and just report the line number.
2219  */
2220  if (cstate->line_buf_valid &&
2221  (cstate->line_buf_converted || !cstate->need_transcoding))
2222  {
2223  char *lineval;
2224 
2225  lineval = limit_printout_length(cstate->line_buf.data);
2226  errcontext("COPY %s, line %d: \"%s\"",
2227  cstate->cur_relname, cstate->cur_lineno, lineval);
2228  pfree(lineval);
2229  }
2230  else
2231  {
2232  errcontext("COPY %s, line %d",
2233  cstate->cur_relname, cstate->cur_lineno);
2234  }
2235  }
2236  }
2237 }
2238 
/*
 * Make sure we don't print an unreasonable amount of COPY data in a message.
 *
 * It would seem a lot easier to just use the sprintf "precision" limit to
 * truncate the string. However, some versions of glibc have a bug/misfeature
 * that vsnprintf will always fail (return -1) if it is asked to truncate
 * a string that contains invalid byte sequences for the current encoding.
 * So, do our own truncation.
 *
 * Returns a freshly palloc'd string.  If 'str' is at most
 * MAX_COPY_DATA_DISPLAY bytes, this is a plain copy.  Otherwise it is the
 * longest prefix that pg_mbcliplen() allows within that limit, which avoids
 * splitting a multibyte character, followed by "...".
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			slen = strlen(str);
	int			len;
	char	   *res;

	/* Fast path if definitely okay */
	if (slen <= MAX_COPY_DATA_DISPLAY)
		return pstrdup(str);

	/* Apply encoding-dependent truncation */
	len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);

	/*
	 * Truncate, and add "..." to show we truncated the input.  The extra 4
	 * bytes hold the three dots plus the terminating NUL.
	 */
	res = (char *) palloc(len + 4);
	memcpy(res, str, len);
	strcpy(res + len, "...");

	return res;
}
2273 
2274 /*
2275  * Copy FROM file to relation.
2276  */
2277 uint64
2279 {
2280  HeapTuple tuple;
2281  TupleDesc tupDesc;
2282  Datum *values;
2283  bool *nulls;
2284  ResultRelInfo *resultRelInfo;
2285  ResultRelInfo *saved_resultRelInfo = NULL;
2286  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2287  ExprContext *econtext;
2288  TupleTableSlot *myslot;
2289  MemoryContext oldcontext = CurrentMemoryContext;
2290 
2291  ErrorContextCallback errcallback;
2292  CommandId mycid = GetCurrentCommandId(true);
2293  int hi_options = 0; /* start with default heap_insert options */
2294  BulkInsertState bistate;
2295  uint64 processed = 0;
2296  bool useHeapMultiInsert;
2297  int nBufferedTuples = 0;
2298  int prev_leaf_part_index = -1;
2299 
2300 #define MAX_BUFFERED_TUPLES 1000
2301  HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2302  Size bufferedTuplesSize = 0;
2303  int firstBufferedLineNo = 0;
2304 
2305  Assert(cstate->rel);
2306 
2307  /*
2308  * The target must be a plain relation or have an INSTEAD OF INSERT row
2309  * trigger. (Currently, such triggers are only allowed on views, so we
2310  * only hint about them in the view case.)
2311  */
2312  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2313  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2314  !(cstate->rel->trigdesc &&
2316  {
2317  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2318  ereport(ERROR,
2319  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2320  errmsg("cannot copy to view \"%s\"",
2321  RelationGetRelationName(cstate->rel)),
2322  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2323  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2324  ereport(ERROR,
2325  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2326  errmsg("cannot copy to materialized view \"%s\"",
2327  RelationGetRelationName(cstate->rel))));
2328  else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2329  ereport(ERROR,
2330  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2331  errmsg("cannot copy to foreign table \"%s\"",
2332  RelationGetRelationName(cstate->rel))));
2333  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2334  ereport(ERROR,
2335  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2336  errmsg("cannot copy to sequence \"%s\"",
2337  RelationGetRelationName(cstate->rel))));
2338  else
2339  ereport(ERROR,
2340  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2341  errmsg("cannot copy to non-table relation \"%s\"",
2342  RelationGetRelationName(cstate->rel))));
2343  }
2344 
2345  tupDesc = RelationGetDescr(cstate->rel);
2346 
2347  /*----------
2348  * Check to see if we can avoid writing WAL
2349  *
2350  * If archive logging/streaming is not enabled *and* either
2351  * - table was created in same transaction as this COPY
2352  * - data is being written to relfilenode created in this transaction
2353  * then we can skip writing WAL. It's safe because if the transaction
2354  * doesn't commit, we'll discard the table (or the new relfilenode file).
2355  * If it does commit, we'll have done the heap_sync at the bottom of this
2356  * routine first.
2357  *
2358  * As mentioned in comments in utils/rel.h, the in-same-transaction test
2359  * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2360  * can be cleared before the end of the transaction. The exact case is
2361  * when a relation sets a new relfilenode twice in same transaction, yet
2362  * the second one fails in an aborted subtransaction, e.g.
2363  *
2364  * BEGIN;
2365  * TRUNCATE t;
2366  * SAVEPOINT save;
2367  * TRUNCATE t;
2368  * ROLLBACK TO save;
2369  * COPY ...
2370  *
2371  * Also, if the target file is new-in-transaction, we assume that checking
2372  * FSM for free space is a waste of time, even if we must use WAL because
2373  * of archiving. This could possibly be wrong, but it's unlikely.
2374  *
2375  * The comments for heap_insert and RelationGetBufferForTuple specify that
2376  * skipping WAL logging is only safe if we ensure that our tuples do not
2377  * go into pages containing tuples from any other transactions --- but this
2378  * must be the case if we have a new table or new relfilenode, so we need
2379  * no additional work to enforce that.
2380  *----------
2381  */
2382  /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2383  if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2385  {
2386  hi_options |= HEAP_INSERT_SKIP_FSM;
2387  if (!XLogIsNeeded())
2388  hi_options |= HEAP_INSERT_SKIP_WAL;
2389  }
2390 
2391  /*
2392  * Optimize if new relfilenode was created in this subxact or one of its
2393  * committed children and we won't see those rows later as part of an
2394  * earlier scan or command. The subxact test ensures that if this subxact
2395  * aborts then the frozen rows won't be visible after xact cleanup. Note
2396  * that the stronger test of exactly which subtransaction created it is
2397  * crucial for correctness of this optimization. The test for an earlier
2398  * scan or command tolerates false negatives. FREEZE causes other sessions
2399  * to see rows they would not see under MVCC, and a false negative merely
2400  * spreads that anomaly to the current session.
2401  */
2402  if (cstate->freeze)
2403  {
2404  /*
2405  * Tolerate one registration for the benefit of FirstXactSnapshot.
2406  * Scan-bearing queries generally create at least two registrations,
2407  * though relying on that is fragile, as is ignoring ActiveSnapshot.
2408  * Clear CatalogSnapshot to avoid counting its registration. We'll
2409  * still detect ongoing catalog scans, each of which separately
2410  * registers the snapshot it uses.
2411  */
2414  ereport(ERROR,
2415  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2416  errmsg("cannot perform FREEZE because of prior transaction activity")));
2417 
2418  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2420  ereport(ERROR,
2421  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2422  errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2423 
2424  hi_options |= HEAP_INSERT_FROZEN;
2425  }
2426 
2427  /*
2428  * We need a ResultRelInfo so we can use the regular executor's
2429  * index-entry-making machinery. (There used to be a huge amount of code
2430  * here that basically duplicated execUtils.c ...)
2431  */
2432  resultRelInfo = makeNode(ResultRelInfo);
2433  InitResultRelInfo(resultRelInfo,
2434  cstate->rel,
2435  1, /* dummy rangetable index */
2436  NULL,
2437  0);
2438 
2439  ExecOpenIndices(resultRelInfo, false);
2440 
2441  estate->es_result_relations = resultRelInfo;
2442  estate->es_num_result_relations = 1;
2443  estate->es_result_relation_info = resultRelInfo;
2444  estate->es_range_table = cstate->range_table;
2445 
2446  /* Set up a tuple slot too */
2447  myslot = ExecInitExtraTupleSlot(estate, tupDesc);
2448  /* Triggers might need a slot as well */
2449  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
2450 
2451  /* Prepare to catch AFTER triggers. */
2453 
2454  /*
2455  * If there are any triggers with transition tables on the named relation,
2456  * we need to be prepared to capture transition tuples.
2457  */
2458  cstate->transition_capture =
2460  RelationGetRelid(cstate->rel),
2461  CMD_INSERT);
2462 
2463  /*
2464  * If the named relation is a partitioned table, initialize state for
2465  * CopyFrom tuple routing.
2466  */
2467  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2468  {
2469  PartitionTupleRouting *proute;
2470 
2471  proute = cstate->partition_tuple_routing =
2472  ExecSetupPartitionTupleRouting(NULL, cstate->rel, 1, estate);
2473 
2474  /*
2475  * If we are capturing transition tuples, they may need to be
2476  * converted from partition format back to partitioned table format
2477  * (this is only ever necessary if a BEFORE trigger modifies the
2478  * tuple).
2479  */
2480  if (cstate->transition_capture != NULL)
2482  }
2483 
2484  /*
2485  * It's more efficient to prepare a bunch of tuples for insertion, and
2486  * insert them in one heap_multi_insert() call, than call heap_insert()
2487  * separately for every tuple. However, we can't do that if there are
2488  * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2489  * expressions. Such triggers or expressions might query the table we're
2490  * inserting to, and act differently if the tuples that have already been
2491  * processed and prepared for insertion are not there. We also can't do
2492  * it if the table is partitioned.
2493  */
2494  if ((resultRelInfo->ri_TrigDesc != NULL &&
2495  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2496  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2497  cstate->partition_tuple_routing != NULL ||
2498  cstate->volatile_defexprs)
2499  {
2500  useHeapMultiInsert = false;
2501  }
2502  else
2503  {
2504  useHeapMultiInsert = true;
2505  bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2506  }
2507 
2508  /*
2509  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2510  * should do this for COPY, since it's not really an "INSERT" statement as
2511  * such. However, executing these triggers maintains consistency with the
2512  * EACH ROW triggers that we already fire on COPY.
2513  */
2514  ExecBSInsertTriggers(estate, resultRelInfo);
2515 
2516  values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2517  nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2518 
2519  bistate = GetBulkInsertState();
2520  econtext = GetPerTupleExprContext(estate);
2521 
2522  /* Set up callback to identify error line number */
2523  errcallback.callback = CopyFromErrorCallback;
2524  errcallback.arg = (void *) cstate;
2525  errcallback.previous = error_context_stack;
2526  error_context_stack = &errcallback;
2527 
2528  for (;;)
2529  {
2530  TupleTableSlot *slot;
2531  bool skip_tuple;
2532  Oid loaded_oid = InvalidOid;
2533 
2535 
2536  if (nBufferedTuples == 0)
2537  {
2538  /*
2539  * Reset the per-tuple exprcontext. We can only do this if the
2540  * tuple buffer is empty. (Calling the context the per-tuple
2541  * memory context is a bit of a misnomer now.)
2542  */
2543  ResetPerTupleExprContext(estate);
2544  }
2545 
2546  /* Switch into its memory context */
2548 
2549  if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2550  break;
2551 
2552  /* And now we can form the input tuple. */
2553  tuple = heap_form_tuple(tupDesc, values, nulls);
2554 
2555  if (loaded_oid != InvalidOid)
2556  HeapTupleSetOid(tuple, loaded_oid);
2557 
2558  /*
2559  * Constraints might reference the tableoid column, so initialize
2560  * t_tableOid before evaluating them.
2561  */
2562  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2563 
2564  /* Triggers and stuff need to be invoked in query context. */
2565  MemoryContextSwitchTo(oldcontext);
2566 
2567  /* Place tuple in tuple slot --- but slot shouldn't free it */
2568  slot = myslot;
2569  ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2570 
2571  /* Determine the partition to heap_insert the tuple into */
2572  if (cstate->partition_tuple_routing)
2573  {
2574  int leaf_part_index;
2576 
2577  /*
2578  * Away we go ... If we end up not finding a partition after all,
2579  * ExecFindPartition() does not return and errors out instead.
2580  * Otherwise, the returned value is to be used as an index into
2581  * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2582  * will get us the ResultRelInfo and TupleConversionMap for the
2583  * partition, respectively.
2584  */
2585  leaf_part_index = ExecFindPartition(resultRelInfo,
2586  proute->partition_dispatch_info,
2587  slot,
2588  estate);
2589  Assert(leaf_part_index >= 0 &&
2590  leaf_part_index < proute->num_partitions);
2591 
2592  /*
2593  * If this tuple is mapped to a partition that is not same as the
2594  * previous one, we'd better make the bulk insert mechanism gets a
2595  * new buffer.
2596  */
2597  if (prev_leaf_part_index != leaf_part_index)
2598  {
2599  ReleaseBulkInsertStatePin(bistate);
2600  prev_leaf_part_index = leaf_part_index;
2601  }
2602 
2603  /*
2604  * Save the old ResultRelInfo and switch to the one corresponding
2605  * to the selected partition.
2606  */
2607  saved_resultRelInfo = resultRelInfo;
2608  resultRelInfo = proute->partitions[leaf_part_index];
2609 
2610  /* We do not yet have a way to insert into a foreign partition */
2611  if (resultRelInfo->ri_FdwRoutine)
2612  ereport(ERROR,
2613  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2614  errmsg("cannot route inserted tuples to a foreign table")));
2615 
2616  /*
2617  * For ExecInsertIndexTuples() to work on the partition's indexes
2618  */
2619  estate->es_result_relation_info = resultRelInfo;
2620 
2621  /*
2622  * If we're capturing transition tuples, we might need to convert
2623  * from the partition rowtype to parent rowtype.
2624  */
2625  if (cstate->transition_capture != NULL)
2626  {
2627  if (resultRelInfo->ri_TrigDesc &&
2628  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2629  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2630  {
2631  /*
2632  * If there are any BEFORE or INSTEAD triggers on the
2633  * partition, we'll have to be ready to convert their
2634  * result back to tuplestore format.
2635  */
2637  cstate->transition_capture->tcs_map =
2638  TupConvMapForLeaf(proute, saved_resultRelInfo,
2639  leaf_part_index);
2640  }
2641  else
2642  {
2643  /*
2644  * Otherwise, just remember the original unconverted
2645  * tuple, to avoid a needless round trip conversion.
2646  */
2648  cstate->transition_capture->tcs_map = NULL;
2649  }
2650  }
2651 
2652  /*
2653  * We might need to convert from the parent rowtype to the
2654  * partition rowtype.
2655  */
2656  tuple = ConvertPartitionTupleSlot(proute->parent_child_tupconv_maps[leaf_part_index],
2657  tuple,
2658  proute->partition_tuple_slot,
2659  &slot);
2660 
2661  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2662  }
2663 
2664  skip_tuple = false;
2665 
2666  /* BEFORE ROW INSERT Triggers */
2667  if (resultRelInfo->ri_TrigDesc &&
2668  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2669  {
2670  slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2671 
2672  if (slot == NULL) /* "do nothing" */
2673  skip_tuple = true;
2674  else /* trigger might have changed tuple */
2675  tuple = ExecMaterializeSlot(slot);
2676  }
2677 
2678  if (!skip_tuple)
2679  {
2680  if (resultRelInfo->ri_TrigDesc &&
2681  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2682  {
2683  /* Pass the data to the INSTEAD ROW INSERT trigger */
2684  ExecIRInsertTriggers(estate, resultRelInfo, slot);
2685  }
2686  else
2687  {
2688  /*
2689  * We always check the partition constraint, including when
2690  * the tuple got here via tuple-routing. However we don't
2691  * need to in the latter case if no BR trigger is defined on
2692  * the partition. Note that a BR trigger might modify the
2693  * tuple such that the partition constraint is no longer
2694  * satisfied, so we need to check in that case.
2695  */
2696  bool check_partition_constr =
2697  (resultRelInfo->ri_PartitionCheck != NIL);
2698 
2699  if (saved_resultRelInfo != NULL &&
2700  !(resultRelInfo->ri_TrigDesc &&
2701  resultRelInfo->ri_TrigDesc->trig_insert_before_row))
2702  check_partition_constr = false;
2703 
2704  /* Check the constraints of the tuple */
2705  if (cstate->rel->rd_att->constr || check_partition_constr)
2706  ExecConstraints(resultRelInfo, slot, estate, true);
2707 
2708  if (useHeapMultiInsert)
2709  {
2710  /* Add this tuple to the tuple buffer */
2711  if (nBufferedTuples == 0)
2712  firstBufferedLineNo = cstate->cur_lineno;
2713  bufferedTuples[nBufferedTuples++] = tuple;
2714  bufferedTuplesSize += tuple->t_len;
2715 
2716  /*
2717  * If the buffer filled up, flush it. Also flush if the
2718  * total size of all the tuples in the buffer becomes
2719  * large, to avoid using large amounts of memory for the
2720  * buffer when the tuples are exceptionally wide.
2721  */
2722  if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2723  bufferedTuplesSize > 65535)
2724  {
2725  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2726  resultRelInfo, myslot, bistate,
2727  nBufferedTuples, bufferedTuples,
2728  firstBufferedLineNo);
2729  nBufferedTuples = 0;
2730  bufferedTuplesSize = 0;
2731  }
2732  }
2733  else
2734  {
2735  List *recheckIndexes = NIL;
2736 
2737  /* OK, store the tuple and create index entries for it */
2738  heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2739  hi_options, bistate);
2740 
2741  if (resultRelInfo->ri_NumIndices > 0)
2742  recheckIndexes = ExecInsertIndexTuples(slot,
2743  &(tuple->t_self),
2744  estate,
2745  false,
2746  NULL,
2747  NIL);
2748 
2749  /* AFTER ROW INSERT Triggers */
2750  ExecARInsertTriggers(estate, resultRelInfo, tuple,
2751  recheckIndexes, cstate->transition_capture);
2752 
2753  list_free(recheckIndexes);
2754  }
2755  }
2756 
2757  /*
2758  * We count only tuples not suppressed by a BEFORE INSERT trigger;
2759  * this is the same definition used by execMain.c for counting
2760  * tuples inserted by an INSERT command.
2761  */
2762  processed++;
2763 
2764  if (saved_resultRelInfo)
2765  {
2766  resultRelInfo = saved_resultRelInfo;
2767  estate->es_result_relation_info = resultRelInfo;
2768  }
2769  }
2770  }
2771 
2772  /* Flush any remaining buffered tuples */
2773  if (nBufferedTuples > 0)
2774  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2775  resultRelInfo, myslot, bistate,
2776  nBufferedTuples, bufferedTuples,
2777  firstBufferedLineNo);
2778 
2779  /* Done, clean up */
2780  error_context_stack = errcallback.previous;
2781 
2782  FreeBulkInsertState(bistate);
2783 
2784  MemoryContextSwitchTo(oldcontext);
2785 
2786  /*
2787  * In the old protocol, tell pqcomm that we can process normal protocol
2788  * messages again.
2789  */
2790  if (cstate->copy_dest == COPY_OLD_FE)
2791  pq_endmsgread();
2792 
2793  /* Execute AFTER STATEMENT insertion triggers */
2794  ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2795 
2796  /* Handle queued AFTER triggers */
2797  AfterTriggerEndQuery(estate);
2798 
2799  pfree(values);
2800  pfree(nulls);
2801 
2802  ExecResetTupleTable(estate->es_tupleTable, false);
2803 
2804  ExecCloseIndices(resultRelInfo);
2805 
2806  /* Close all the partitioned tables, leaf partitions, and their indices */
2807  if (cstate->partition_tuple_routing)
2809 
2810  /* Close any trigger target relations */
2811  ExecCleanUpTriggerState(estate);
2812 
2813  FreeExecutorState(estate);
2814 
2815  /*
2816  * If we skipped writing WAL, then we need to sync the heap (but not
2817  * indexes since those use WAL anyway)
2818  */
2819  if (hi_options & HEAP_INSERT_SKIP_WAL)
2820  heap_sync(cstate->rel);
2821 
2822  return processed;
2823 }
2824 
2825 /*
2826  * A subroutine of CopyFrom, to write the current batch of buffered heap
2827  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2828  * triggers.
2829  */
2830 static void
2832  int hi_options, ResultRelInfo *resultRelInfo,
2833  TupleTableSlot *myslot, BulkInsertState bistate,
2834  int nBufferedTuples, HeapTuple *bufferedTuples,
2835  int firstBufferedLineNo)
2836 {
2837  MemoryContext oldcontext;
2838  int i;
2839  int save_cur_lineno;
2840 
2841  /*
2842  * Print error context information correctly, if one of the operations
2843  * below fail.
2844  */
2845  cstate->line_buf_valid = false;
2846  save_cur_lineno = cstate->cur_lineno;
2847 
2848  /*
2849  * heap_multi_insert leaks memory, so switch to short-lived memory context
2850  * before calling it.
2851  */
2852  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2853  heap_multi_insert(cstate->rel,
2854  bufferedTuples,
2855  nBufferedTuples,
2856  mycid,
2857  hi_options,
2858  bistate);
2859  MemoryContextSwitchTo(oldcontext);
2860 
2861  /*
2862  * If there are any indexes, update them for all the inserted tuples, and
2863  * run AFTER ROW INSERT triggers.
2864  */
2865  if (resultRelInfo->ri_NumIndices > 0)
2866  {
2867  for (i = 0; i < nBufferedTuples; i++)
2868  {
2869  List *recheckIndexes;
2870 
2871  cstate->cur_lineno = firstBufferedLineNo + i;
2872  ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2873  recheckIndexes =
2874  ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2875  estate, false, NULL, NIL);
2876  ExecARInsertTriggers(estate, resultRelInfo,
2877  bufferedTuples[i],
2878  recheckIndexes, cstate->transition_capture);
2879  list_free(recheckIndexes);
2880  }
2881  }
2882 
2883  /*
2884  * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
2885  * anyway.
2886  */
2887  else if (resultRelInfo->ri_TrigDesc != NULL &&
2888  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2889  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2890  {
2891  for (i = 0; i < nBufferedTuples; i++)
2892  {
2893  cstate->cur_lineno = firstBufferedLineNo + i;
2894  ExecARInsertTriggers(estate, resultRelInfo,
2895  bufferedTuples[i],
2896  NIL, cstate->transition_capture);
2897  }
2898  }
2899 
2900  /* reset cur_lineno to where we were */
2901  cstate->cur_lineno = save_cur_lineno;
2902 }
2903 
2904 /*
2905  * Setup to read tuples from a file for COPY FROM.
2906  *
2907  * 'rel': Used as a template for the tuples
2908  * 'filename': Name of server-local file to read
2909  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2910  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2911  *
2912  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2913  */
2914 CopyState
2916  Relation rel,
2917  const char *filename,
2918  bool is_program,
2920  List *attnamelist,
2921  List *options)
2922 {
2923  CopyState cstate;
2924  bool pipe = (filename == NULL);
2925  TupleDesc tupDesc;
2926  AttrNumber num_phys_attrs,
2927  num_defaults;
2929  Oid *typioparams;
2930  int attnum;
2931  Oid in_func_oid;
2932  int *defmap;
2933  ExprState **defexprs;
2934  MemoryContext oldcontext;
2935  bool volatile_defexprs;
2936 
2937  cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
2938  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2939 
2940  /* Initialize state variables */
2941  cstate->fe_eof = false;
2942  cstate->eol_type = EOL_UNKNOWN;
2943  cstate->cur_relname = RelationGetRelationName(cstate->rel);
2944  cstate->cur_lineno = 0;
2945  cstate->cur_attname = NULL;
2946  cstate->cur_attval = NULL;
2947 
2948  /* Set up variables to avoid per-attribute overhead. */
2949  initStringInfo(&cstate->attribute_buf);
2950  initStringInfo(&cstate->line_buf);
2951  cstate->line_buf_converted = false;
2952  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2953  cstate->raw_buf_index = cstate->raw_buf_len = 0;
2954 
2955  /* Assign range table, we'll need it in CopyFrom. */
2956  if (pstate)
2957  cstate->range_table = pstate->p_rtable;
2958 
2959  tupDesc = RelationGetDescr(cstate->rel);
2960  num_phys_attrs = tupDesc->natts;
2961  num_defaults = 0;
2962  volatile_defexprs = false;
2963 
2964  /*
2965  * Pick up the required catalog information for each attribute in the
2966  * relation, including the input function, the element type (to pass to
2967  * the input function), and info about defaults and constraints. (Which
2968  * input function we use depends on text/binary format choice.)
2969  */
2970  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2971  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2972  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2973  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2974 
2975  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2976  {
2977  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
2978 
2979  /* We don't need info for dropped attributes */
2980  if (att->attisdropped)
2981  continue;
2982 
2983  /* Fetch the input function and typioparam info */
2984  if (cstate->binary)
2985  getTypeBinaryInputInfo(att->atttypid,
2986  &in_func_oid, &typioparams[attnum - 1]);
2987  else
2988  getTypeInputInfo(att->atttypid,
2989  &in_func_oid, &typioparams[attnum - 1]);
2990  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2991 
2992  /* Get default info if needed */
2993  if (!list_member_int(cstate->attnumlist, attnum))
2994  {
2995  /* attribute is NOT to be copied from input */
2996  /* use default value if one exists */
2997  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
2998  attnum);
2999 
3000  if (defexpr != NULL)
3001  {
3002  /* Run the expression through planner */
3003  defexpr = expression_planner(defexpr);
3004 
3005  /* Initialize executable expression in copycontext */
3006  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3007  defmap[num_defaults] = attnum - 1;
3008  num_defaults++;
3009 
3010  /*
3011  * If a default expression looks at the table being loaded,
3012  * then it could give the wrong answer when using
3013  * multi-insert. Since database access can be dynamic this is
3014  * hard to test for exactly, so we use the much wider test of
3015  * whether the default expression is volatile. We allow for
3016  * the special case of when the default expression is the
3017  * nextval() of a sequence which in this specific case is
3018  * known to be safe for use with the multi-insert
3019  * optimization. Hence we use this special case function
3020  * checker rather than the standard check for
3021  * contain_volatile_functions().
3022  */
3023  if (!volatile_defexprs)
3024  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3025  }
3026  }
3027  }
3028 
3029  /* We keep those variables in cstate. */
3030  cstate->in_functions = in_functions;
3031  cstate->typioparams = typioparams;
3032  cstate->defmap = defmap;
3033  cstate->defexprs = defexprs;
3035  cstate->num_defaults = num_defaults;
3036  cstate->is_program = is_program;
3037 
3038  if (data_source_cb)
3039  {
3040  cstate->copy_dest = COPY_CALLBACK;
3041  cstate->data_source_cb = data_source_cb;
3042  }
3043  else if (pipe)
3044  {
3045  Assert(!is_program); /* the grammar does not allow this */
3047  ReceiveCopyBegin(cstate);
3048  else
3049  cstate->copy_file = stdin;
3050  }
3051  else
3052  {
3053  cstate->filename = pstrdup(filename);
3054 
3055  if (cstate->is_program)
3056  {
3057  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3058  if (cstate->copy_file == NULL)
3059  ereport(ERROR,
3061  errmsg("could not execute command \"%s\": %m",
3062  cstate->filename)));
3063  }
3064  else
3065  {
3066  struct stat st;
3067 
3068  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3069  if (cstate->copy_file == NULL)
3070  {
3071  /* copy errno because ereport subfunctions might change it */
3072  int save_errno = errno;
3073 
3074  ereport(ERROR,
3076  errmsg("could not open file \"%s\" for reading: %m",
3077  cstate->filename),
3078  (save_errno == ENOENT || save_errno == EACCES) ?
3079  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3080  "You may want a client-side facility such as psql's \\copy.") : 0));
3081  }
3082 
3083  if (fstat(fileno(cstate->copy_file), &st))
3084  ereport(ERROR,
3086  errmsg("could not stat file \"%s\": %m",
3087  cstate->filename)));
3088 
3089  if (S_ISDIR(st.st_mode))
3090  ereport(ERROR,
3091  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3092  errmsg("\"%s\" is a directory", cstate->filename)));
3093  }
3094  }
3095 
3096  if (!cstate->binary)
3097  {
3098  /* must rely on user to tell us... */
3099  cstate->file_has_oids = cstate->oids;
3100  }
3101  else
3102  {
3103  /* Read and verify binary header */
3104  char readSig[11];
3105  int32 tmp;
3106 
3107  /* Signature */
3108  if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3109  memcmp(readSig, BinarySignature, 11) != 0)
3110  ereport(ERROR,
3111  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3112  errmsg("COPY file signature not recognized")));
3113  /* Flags field */
3114  if (!CopyGetInt32(cstate, &tmp))
3115  ereport(ERROR,
3116  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3117  errmsg("invalid COPY file header (missing flags)")));
3118  cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3119  tmp &= ~(1 << 16);
3120  if ((tmp >> 16) != 0)
3121  ereport(ERROR,
3122  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3123  errmsg("unrecognized critical flags in COPY file header")));
3124  /* Header extension length */
3125  if (!CopyGetInt32(cstate, &tmp) ||
3126  tmp < 0)
3127  ereport(ERROR,
3128  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3129  errmsg("invalid COPY file header (missing length)")));
3130  /* Skip extension header, if present */
3131  while (tmp-- > 0)
3132  {
3133  if (CopyGetData(cstate, readSig, 1, 1) != 1)
3134  ereport(ERROR,
3135  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3136  errmsg("invalid COPY file header (wrong length)")));
3137  }
3138  }
3139 
3140  if (cstate->file_has_oids && cstate->binary)
3141  {
3143  &in_func_oid, &cstate->oid_typioparam);
3144  fmgr_info(in_func_oid, &cstate->oid_in_function);
3145  }
3146 
3147  /* create workspace for CopyReadAttributes results */
3148  if (!cstate->binary)
3149  {
3150  AttrNumber attr_count = list_length(cstate->attnumlist);
3151  int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3152 
3153  cstate->max_fields = nfields;
3154  cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3155  }
3156 
3157  MemoryContextSwitchTo(oldcontext);
3158 
3159  return cstate;
3160 }
3161 
3162 /*
3163  * Read raw fields in the next line for COPY FROM in text or csv mode.
3164  * Return false if no more lines.
3165  *
3166  * An internal temporary buffer is returned via 'fields'. It is valid until
3167  * the next call of the function. Since the function returns all raw fields
3168  * in the input file, 'nfields' could be different from the number of columns
3169  * in the relation.
3170  *
3171  * NOTE: force_not_null option are not applied to the returned fields.
3172  */
3173 bool
3174 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3175 {
3176  int fldct;
3177  bool done;
3178 
3179  /* only available for text or csv input */
3180  Assert(!cstate->binary);
3181 
3182  /* on input just throw the header line away */
3183  if (cstate->cur_lineno == 0 && cstate->header_line)
3184  {
3185  cstate->cur_lineno++;
3186  if (CopyReadLine(cstate))
3187  return false; /* done */
3188  }
3189 
3190  cstate->cur_lineno++;
3191 
3192  /* Actually read the line into memory here */
3193  done = CopyReadLine(cstate);
3194 
3195  /*
3196  * EOF at start of line means we're done. If we see EOF after some
3197  * characters, we act as though it was newline followed by EOF, ie,
3198  * process the line and then exit loop on next iteration.
3199  */
3200  if (done && cstate->line_buf.len == 0)
3201  return false;
3202 
3203  /* Parse the line into de-escaped field values */
3204  if (cstate->csv_mode)
3205  fldct = CopyReadAttributesCSV(cstate);
3206  else
3207  fldct = CopyReadAttributesText(cstate);
3208 
3209  *fields = cstate->raw_fields;
3210  *nfields = fldct;
3211  return true;
3212 }
3213 
3214 /*
3215  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3216  *
3217  * 'econtext' is used to evaluate default expression for each columns not
3218  * read from the file. It can be NULL when no default values are used, i.e.
3219  * when all columns are read from the file.
3220  *
3221  * 'values' and 'nulls' arrays must be the same length as columns of the
3222  * relation passed to BeginCopyFrom. This function fills the arrays.
3223  * Oid of the tuple is returned with 'tupleOid' separately.
3224  */
3225 bool
3227  Datum *values, bool *nulls, Oid *tupleOid)
3228 {
3229  TupleDesc tupDesc;
3230  AttrNumber num_phys_attrs,
3231  attr_count,
3232  num_defaults = cstate->num_defaults;
3233  FmgrInfo *in_functions = cstate->in_functions;
3234  Oid *typioparams = cstate->typioparams;
3235  int i;
3236  int nfields;
3237  bool isnull;
3238  bool file_has_oids = cstate->file_has_oids;
3239  int *defmap = cstate->defmap;
3240  ExprState **defexprs = cstate->defexprs;
3241 
3242  tupDesc = RelationGetDescr(cstate->rel);
3243  num_phys_attrs = tupDesc->natts;
3244  attr_count = list_length(cstate->attnumlist);
3245  nfields = file_has_oids ? (attr_count + 1) : attr_count;
3246 
3247  /* Initialize all values for row to NULL */
3248  MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3249  MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3250 
3251  if (!cstate->binary)
3252  {
3253  char **field_strings;
3254  ListCell *cur;
3255  int fldct;
3256  int fieldno;
3257  char *string;
3258 
3259  /* read raw fields in the next line */
3260  if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3261  return false;
3262 
3263  /* check for overflowing fields */
3264  if (nfields > 0 && fldct > nfields)
3265  ereport(ERROR,
3266  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3267  errmsg("extra data after last expected column")));
3268 
3269  fieldno = 0;
3270 
3271  /* Read the OID field if present */
3272  if (file_has_oids)
3273  {
3274  if (fieldno >= fldct)
3275  ereport(ERROR,
3276  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3277  errmsg("missing data for OID column")));
3278  string = field_strings[fieldno++];
3279 
3280  if (string == NULL)
3281  ereport(ERROR,
3282  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3283  errmsg("null OID in COPY data")));
3284  else if (cstate->oids && tupleOid != NULL)
3285  {
3286  cstate->cur_attname = "oid";
3287  cstate->cur_attval = string;
3289  CStringGetDatum(string)));
3290  if (*tupleOid == InvalidOid)
3291  ereport(ERROR,
3292  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3293  errmsg("invalid OID in COPY data")));
3294  cstate->cur_attname = NULL;
3295  cstate->cur_attval = NULL;
3296  }
3297  }
3298 
3299  /* Loop to read the user attributes on the line. */
3300  foreach(cur, cstate->attnumlist)
3301  {
3302  int attnum = lfirst_int(cur);
3303  int m = attnum - 1;
3304  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3305 
3306  if (fieldno >= fldct)
3307  ereport(ERROR,
3308  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3309  errmsg("missing data for column \"%s\"",
3310  NameStr(att->attname))));
3311  string = field_strings[fieldno++];
3312 
3313  if (cstate->convert_select_flags &&
3314  !cstate->convert_select_flags[m])
3315  {
3316  /* ignore input field, leaving column as NULL */
3317  continue;
3318  }
3319 
3320  if (cstate->csv_mode)
3321  {
3322  if (string == NULL &&
3323  cstate->force_notnull_flags[m])
3324  {
3325  /*
3326  * FORCE_NOT_NULL option is set and column is NULL -
3327  * convert it to the NULL string.
3328  */
3329  string = cstate->null_print;
3330  }
3331  else if (string != NULL && cstate->force_null_flags[m]
3332  && strcmp(string, cstate->null_print) == 0)
3333  {
3334  /*
3335  * FORCE_NULL option is set and column matches the NULL
3336  * string. It must have been quoted, or otherwise the
3337  * string would already have been set to NULL. Convert it
3338  * to NULL as specified.
3339  */
3340  string = NULL;
3341  }
3342  }
3343 
3344  cstate->cur_attname = NameStr(att->attname);
3345  cstate->cur_attval = string;
3346  values[m] = InputFunctionCall(&in_functions[m],
3347  string,
3348  typioparams[m],
3349  att->atttypmod);
3350  if (string != NULL)
3351  nulls[m] = false;
3352  cstate->cur_attname = NULL;
3353  cstate->cur_attval = NULL;
3354  }
3355 
3356  Assert(fieldno == nfields);
3357  }
3358  else
3359  {
3360  /* binary */
3361  int16 fld_count;
3362  ListCell *cur;
3363 
3364  cstate->cur_lineno++;
3365 
3366  if (!CopyGetInt16(cstate, &fld_count))
3367  {
3368  /* EOF detected (end of file, or protocol-level EOF) */
3369  return false;
3370  }
3371 
3372  if (fld_count == -1)
3373  {
3374  /*
3375  * Received EOF marker. In a V3-protocol copy, wait for the
3376  * protocol-level EOF, and complain if it doesn't come
3377  * immediately. This ensures that we correctly handle CopyFail,
3378  * if client chooses to send that now.
3379  *
3380  * Note that we MUST NOT try to read more data in an old-protocol
3381  * copy, since there is no protocol-level EOF marker then. We
3382  * could go either way for copy from file, but choose to throw
3383  * error if there's data after the EOF marker, for consistency
3384  * with the new-protocol case.
3385  */
3386  char dummy;
3387 
3388  if (cstate->copy_dest != COPY_OLD_FE &&
3389  CopyGetData(cstate, &dummy, 1, 1) > 0)
3390  ereport(ERROR,
3391  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3392  errmsg("received copy data after EOF marker")));
3393  return false;
3394  }
3395 
3396  if (fld_count != attr_count)
3397  ereport(ERROR,
3398  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3399  errmsg("row field count is %d, expected %d",
3400  (int) fld_count, attr_count)));
3401 
3402  if (file_has_oids)
3403  {
3404  Oid loaded_oid;
3405 
3406  cstate->cur_attname = "oid";
3407  loaded_oid =
3409  0,
3410  &cstate->oid_in_function,
3411  cstate->oid_typioparam,
3412  -1,
3413  &isnull));
3414  if (isnull || loaded_oid == InvalidOid)
3415  ereport(ERROR,
3416  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3417  errmsg("invalid OID in COPY data")));
3418  cstate->cur_attname = NULL;
3419  if (cstate->oids && tupleOid != NULL)
3420  *tupleOid = loaded_oid;
3421  }
3422 
3423  i = 0;
3424  foreach(cur, cstate->attnumlist)
3425  {
3426  int attnum = lfirst_int(cur);
3427  int m = attnum - 1;
3428  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3429 
3430  cstate->cur_attname = NameStr(att->attname);
3431  i++;
3432  values[m] = CopyReadBinaryAttribute(cstate,
3433  i,
3434  &in_functions[m],
3435  typioparams[m],
3436  att->atttypmod,
3437  &nulls[m]);
3438  cstate->cur_attname = NULL;
3439  }
3440  }
3441 
3442  /*
3443  * Now compute and insert any defaults available for the columns not
3444  * provided by the input data. Anything not processed here or above will
3445  * remain NULL.
3446  */
3447  for (i = 0; i < num_defaults; i++)
3448  {
3449  /*
3450  * The caller must supply econtext and have switched into the
3451  * per-tuple memory context in it.
3452  */
3453  Assert(econtext != NULL);
3455 
3456  values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3457  &nulls[defmap[i]]);
3458  }
3459 
3460  return true;
3461 }
3462 
3463 /*
3464  * Clean up storage and release resources for COPY FROM.
3465  */
3466 void
3468 {
3469  /* No COPY FROM related resources except memory. */
3470 
3471  EndCopy(cstate);
3472 }
3473 
3474 /*
3475  * Read the next input line and stash it in line_buf, with conversion to
3476  * server encoding.
3477  *
3478  * Result is true if read was terminated by EOF, false if terminated
3479  * by newline. The terminating newline or EOF marker is not included
3480  * in the final value of line_buf.
3481  */
3482 static bool
3484 {
3485  bool result;
3486 
3487  resetStringInfo(&cstate->line_buf);
3488  cstate->line_buf_valid = true;
3489 
3490  /* Mark that encoding conversion hasn't occurred yet */
3491  cstate->line_buf_converted = false;
3492 
3493  /* Parse data and transfer into line_buf */
3494  result = CopyReadLineText(cstate);
3495 
3496  if (result)
3497  {
3498  /*
3499  * Reached EOF. In protocol version 3, we should ignore anything
3500  * after \. up to the protocol end of copy data. (XXX maybe better
3501  * not to treat \. as special?)
3502  */
3503  if (cstate->copy_dest == COPY_NEW_FE)
3504  {
3505  do
3506  {
3507  cstate->raw_buf_index = cstate->raw_buf_len;
3508  } while (CopyLoadRawBuf(cstate));
3509  }
3510  }
3511  else
3512  {
3513  /*
3514  * If we didn't hit EOF, then we must have transferred the EOL marker
3515  * to line_buf along with the data. Get rid of it.
3516  */
3517  switch (cstate->eol_type)
3518  {
3519  case EOL_NL:
3520  Assert(cstate->line_buf.len >= 1);
3521  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3522  cstate->line_buf.len--;
3523  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3524  break;
3525  case EOL_CR:
3526  Assert(cstate->line_buf.len >= 1);
3527  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3528  cstate->line_buf.len--;
3529  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3530  break;
3531  case EOL_CRNL:
3532  Assert(cstate->line_buf.len >= 2);
3533  Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3534  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3535  cstate->line_buf.len -= 2;
3536  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3537  break;
3538  case EOL_UNKNOWN:
3539  /* shouldn't get here */
3540  Assert(false);
3541  break;
3542  }
3543  }
3544 
3545  /* Done reading the line. Convert it to server encoding. */
3546  if (cstate->need_transcoding)
3547  {
3548  char *cvt;
3549 
3550  cvt = pg_any_to_server(cstate->line_buf.data,
3551  cstate->line_buf.len,
3552  cstate->file_encoding);
3553  if (cvt != cstate->line_buf.data)
3554  {
3555  /* transfer converted data back to line_buf */
3556  resetStringInfo(&cstate->line_buf);
3557  appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3558  pfree(cvt);
3559  }
3560  }
3561 
3562  /* Now it's safe to use the buffer in error messages */
3563  cstate->line_buf_converted = true;
3564 
3565  return result;
3566 }
3567 
3568 /*
3569  * CopyReadLineText - inner loop of CopyReadLine for text mode
3570  */
3571 static bool
3573 {
3574  char *copy_raw_buf;
3575  int raw_buf_ptr;
3576  int copy_buf_len;
3577  bool need_data = false;
3578  bool hit_eof = false;
3579  bool result = false;
3580  char mblen_str[2];
3581 
3582  /* CSV variables */
3583  bool first_char_in_line = true;
3584  bool in_quote = false,
3585  last_was_esc = false;
3586  char quotec = '\0';
3587  char escapec = '\0';
3588 
3589  if (cstate->csv_mode)
3590  {
3591  quotec = cstate->quote[0];
3592  escapec = cstate->escape[0];
3593  /* ignore special escape processing if it's the same as quotec */
3594  if (quotec == escapec)
3595  escapec = '\0';
3596  }
3597 
3598  mblen_str[1] = '\0';
3599 
3600  /*
3601  * The objective of this loop is to transfer the entire next input line
3602  * into line_buf. Hence, we only care for detecting newlines (\r and/or
3603  * \n) and the end-of-copy marker (\.).
3604  *
3605  * In CSV mode, \r and \n inside a quoted field are just part of the data
3606  * value and are put in line_buf. We keep just enough state to know if we
3607  * are currently in a quoted field or not.
3608  *
3609  * These four characters, and the CSV escape and quote characters, are
3610  * assumed the same in frontend and backend encodings.
3611  *
3612  * For speed, we try to move data from raw_buf to line_buf in chunks
3613  * rather than one character at a time. raw_buf_ptr points to the next
3614  * character to examine; any characters from raw_buf_index to raw_buf_ptr
3615  * have been determined to be part of the line, but not yet transferred to
3616  * line_buf.
3617  *
3618  * For a little extra speed within the loop, we copy raw_buf and
3619  * raw_buf_len into local variables.
3620  */
3621  copy_raw_buf = cstate->raw_buf;
3622  raw_buf_ptr = cstate->raw_buf_index;
3623  copy_buf_len = cstate->raw_buf_len;
3624 
3625  for (;;)
3626  {
3627  int prev_raw_ptr;
3628  char c;
3629 
3630  /*
3631  * Load more data if needed. Ideally we would just force four bytes
3632  * of read-ahead and avoid the many calls to
3633  * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3634  * does not allow us to read too far ahead or we might read into the
3635  * next data, so we read-ahead only as far we know we can. One
3636  * optimization would be to read-ahead four byte here if
3637  * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3638  * considering the size of the buffer.
3639  */
3640  if (raw_buf_ptr >= copy_buf_len || need_data)
3641  {
3643 
3644  /*
3645  * Try to read some more data. This will certainly reset
3646  * raw_buf_index to zero, and raw_buf_ptr must go with it.
3647  */
3648  if (!CopyLoadRawBuf(cstate))
3649  hit_eof = true;
3650  raw_buf_ptr = 0;
3651  copy_buf_len = cstate->raw_buf_len;
3652 
3653  /*
3654  * If we are completely out of data, break out of the loop,
3655  * reporting EOF.
3656  */
3657  if (copy_buf_len <= 0)
3658  {
3659  result = true;
3660  break;
3661  }
3662  need_data = false;
3663  }
3664 
3665  /* OK to fetch a character */
3666  prev_raw_ptr = raw_buf_ptr;
3667  c = copy_raw_buf[raw_buf_ptr++];
3668 
3669  if (cstate->csv_mode)
3670  {
3671  /*
3672  * If character is '\\' or '\r', we may need to look ahead below.
3673  * Force fetch of the next character if we don't already have it.
3674  * We need to do this before changing CSV state, in case one of
3675  * these characters is also the quote or escape character.
3676  *
3677  * Note: old-protocol does not like forced prefetch, but it's OK
3678  * here since we cannot validly be at EOF.
3679  */
3680  if (c == '\\' || c == '\r')
3681  {
3683  }
3684 
3685  /*
3686  * Dealing with quotes and escapes here is mildly tricky. If the
3687  * quote char is also the escape char, there's no problem - we
3688  * just use the char as a toggle. If they are different, we need
3689  * to ensure that we only take account of an escape inside a
3690  * quoted field and immediately preceding a quote char, and not
3691  * the second in an escape-escape sequence.
3692  */
3693  if (in_quote && c == escapec)
3694  last_was_esc = !last_was_esc;
3695  if (c == quotec && !last_was_esc)
3696  in_quote = !in_quote;
3697  if (c != escapec)
3698  last_was_esc = false;
3699 
3700  /*
3701  * Updating the line count for embedded CR and/or LF chars is
3702  * necessarily a little fragile - this test is probably about the
3703  * best we can do. (XXX it's arguable whether we should do this
3704  * at all --- is cur_lineno a physical or logical count?)
3705  */
3706  if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3707  cstate->cur_lineno++;
3708  }
3709 
3710  /* Process \r */
3711  if (c == '\r' && (!cstate->csv_mode || !in_quote))
3712  {
3713  /* Check for \r\n on first line, _and_ handle \r\n. */
3714  if (cstate->eol_type == EOL_UNKNOWN ||
3715  cstate->eol_type == EOL_CRNL)
3716  {
3717  /*
3718  * If need more data, go back to loop top to load it.
3719  *
3720  * Note that if we are at EOF, c will wind up as '\0' because
3721  * of the guaranteed pad of raw_buf.
3722  */
3724 
3725  /* get next char */
3726  c = copy_raw_buf[raw_buf_ptr];
3727 
3728  if (c == '\n')
3729  {
3730  raw_buf_ptr++; /* eat newline */
3731  cstate->eol_type = EOL_CRNL; /* in case not set yet */
3732  }
3733  else
3734  {
3735  /* found \r, but no \n */
3736  if (cstate->eol_type == EOL_CRNL)
3737  ereport(ERROR,
3738  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3739  !cstate->csv_mode ?
3740  errmsg("literal carriage return found in data") :
3741  errmsg("unquoted carriage return found in data"),
3742  !cstate->csv_mode ?
3743  errhint("Use \"\\r\" to represent carriage return.") :
3744  errhint("Use quoted CSV field to represent carriage return.")));
3745 
3746  /*
3747  * if we got here, it is the first line and we didn't find
3748  * \n, so don't consume the peeked character
3749  */
3750  cstate->eol_type = EOL_CR;
3751  }
3752  }
3753  else if (cstate->eol_type == EOL_NL)
3754  ereport(ERROR,
3755  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3756  !cstate->csv_mode ?
3757  errmsg("literal carriage return found in data") :
3758  errmsg("unquoted carriage return found in data"),
3759  !cstate->csv_mode ?
3760  errhint("Use \"\\r\" to represent carriage return.") :
3761  errhint("Use quoted CSV field to represent carriage return.")));
3762  /* If reach here, we have found the line terminator */
3763  break;
3764  }
3765 
3766  /* Process \n */
3767  if (c == '\n' && (!cstate->csv_mode || !in_quote))
3768  {
3769  if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3770  ereport(ERROR,
3771  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3772  !cstate->csv_mode ?
3773  errmsg("literal newline found in data") :
3774  errmsg("unquoted newline found in data"),
3775  !cstate->csv_mode ?
3776  errhint("Use \"\\n\" to represent newline.") :
3777  errhint("Use quoted CSV field to represent newline.")));
3778  cstate->eol_type = EOL_NL; /* in case not set yet */
3779  /* If reach here, we have found the line terminator */
3780  break;
3781  }
3782 
3783  /*
3784  * In CSV mode, we only recognize \. alone on a line. This is because
3785  * \. is a valid CSV data value.
3786  */
3787  if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3788  {
3789  char c2;
3790 
3793 
3794  /* -----
3795  * get next character
3796  * Note: we do not change c so if it isn't \., we can fall
3797  * through and continue processing for file encoding.
3798  * -----
3799  */
3800  c2 = copy_raw_buf[raw_buf_ptr];
3801 
3802  if (c2 == '.')
3803  {
3804  raw_buf_ptr++; /* consume the '.' */
3805 
3806  /*
3807  * Note: if we loop back for more data here, it does not
3808  * matter that the CSV state change checks are re-executed; we
3809  * will come back here with no important state changed.
3810  */
3811  if (cstate->eol_type == EOL_CRNL)
3812  {
3813  /* Get the next character */
3815  /* if hit_eof, c2 will become '\0' */
3816  c2 = copy_raw_buf[raw_buf_ptr++];
3817 
3818  if (c2 == '\n')
3819  {
3820  if (!cstate->csv_mode)
3821  ereport(ERROR,
3822  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3823  errmsg("end-of-copy marker does not match previous newline style")));
3824  else
3826  }
3827  else if (c2 != '\r')
3828  {
3829  if (!cstate->csv_mode)
3830  ereport(ERROR,
3831  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3832  errmsg("end-of-copy marker corrupt")));
3833  else
3835  }
3836  }
3837 
3838  /* Get the next character */
3840  /* if hit_eof, c2 will become '\0' */
3841  c2 = copy_raw_buf[raw_buf_ptr++];
3842 
3843  if (c2 != '\r' && c2 != '\n')
3844  {
3845  if (!cstate->csv_mode)
3846  ereport(ERROR,
3847  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3848  errmsg("end-of-copy marker corrupt")));
3849  else
3851  }
3852 
3853  if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3854  (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3855  (cstate->eol_type == EOL_CR && c2 != '\r'))
3856  {
3857  ereport(ERROR,
3858  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3859  errmsg("end-of-copy marker does not match previous newline style")));
3860  }
3861 
3862  /*
3863  * Transfer only the data before the \. into line_buf, then
3864  * discard the data and the \. sequence.
3865  */
3866  if (prev_raw_ptr > cstate->raw_buf_index)
3868  cstate->raw_buf + cstate->raw_buf_index,
3869  prev_raw_ptr - cstate->raw_buf_index);
3870  cstate->raw_buf_index = raw_buf_ptr;
3871  result = true; /* report EOF */
3872  break;
3873  }
3874  else if (!cstate->csv_mode)
3875 
3876  /*
3877  * If we are here, it means we found a backslash followed by
3878  * something other than a period. In non-CSV mode, anything
3879  * after a backslash is special, so we skip over that second
3880  * character too. If we didn't do that \\. would be
3881  * considered an eof-of copy, while in non-CSV mode it is a
3882  * literal backslash followed by a period. In CSV mode,
3883  * backslashes are not special, so we want to process the
3884  * character after the backslash just like a normal character,
3885  * so we don't increment in those cases.
3886  */
3887  raw_buf_ptr++;
3888  }
3889 
3890  /*
3891  * This label is for CSV cases where \. appears at the start of a
3892  * line, but there is more text after it, meaning it was a data value.
3893  * We are more strict for \. in CSV mode because \. could be a data
3894  * value, while in non-CSV mode, \. cannot be a data value.
3895  */
3896 not_end_of_copy:
3897 
3898  /*
3899  * Process all bytes of a multi-byte character as a group.
3900  *
3901  * We only support multi-byte sequences where the first byte has the
3902  * high-bit set, so as an optimization we can avoid this block
3903  * entirely if it is not set.
3904  */
3905  if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3906  {
3907  int mblen;
3908 
3909  mblen_str[0] = c;
3910  /* All our encodings only read the first byte to get the length */
3911  mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3913  IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3914  raw_buf_ptr += mblen - 1;
3915  }
3916  first_char_in_line = false;
3917  } /* end of outer loop */
3918 
3919  /*
3920  * Transfer any still-uncopied data to line_buf.
3921  */
3923 
3924  return result;
3925 }
3926 
/*
 * Return decimal value for a hexadecimal digit.
 *
 * 'hex' must satisfy isxdigit(); upper- and lower-case letters are both
 * accepted.  For any other character the result is meaningless, so callers
 * are expected to check isxdigit() first.
 */
static int
GetDecimalFromHex(char hex)
{
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
3938 
3939 /*
3940  * Parse the current line into separate attributes (fields),
3941  * performing de-escaping as needed.
3942  *
3943  * The input is in line_buf. We use attribute_buf to hold the result
3944  * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3945  * string, or NULL when the input matches the null marker string.
3946  * This array is expanded as necessary.
3947  *
3948  * (Note that the caller cannot check for nulls since the returned
3949  * string would be the post-de-escaping equivalent, which may look
3950  * the same as some valid data string.)
3951  *
3952  * delim is the column delimiter string (must be just one byte for now).
3953  * null_print is the null marker string. Note that this is compared to
3954  * the pre-de-escaped input string.
3955  *
3956  * The return value is the number of fields actually read.
3957  */
3958 static int
3960 {
3961  char delimc = cstate->delim[0];
3962  int fieldno;
3963  char *output_ptr;
3964  char *cur_ptr;
3965  char *line_end_ptr;
3966 
3967  /*
3968  * We need a special case for zero-column tables: check that the input
3969  * line is empty, and return.
3970  */
3971  if (cstate->max_fields <= 0)
3972  {
3973  if (cstate->line_buf.len != 0)
3974  ereport(ERROR,
3975  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3976  errmsg("extra data after last expected column")));
3977  return 0;
3978  }
3979 
3980  resetStringInfo(&cstate->attribute_buf);
3981 
3982  /*
3983  * The de-escaped attributes will certainly not be longer than the input
3984  * data line, so we can just force attribute_buf to be large enough and
3985  * then transfer data without any checks for enough space. We need to do
3986  * it this way because enlarging attribute_buf mid-stream would invalidate
3987  * pointers already stored into cstate->raw_fields[].
3988  */
3989  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3990  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3991  output_ptr = cstate->attribute_buf.data;
3992 
3993  /* set pointer variables for loop */
3994  cur_ptr = cstate->line_buf.data;
3995  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3996 
3997  /* Outer loop iterates over fields */
3998  fieldno = 0;
3999  for (;;)
4000  {
4001  bool found_delim = false;
4002  char *start_ptr;
4003  char *end_ptr;
4004  int input_len;
4005  bool saw_non_ascii = false;
4006 
4007  /* Make sure there is enough space for the next value */
4008  if (fieldno >= cstate->max_fields)
4009  {
4010  cstate->max_fields *= 2;
4011  cstate->raw_fields =
4012  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4013  }
4014 
4015  /* Remember start of field on both input and output sides */
4016  start_ptr = cur_ptr;
4017  cstate->raw_fields[fieldno] = output_ptr;
4018 
4019  /*
4020  * Scan data for field.
4021  *
4022  * Note that in this loop, we are scanning to locate the end of field
4023  * and also speculatively performing de-escaping. Once we find the
4024  * end-of-field, we can match the raw field contents against the null
4025  * marker string. Only after that comparison fails do we know that
4026  * de-escaping is actually the right thing to do; therefore we *must
4027  * not* throw any syntax errors before we've done the null-marker
4028  * check.
4029  */
4030  for (;;)
4031  {
4032  char c;
4033 
4034  end_ptr = cur_ptr;
4035  if (cur_ptr >= line_end_ptr)
4036  break;
4037  c = *cur_ptr++;
4038  if (c == delimc)
4039  {
4040  found_delim = true;
4041  break;
4042  }
4043  if (c == '\\')
4044  {
4045  if (cur_ptr >= line_end_ptr)
4046  break;
4047  c = *cur_ptr++;
4048  switch (c)
4049  {
4050  case '0':
4051  case '1':
4052  case '2':
4053  case '3':
4054  case '4':
4055  case '5':
4056  case '6':
4057  case '7':
4058  {
4059  /* handle \013 */
4060  int val;
4061 
4062  val = OCTVALUE(c);
4063  if (cur_ptr < line_end_ptr)
4064  {
4065  c = *cur_ptr;
4066  if (ISOCTAL(c))
4067  {
4068  cur_ptr++;
4069  val = (val << 3) + OCTVALUE(c);
4070  if (cur_ptr < line_end_ptr)
4071  {
4072  c = *cur_ptr;
4073  if (ISOCTAL(c))
4074  {
4075  cur_ptr++;
4076  val = (val << 3) + OCTVALUE(c);
4077  }
4078  }
4079  }
4080  }
4081  c = val & 0377;
4082  if (c == '\0' || IS_HIGHBIT_SET(c))
4083  saw_non_ascii = true;
4084  }
4085  break;
4086  case 'x':
4087  /* Handle \x3F */
4088  if (cur_ptr < line_end_ptr)
4089  {
4090  char hexchar = *cur_ptr;
4091 
4092  if (isxdigit((unsigned char) hexchar))
4093  {
4094  int val = GetDecimalFromHex(hexchar);
4095 
4096  cur_ptr++;
4097  if (cur_ptr < line_end_ptr)
4098  {
4099  hexchar = *cur_ptr;
4100  if (isxdigit((unsigned char) hexchar))
4101  {
4102  cur_ptr++;
4103  val = (val << 4) + GetDecimalFromHex(hexchar);
4104  }
4105  }
4106  c = val & 0xff;
4107  if (c == '\0' || IS_HIGHBIT_SET(c))
4108  saw_non_ascii = true;
4109  }
4110  }
4111  break;
4112  case 'b':
4113  c = '\b';
4114  break;
4115  case 'f':
4116  c = '\f';
4117  break;
4118  case 'n':
4119  c = '\n';
4120  break;
4121  case 'r':
4122  c = '\r';
4123  break;
4124  case 't':
4125  c = '\t';
4126  break;
4127  case 'v':
4128  c = '\v';
4129  break;
4130 
4131  /*
4132  * in all other cases, take the char after '\'
4133  * literally
4134  */
4135  }
4136  }
4137 
4138  /* Add c to output string */
4139  *output_ptr++ = c;
4140  }
4141 
4142  /* Check whether raw input matched null marker */
4143  input_len = end_ptr - start_ptr;
4144  if (input_len == cstate->null_print_len &&
4145  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4146  cstate->raw_fields[fieldno] = NULL;
4147  else
4148  {
4149  /*
4150  * At this point we know the field is supposed to contain data.
4151  *
4152  * If we de-escaped any non-7-bit-ASCII chars, make sure the
4153  * resulting string is valid data for the db encoding.
4154  */
4155  if (saw_non_ascii)
4156  {
4157  char *fld = cstate->raw_fields[fieldno];
4158 
4159  pg_verifymbstr(fld, output_ptr - fld, false);
4160  }
4161  }
4162 
4163  /* Terminate attribute value in output area */
4164  *output_ptr++ = '\0';
4165 
4166  fieldno++;
4167  /* Done if we hit EOL instead of a delim */
4168  if (!found_delim)
4169  break;
4170  }
4171 
4172  /* Clean up state of attribute_buf */
4173  output_ptr--;
4174  Assert(*output_ptr == '\0');
4175  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4176 
4177  return fieldno;
4178 }
4179 
4180 /*
4181  * Parse the current line into separate attributes (fields),
4182  * performing de-escaping as needed. This has exactly the same API as
4183  * CopyReadAttributesText, except we parse the fields according to
4184  * "standard" (i.e. common) CSV usage.
4185  */
4186 static int
4188 {
4189  char delimc = cstate->delim[0];
4190  char quotec = cstate->quote[0];
4191  char escapec = cstate->escape[0];
4192  int fieldno;
4193  char *output_ptr;
4194  char *cur_ptr;
4195  char *line_end_ptr;
4196 
4197  /*
4198  * We need a special case for zero-column tables: check that the input
4199  * line is empty, and return.
4200  */
4201  if (cstate->max_fields <= 0)
4202  {
4203  if (cstate->line_buf.len != 0)
4204  ereport(ERROR,
4205  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4206  errmsg("extra data after last expected column")));
4207  return 0;
4208  }
4209 
4210  resetStringInfo(&cstate->attribute_buf);
4211 
4212  /*
4213  * The de-escaped attributes will certainly not be longer than the input
4214  * data line, so we can just force attribute_buf to be large enough and
4215  * then transfer data without any checks for enough space. We need to do
4216  * it this way because enlarging attribute_buf mid-stream would invalidate
4217  * pointers already stored into cstate->raw_fields[].
4218  */
4219  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4220  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4221  output_ptr = cstate->attribute_buf.data;
4222 
4223  /* set pointer variables for loop */
4224  cur_ptr = cstate->line_buf.data;
4225  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4226 
4227  /* Outer loop iterates over fields */
4228  fieldno = 0;
4229  for (;;)
4230  {
4231  bool found_delim = false;
4232  bool saw_quote = false;
4233  char *start_ptr;
4234  char *end_ptr;
4235  int input_len;
4236 
4237  /* Make sure there is enough space for the next value */
4238  if (fieldno >= cstate->max_fields)
4239  {
4240  cstate->max_fields *= 2;
4241  cstate->raw_fields =
4242  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4243  }
4244 
4245  /* Remember start of field on both input and output sides */
4246  start_ptr = cur_ptr;
4247  cstate->raw_fields[fieldno] = output_ptr;
4248 
4249  /*
4250  * Scan data for field,
4251  *
4252  * The loop starts in "not quote" mode and then toggles between that
4253  * and "in quote" mode. The loop exits normally if it is in "not
4254  * quote" mode and a delimiter or line end is seen.
4255  */
4256  for (;;)
4257  {
4258  char c;
4259 
4260  /* Not in quote */
4261  for (;;)
4262  {
4263  end_ptr = cur_ptr;
4264  if (cur_ptr >= line_end_ptr)
4265  goto endfield;
4266  c = *cur_ptr++;
4267  /* unquoted field delimiter */
4268  if (c == delimc)
4269  {
4270  found_delim = true;
4271  goto endfield;
4272  }
4273  /* start of quoted field (or part of field) */
4274  if (c == quotec)
4275  {
4276  saw_quote = true;
4277  break;
4278  }
4279  /* Add c to output string */
4280  *output_ptr++ = c;
4281  }
4282 
4283  /* In quote */
4284  for (;;)
4285  {
4286  end_ptr = cur_ptr;
4287  if (cur_ptr >= line_end_ptr)
4288  ereport(ERROR,
4289  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4290  errmsg("unterminated CSV quoted field")));
4291 
4292  c = *cur_ptr++;
4293 
4294  /* escape within a quoted field */
4295  if (c == escapec)
4296  {
4297  /*
4298  * peek at the next char if available, and escape it if it
4299  * is an escape char or a quote char
4300  */
4301  if (cur_ptr < line_end_ptr)
4302  {
4303  char nextc = *cur_ptr;
4304 
4305  if (nextc == escapec || nextc == quotec)
4306  {
4307  *output_ptr++ = nextc;
4308  cur_ptr++;
4309  continue;
4310  }
4311  }
4312  }
4313 
4314  /*
4315  * end of quoted field. Must do this test after testing for
4316  * escape in case quote char and escape char are the same
4317  * (which is the common case).
4318  */
4319  if (c == quotec)
4320  break;
4321 
4322  /* Add c to output string */
4323  *output_ptr++ = c;
4324  }
4325  }
4326 endfield:
4327 
4328  /* Terminate attribute value in output area */
4329  *output_ptr++ = '\0';
4330 
4331  /* Check whether raw input matched null marker */
4332  input_len = end_ptr - start_ptr;
4333  if (!saw_quote && input_len == cstate->null_print_len &&
4334  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4335  cstate->raw_fields[fieldno] = NULL;
4336 
4337  fieldno++;
4338  /* Done if we hit EOL instead of a delim */
4339  if (!found_delim)
4340  break;
4341  }
4342 
4343  /* Clean up state of attribute_buf */
4344  output_ptr--;
4345  Assert(*output_ptr == '\0');
4346  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4347 
4348  return fieldno;
4349 }
4350 
4351 
4352 /*
4353  * Read a binary attribute
4354  */
4355 static Datum
4357  int column_no, FmgrInfo *flinfo,
4358  Oid typioparam, int32 typmod,
4359  bool *isnull)
4360 {
4361  int32 fld_size;
4362  Datum result;
4363 
4364  if (!CopyGetInt32(cstate, &fld_size))
4365  ereport(ERROR,
4366  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4367  errmsg("unexpected EOF in COPY data")));
4368  if (fld_size == -1)
4369  {
4370  *isnull = true;
4371  return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4372  }
4373  if (fld_size < 0)
4374  ereport(ERROR,
4375  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4376  errmsg("invalid field size")));
4377 
4378  /* reset attribute_buf to empty, and load raw data in it */
4379  resetStringInfo(&cstate->attribute_buf);
4380 
4381  enlargeStringInfo(&cstate->attribute_buf, fld_size);
4382  if (CopyGetData(cstate, cstate->attribute_buf.data,
4383  fld_size, fld_size) != fld_size)
4384  ereport(ERROR,
4385  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4386  errmsg("unexpected EOF in COPY data")));
4387 
4388  cstate->attribute_buf.len = fld_size;
4389  cstate->attribute_buf.data[fld_size] = '\0';
4390 
4391  /* Call the column type's binary input converter */
4392  result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4393  typioparam, typmod);
4394 
4395  /* Trouble if it didn't eat the whole buffer */
4396  if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4397  ereport(ERROR,
4398  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4399  errmsg("incorrect binary data format")));
4400 
4401  *isnull = false;
4402  return result;
4403 }
4404 
/*
 * Send text representation of one attribute, with conversion and escaping.
 *
 * DUMPSOFAR() flushes the pending run of literal bytes [start, ptr) with a
 * single CopySendData() call, and does nothing if the run is empty.  It is
 * not a self-contained helper: it expects locals named "cstate", "start" and
 * "ptr" to be in scope wherever it is used.
 */
#define DUMPSOFAR() \
	do { \
		if (start < ptr) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
4413 
4414 static void
4415 CopyAttributeOutText(CopyState cstate, char *string)
4416 {
4417  char *ptr;
4418  char *start;
4419  char c;
4420  char delimc = cstate->delim[0];
4421 
4422  if (cstate->need_transcoding)
4423  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4424  else
4425  ptr = string;
4426 
4427  /*
4428  * We have to grovel through the string searching for control characters
4429  * and instances of the delimiter character. In most cases, though, these
4430  * are infrequent. To avoid overhead from calling CopySendData once per
4431  * character, we dump out all characters between escaped characters in a
4432  * single call. The loop invariant is that the data from "start" to "ptr"
4433  * can be sent literally, but hasn't yet been.
4434  *
4435  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4436  * in valid backend encodings, extra bytes of a multibyte character never
4437  * look like ASCII. This loop is sufficiently performance-critical that
4438  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4439  * of the normal safe-encoding path.
4440  */
4441  if (cstate->encoding_embeds_ascii)
4442  {
4443  start = ptr;
4444  while ((c = *ptr) != '\0')
4445  {
4446  if ((unsigned char) c < (unsigned char) 0x20)
4447  {
4448  /*
4449  * \r and \n must be escaped, the others are traditional. We
4450  * prefer to dump these using the C-like notation, rather than
4451  * a backslash and the literal character, because it makes the
4452  * dump file a bit more proof against Microsoftish data
4453  * mangling.
4454  */
4455  switch (c)
4456  {
4457  case '\b':
4458  c = 'b';
4459  break;
4460  case '\f':
4461  c = 'f';
4462  break;
4463  case '\n':
4464  c = 'n';
4465  break;
4466  case '\r':
4467  c = 'r';
4468  break;
4469  case '\t':
4470  c = 't';
4471  break;
4472  case '\v':
4473  c = 'v';
4474  break;
4475  default:
4476  /* If it's the delimiter, must backslash it */
4477  if (c == delimc)
4478  break;
4479  /* All ASCII control chars are length 1 */
4480  ptr++;
4481  continue; /* fall to end of loop */
4482  }
4483  /* if we get here, we need to convert the control char */
4484  DUMPSOFAR();
4485  CopySendChar(cstate, '\\');
4486  CopySendChar(cstate, c);
4487  start = ++ptr; /* do not include char in next run */
4488  }
4489  else if (c == '\\' || c == delimc)
4490  {
4491  DUMPSOFAR();
4492  CopySendChar(cstate, '\\');
4493  start = ptr++; /* we include char in next run */
4494  }
4495  else if (IS_HIGHBIT_SET(c))
4496  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4497  else
4498  ptr++;
4499  }
4500  }
4501  else
4502  {
4503  start = ptr;
4504  while ((c = *ptr) != '\0')
4505  {
4506  if ((unsigned char) c < (unsigned char) 0x20)
4507  {
4508  /*
4509  * \r and \n must be escaped, the others are traditional. We
4510  * prefer to dump these using the C-like notation, rather than
4511  * a backslash and the literal character, because it makes the
4512  * dump file a bit more proof against Microsoftish data
4513  * mangling.
4514  */
4515  switch (c)
4516  {
4517  case '\b':
4518  c = 'b';
4519  break;
4520  case '\f':
4521  c = 'f';
4522  break;
4523  case '\n':
4524  c = 'n';
4525  break;
4526  case '\r':
4527  c = 'r';
4528  break;
4529  case '\t':
4530  c = 't';
4531  break;
4532  case '\v':
4533  c = 'v';
4534  break;
4535  default:
4536  /* If it's the delimiter, must backslash it */
4537  if (c == delimc)
4538  break;
4539  /* All ASCII control chars are length 1 */
4540  ptr++;
4541  continue; /* fall to end of loop */
4542  }
4543  /* if we get here, we need to convert the control char */
4544  DUMPSOFAR();
4545  CopySendChar(cstate, '\\');
4546  CopySendChar(cstate, c);
4547  start = ++ptr; /* do not include char in next run */
4548  }
4549  else if (c == '\\' || c == delimc)
4550  {
4551  DUMPSOFAR();
4552  CopySendChar(cstate, '\\');
4553  start = ptr++; /* we include char in next run */
4554  }
4555  else
4556  ptr++;
4557  }
4558  }
4559 
4560  DUMPSOFAR();
4561 }
4562 
4563 /*
4564  * Send text representation of one attribute, with conversion and
4565  * CSV-style escaping
4566  */
4567 static void
4568 CopyAttributeOutCSV(CopyState cstate, char *string,
4569  bool use_quote, bool single_attr)
4570 {
4571  char *ptr;
4572  char *start;
4573  char c;
4574  char delimc = cstate->delim[0];
4575  char quotec = cstate->quote[0];
4576  char escapec = cstate->escape[0];
4577 
4578  /* force quoting if it matches null_print (before conversion!) */
4579  if (!use_quote && strcmp(string, cstate->null_print) == 0)
4580  use_quote = true;
4581 
4582  if (cstate->need_transcoding)
4583  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4584  else
4585  ptr = string;
4586 
4587  /*
4588  * Make a preliminary pass to discover if it needs quoting
4589  */
4590  if (!use_quote)
4591  {
4592  /*
4593  * Because '\.' can be a data value, quote it if it appears alone on a
4594  * line so it is not interpreted as the end-of-data marker.
4595  */
4596  if (single_attr && strcmp(ptr, "\\.") == 0)
4597  use_quote = true;
4598  else
4599  {
4600  char *tptr = ptr;
4601 
4602  while ((c = *tptr) != '\0')
4603  {
4604  if (c == delimc || c == quotec || c == '\n' || c == '\r')
4605  {
4606  use_quote = true;
4607  break;
4608  }
4609  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4610  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4611  else
4612  tptr++;
4613  }
4614  }
4615  }
4616 
4617  if (use_quote)
4618  {
4619  CopySendChar(cstate, quotec);
4620 
4621  /*
4622  * We adopt the same optimization strategy as in CopyAttributeOutText
4623  */
4624  start = ptr;
4625  while ((c = *ptr) != '\0')
4626  {
4627  if (c == quotec || c == escapec)
4628  {
4629  DUMPSOFAR();
4630  CopySendChar(cstate, escapec);
4631  start = ptr; /* we include char in next run */
4632  }
4633  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4634  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4635  else
4636  ptr++;
4637  }
4638  DUMPSOFAR();
4639 
4640  CopySendChar(cstate, quotec);
4641  }
4642  else
4643  {
4644  /* If it doesn't need quoting, we can just dump it as-is */
4645  CopySendString(cstate, ptr);
4646  }
4647 }
4648 
4649 /*
4650  * CopyGetAttnums - build an integer list of attnums to be copied
4651  *
4652  * The input attnamelist is either the user-specified column list,
4653  * or NIL if there was none (in which case we want all the non-dropped
4654  * columns).
4655  *
4656  * rel can be NULL ... it's only used for error reports.
4657  */
4658 static List *
4659 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4660 {
4661  List *attnums = NIL;
4662 
4663  if (attnamelist == NIL)
4664  {
4665  /* Generate default column list */
4666  int attr_count = tupDesc->natts;
4667  int i;
4668 
4669  for (i = 0; i < attr_count; i++)
4670  {
4671  if (TupleDescAttr(tupDesc, i)->attisdropped)
4672  continue;
4673  attnums = lappend_int(attnums, i + 1);
4674  }
4675  }
4676  else
4677  {
4678  /* Validate the user-supplied list and extract attnums */
4679  ListCell *l;
4680 
4681  foreach(l, attnamelist)
4682  {
4683  char *name = strVal(lfirst(l));
4684  int attnum;
4685  int i;
4686 
4687  /* Lookup column name */
4688  attnum = InvalidAttrNumber;
4689  for (i = 0; i < tupDesc->natts; i++)
4690  {
4691  Form_pg_attribute att = TupleDescAttr(tupDesc, i);
4692 
4693  if (att->attisdropped)
4694  continue;
4695  if (namestrcmp(&(att->attname), name) == 0)
4696  {
4697  attnum = att->attnum;
4698  break;
4699  }
4700  }
4701  if (attnum == InvalidAttrNumber)
4702  {
4703  if (rel != NULL)
4704  ereport(ERROR,
4705  (errcode(ERRCODE_UNDEFINED_COLUMN),
4706  errmsg("column \"%s\" of relation \"%s\" does not exist",
4707  name, RelationGetRelationName(rel))));
4708  else
4709  ereport(ERROR,
4710  (errcode(ERRCODE_UNDEFINED_COLUMN),
4711  errmsg("column \"%s\" does not exist",
4712  name)));
4713  }
4714  /* Check for duplicates */
4715  if (list_member_int(attnums, attnum))
4716  ereport(ERROR,
4717  (errcode(ERRCODE_DUPLICATE_COLUMN),
4718  errmsg("column \"%s\" specified more than once",
4719  name)));
4720  attnums = lappend_int(attnums, attnum);
4721  }
4722  }
4723 
4724  return attnums;
4725 }
4726 
4727 
4728 /*
4729  * copy_dest_startup --- executor startup
4730  */
4731 static void
4732 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4733 {
4734  /* no-op */
4735 }
4736 
4737 /*
4738  * copy_dest_receive --- receive one tuple
4739  */
4740 static bool
4742 {
4743  DR_copy *myState = (DR_copy *) self;
4744  CopyState cstate = myState->cstate;
4745 
4746  /* Make sure the tuple is fully deconstructed */
4747  slot_getallattrs(slot);
4748 
4749  /* And send the data */
4750  CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4751  myState->processed++;
4752 
4753  return true;
4754 }
4755 
4756 /*
4757  * copy_dest_shutdown --- executor end
4758  */
4759 static void
4761 {
4762  /* no-op */
4763 }
4764 
4765 /*
4766  * copy_dest_destroy --- release DestReceiver object
4767  */
4768 static void
4770 {
4771  pfree(self);
4772 }
4773 
4774 /*
4775  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4776  */
4777 DestReceiver *
4779 {
4780  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4781 
4782  self->pub.receiveSlot = copy_dest_receive;
4783  self->pub.rStartup = copy_dest_startup;
4784  self->pub.rShutdown = copy_dest_shutdown;
4785  self->pub.rDestroy = copy_dest_destroy;
4786  self->pub.mydest = DestCopyOut;
4787 
4788  self->cstate = NULL; /* will be set later */
4789  self->processed = 0;
4790 
4791  return (DestReceiver *) self;
4792 }
signed short int16
Definition: c.h:301
List * indirection
Definition: parsenodes.h:440
bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
Definition: copy.c:3174
int ri_NumIndices
Definition: execnodes.h:371
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:477
int ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd, TupleTableSlot *slot, EState *estate)
static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
Definition: copy.c:4356
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:356
static int GetDecimalFromHex(char hex)
Definition: copy.c:3931
Definition: fmgr.h:56
#define MAX_COPY_DATA_DISPLAY
bool csv_mode
Definition: copy.c:118
static void SendCopyEnd(CopyState cstate)
Definition: copy.c:414
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1306
Relation ri_RelationDesc
Definition: execnodes.h:368
List * range_table
Definition: copy.c:166
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options)
Definition: copy.c:1370
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:781
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
static bool CopyReadLineText(CopyState cstate)
Definition: copy.c:3572
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:123
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:1010
Node * val
Definition: parsenodes.h:441
static bool CopyGetInt32(CopyState cstate, int32 *val)
Definition: copy.c:680
int errhint(const char *fmt,...)
Definition: elog.c:987
int pg_char_to_encoding(const char *name)
Definition: encnames.c:551
char ** raw_fields
Definition: copy.c:186
Definition: copy.c:74
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2650
#define VARDATA(PTR)
Definition: postgres.h:302
static void EndCopy(CopyState cstate)
Definition: copy.c:1712
bool binary
Definition: copy.c:115
static struct @130 value
#define pq_flush()
Definition: libpq.h:39
void CopyFromErrorCallback(void *arg)
Definition: copy.c:2173
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1568
TupleConversionMap * TupConvMapForLeaf(PartitionTupleRouting *proute, ResultRelInfo *rootRelInfo, int leaf_index)
void PreventCommandIfParallelMode(const char *cmdname)
Definition: utility.c:255
List * ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:271
List * attlist
Definition: parsenodes.h:1953
List * fromClause
Definition: parsenodes.h:1537
#define ISOCTAL(c)
Definition: copy.c:53
#define OCTVALUE(c)
Definition: copy.c:54
#define ResetPerTupleExprContext(estate)
Definition: executor.h:499
#define RelationGetDescr(relation)
Definition: rel.h:437
#define HEAP_INSERT_FROZEN
Definition: heapam.h:30
char * name
Definition: parsenodes.h:439
bool need_transcoding
Definition: copy.c:105
#define castNode(_type_, nodeptr)
Definition: nodes.h:582
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
FmgrInfo * in_functions
Definition: copy.c:161
AttrNumber num_defaults
Definition: copy.c:157
List * attnumlist
Definition: copy.c:111
#define OIDOID
Definition: pg_type.h:328
#define VARSIZE(PTR)
Definition: postgres.h:303
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * filename
Definition: copy.c:112
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copy.c:2915
#define VARHDRSZ
Definition: c.h:511
bool file_has_oids
Definition: copy.c:158
#define DatumGetObjectId(X)
Definition: postgres.h:483
List * relationOids
Definition: plannodes.h:88
char * pstrdup(const char *in)
Definition: mcxt.c:1063
#define pg_hton16(x)
Definition: pg_bswap.h:120
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:217
static void ReceiveCopyBegin(CopyState cstate)
Definition: copy.c:378
#define XLogIsNeeded()
Definition: xlog.h:146
#define pg_ntoh16(x)
Definition: pg_bswap.h:124
Definition: copy.c:213
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
#define MAX_BUFFERED_TUPLES
bool rd_islocaltemp
Definition: rel.h:90
TupleTableSlot * ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2369
#define S_IWOTH
Definition: win32_port.h:298
Expr * expression_planner(Expr *expr)
Definition: planner.c:5667
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:140
DestReceiver pub
Definition: copy.c:215
void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options)
Definition: copy.c:1012
#define RELKIND_MATVIEW
Definition: pg_class.h:165
StringInfoData line_buf
Definition: copy.c:195
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen)
Definition: copy.c:249
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:839
#define AccessShareLock
Definition: lockdefs.h:36
#define InvalidBuffer
Definition: buf.h:25
struct CopyStateData CopyStateData
Definition: nodes.h:513
PartitionTupleRouting * ExecSetupPartitionTupleRouting(ModifyTableState *mtstate, Relation rel, Index resultRTindex, EState *estate)
Definition: execPartition.c:49
#define strVal(v)
Definition: value.h:54
struct cursor * cur
Definition: ecpg.c:28
int raw_buf_index
Definition: copy.c:208
static void CopyAttributeOutText(CopyState cstate, char *string)
Definition: copy.c:4415
bool line_buf_valid
Definition: copy.c:197
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1655
CopyState cstate
Definition: copy.c:216
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_BINARY_W
Definition: c.h:1072
bool superuser(void)
Definition: superuser.c:47
int namestrcmp(Name name, const char *str)
Definition: name.c:247
#define MemSet(start, val, len)
Definition: c.h:897
bool fe_eof
Definition: copy.c:102
uint64 CopyFrom(CopyState cstate)
Definition: copy.c:2278
void ExecSetupChildParentMapForLeaf(PartitionTupleRouting *proute)
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:111
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, HeapTuple trigtuple, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2354
void heap_sync(Relation rel)
Definition: heapam.c:9232
Datum * tts_values
Definition: tuptable.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
static void ClosePipeToProgram(CopyState cstate)
Definition: copy.c:1689
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:134
static int CopyReadAttributesCSV(CopyState cstate)
Definition: copy.c:4187
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
AclMode requiredPerms
Definition: parsenodes.h:1067
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
TupleTableSlot * partition_tuple_slot
bool * force_quote_flags
Definition: copy.c:128
List * es_range_table
Definition: execnodes.h:445
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
List * pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)
Definition: postgres.c:649
char * delim
Definition: copy.c:123
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
Node * utilityStmt
Definition: parsenodes.h:118
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool check_partition_constraint)
Definition: execMain.c:1957
bool is_program
Definition: parsenodes.h:1956
#define linitial_node(type, l)
Definition: pg_list.h:114
bool volatile_defexprs
Definition: copy.c:165
Datum oidout(PG_FUNCTION_ARGS)
Definition: oid.c:127
void(* callback)(void *arg)
Definition: elog.h:239
struct ErrorContextCallback * previous
Definition: elog.h:238
#define PG_BINARY_R
Definition: c.h:1071
static void copy_dest_destroy(DestReceiver *self)
Definition: copy.c:4769
bool * force_null_flags
Definition: copy.c:132
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
MemoryContext rowcontext
Definition: copy.c:152
int natts
Definition: tupdesc.h:79
bool line_buf_converted
Definition: copy.c:196
HeapTuple tcs_original_insert_tuple
Definition: trigger.h:82
ResultRelInfo ** partitions
Definition: execPartition.h:94
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc)
Definition: execTuples.c:910
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:634
signed int int32
Definition: c.h:302
int ClosePipeStream(FILE *file)
Definition: fd.c:2755
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
bool * convert_select_flags
Definition: copy.c:135
static void CopySendChar(CopyState cstate, char c)
Definition: copy.c:455
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1662
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
int location
Definition: parsenodes.h:235
CopyDest copy_dest
Definition: copy.c:98
int location
Definition: parsenodes.h:442
#define REFILL_LINEBUF
Definition: copy.c:266
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:703
ErrorContextCallback * error_context_stack
Definition: elog.c:88
#define list_make1(x1)
Definition: pg_list.h:139
char * null_print
Definition: copy.c:120
const char * cur_attname
Definition: copy.c:140
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:459
Definition: copy.c:62
bool trig_insert_instead_row
Definition: reltrigger.h:57
void FreeExecutorState(EState *estate)
Definition: execUtils.c:185
Relation rel
Definition: copy.c:109
#define GetPerTupleExprContext(estate)
Definition: executor.h:490
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2359
MemoryContext copycontext
Definition: copy.c:146
#define pq_startcopyout()
Definition: libpq.h:46
copy_data_source_cb data_source_cb
Definition: copy.c:114
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:127
bool trig_insert_new_table
Definition: reltrigger.h:74
Bitmapset * selectedCols
Definition: parsenodes.h:1069
unsigned short uint16
Definition: c.h:313
void pfree(void *pointer)
Definition: mcxt.c:936
#define pg_ntoh32(x)
Definition: pg_bswap.h:125
#define IS_HIGHBIT_SET(ch)
Definition: c.h:983
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options)
Definition: copy.c:1735
static void CopySendInt16(CopyState cstate, int16 val)
Definition: copy.c:697
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1149
#define ObjectIdGetDatum(X)
Definition: postgres.h:490
#define ERROR
Definition: elog.h:43
void pq_startmsgread(void)
Definition: pqcomm.c:1210
#define DatumGetCString(X)
Definition: postgres.h:549
static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
Definition: copy.c:554
#define lfirst_int(lc)
Definition: pg_list.h:107
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:297
#define pg_hton32(x)
Definition: pg_bswap.h:121
static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr)
Definition: copy.c:4568
Datum ReceiveFunctionCall(FmgrInfo *flinfo, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1676
static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, int firstBufferedLineNo)
Definition: copy.c:2831
TupleConversionMap * tcs_map
Definition: trigger.h:73
#define FATAL
Definition: elog.h:52
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:122
char * defGetString(DefElem *def)
Definition: define.c:49
RangeVar * relation
Definition: parsenodes.h:1950
ItemPointerData t_self
Definition: htup.h:65
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:769
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
Definition: pquery.c:67
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:820
TriggerDesc * trigdesc
Definition: rel.h:120
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
static char * limit_printout_length(const char *str)
Definition: copy.c:2249
#define lfirst_node(type, lc)
Definition: pg_list.h:109
QueryDesc * queryDesc
Definition: copy.c:110
EolType
Definition: copy.c:71
bool list_member_int(const List *list, int datum)
Definition: list.c:485
static void copy_dest_shutdown(DestReceiver *self)
Definition: copy.c:4760
bool encoding_embeds_ascii
Definition: copy.c:106
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3051
char * c
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2220
char * raw_buf
Definition: copy.c:207
static bool CopyLoadRawBuf(CopyState cstate)
Definition: copy.c:734
Node * stmt
Definition: parsenodes.h:1447
#define NoLock
Definition: lockdefs.h:34
int pq_getbytes(char *s, size_t len)
Definition: pqcomm.c:1094
char * quote
Definition: copy.c:124
static char * buf
Definition: pg_test_fsync.c:67
#define memmove(d, s, c)
Definition: c.h:1089
bool * tts_isnull
Definition: tuptable.h:126
static Datum ExecEvalExpr(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:282
int pg_database_encoding_max_length(void)
Definition: wchar.c:1833
List * targetList
Definition: parsenodes.h:1536
ResultRelInfo * es_result_relations
Definition: execnodes.h:455
static uint64 DoCopyTo(CopyState cstate)
Definition: copy.c:1873
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: copy.c:4732
int location
Definition: parsenodes.h:730
#define RowExclusiveLock
Definition: lockdefs.h:38
ExprState ** defexprs
Definition: copy.c:164
const char * cur_relname
Definition: copy.c:138
int errcode_for_file_access(void)
Definition: elog.c:598
int pg_encoding_mblen(int encoding, const char *mbstr)
Definition: wchar.c:1785
#define is_absolute_path(filename)
Definition: port.h:86
#define CStringGetDatum(X)
Definition: postgres.h:561
char string[11]
Definition: preproc-type.c:46
Definition: copy.c:75
List * options
Definition: parsenodes.h:1958
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2353
FmgrInfo oid_in_function
Definition: copy.c:159
#define select(n, r, w, e, timeout)
Definition: win32_port.h:447
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:264
#define RelationGetRelationName(relation)
Definition: rel.h:445
static const char BinarySignature[11]
Definition: copy.c:286
#define S_IWGRP
Definition: win32_port.h:286
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:392
unsigned int uint32
Definition: c.h:314
bytea * SendFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1723
int raw_buf_len
Definition: copy.c:209
Oid t_tableOid
Definition: htup.h:66
PartitionDispatch * partition_dispatch_info
Definition: execPartition.h:92
#define RELKIND_FOREIGN_TABLE
Definition: pg_class.h:167
bool trig_insert_after_row
Definition: reltrigger.h:56
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
int max_fields
Definition: copy.c:185
char * escape
Definition: copy.c:125
const char * p_sourcetext
Definition: parse_node.h:173
List * returningList
Definition: parsenodes.h:144
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:477
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2452
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2683
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2617
#define ereport(elevel, rest)
Definition: elog.h:122
Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2436
int null_print_len
Definition: copy.c:121
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1238
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:510
List * force_null
Definition: copy.c:131
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:380
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:399
EState * CreateExecutorState(void)
Definition: execUtils.c:80
List * lappend_int(List *list, int datum)
Definition: list.c:146
Node * arg
Definition: parsenodes.h:728
Definition: copy.c:76
List * lappend(List *list, void *datum)
Definition: list.c:128
int file_encoding
Definition: copy.c:104
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:4659
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
TupleDesc tupDesc
Definition: execdesc.h:47
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
#define stat(a, b)
Definition: win32_port.h:266
#define InvalidSnapshot
Definition: snapshot.h:25
SubTransactionId rd_createSubid
Definition: rel.h:110
Oid * typioparams
Definition: copy.c:162
bool is_program
Definition: copy.c:113
#define RELKIND_PARTITIONED_TABLE
Definition: pg_class.h:168
Node * build_column_default(Relation rel, int attrno)
bool trig_insert_before_row
Definition: reltrigger.h:55
void getTypeBinaryOutputInfo(Oid type, Oid *typSend, bool *typIsVarlena)
Definition: lsyscache.c:2716
List * es_tupleTable
Definition: execnodes.h:490
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:186
void * palloc0(Size size)
Definition: mcxt.c:864
bool header_line
Definition: copy.c:119
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2277
uintptr_t Datum
Definition: postgres.h:365
int GetDatabaseEncoding(void)
Definition: mbutils.c:1004
int pg_get_client_encoding(void)
Definition: mbutils.c:306
#define ACL_SELECT
Definition: parsenodes.h:73
HeapTuple ConvertPartitionTupleSlot(TupleConversionMap *map, HeapTuple tuple, TupleTableSlot *new_slot, TupleTableSlot **p_my_slot)
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4411
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1831
int stmt_len
Definition: parsenodes.h:1449
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen)
Definition: copy.c:237
int stmt_location
Definition: parsenodes.h:1448
int es_num_result_relations
Definition: execnodes.h:456
List * ri_PartitionCheck
Definition: execnodes.h:422
#define RAW_BUF_SIZE
Definition: copy.c:206
TupleDesc rd_att
Definition: rel.h:115
bool freeze
Definition: copy.c:117
void ExecCleanupTupleRouting(PartitionTupleRouting *proute)
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1618
int pq_getbyte(void)
Definition: pqcomm.c:1000
void pq_endmsgread(void)
Definition: pqcomm.c:1234
Relation heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: heapam.c:1319
#define InvalidOid
Definition: postgres_ext.h:36
static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: copy.c:4741
bool XactReadOnly
Definition: xact.c:77
bool * force_notnull_flags
Definition: copy.c:130
CmdType commandType
Definition: parsenodes.h:110
PartitionTupleRouting * partition_tuple_routing
Definition: copy.c:169
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4533
List * force_notnull
Definition: copy.c:129
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void EndCopyFrom(CopyState cstate)
Definition: copy.c:3467
#define PG_CATCH()
Definition: elog.h:293
#define makeNode(_type_)
Definition: nodes.h:561
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
EolType eol_type
Definition: copy.c:103
QuerySource querySource
Definition: parsenodes.h:112
void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed)
Definition: copy.c:779
void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2700
#define Assert(condition)
Definition: c.h:688
#define lfirst(lc)
Definition: pg_list.h:106
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2373
static void CopySendData(CopyState cstate, const void *databuf, int datasize)
Definition: copy.c:443
List * convert_select
Definition: copy.c:134
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:642
StringInfo fe_msgbuf
Definition: copy.c:100
CopyDest
Definition: copy.c:60
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:530
static bool CopyReadLine(CopyState cstate)
Definition: copy.c:3483
#define ACL_INSERT
Definition: parsenodes.h:72
TupleConstr * constr
Definition: tupdesc.h:84
#define pq_endcopyout(errorAbort)
Definition: libpq.h:47
size_t Size
Definition: c.h:422
static void CopySendString(CopyState cstate, const char *str)
Definition: copy.c:449
static int list_length(const List *l)
Definition: pg_list.h:89
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:111
static uint64 CopyTo(CopyState cstate)
Definition: copy.c:1927
RangeTblEntry * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, Alias *alias, bool inh, bool inFromCl)
#define PG_RE_THROW()
Definition: elog.h:314
#define InvalidSubTransactionId
Definition: c.h:469
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:742
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:761
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:2385
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:949
const char * name
Definition: encode.c:521
void PreventCommandIfReadOnly(const char *cmdname)
Definition: utility.c:237