PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  * Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22 
23 #include "access/heapam.h"
24 #include "access/htup_details.h"
25 #include "access/sysattr.h"
26 #include "access/xact.h"
27 #include "access/xlog.h"
28 #include "catalog/pg_type.h"
29 #include "commands/copy.h"
30 #include "commands/defrem.h"
31 #include "commands/trigger.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "mb/pg_wchar.h"
36 #include "miscadmin.h"
37 #include "optimizer/clauses.h"
38 #include "optimizer/planner.h"
39 #include "nodes/makefuncs.h"
40 #include "rewrite/rewriteHandler.h"
41 #include "storage/fd.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/portal.h"
47 #include "utils/rel.h"
48 #include "utils/rls.h"
49 #include "utils/snapmgr.h"
50 
51 
/* Nonzero when c is one of the ASCII octal digits '0'..'7' (c is evaluated twice). */
#define ISOCTAL(c)	((c) >= '0' && (c) <= '7')
/* Numeric value of the digit character c, relative to '0'. */
#define OCTVALUE(c)	((c) - '0')
54 
/*
 * The kinds of low-level data source/sink a COPY can be attached to.
 * The communication routines in this file switch on this value.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* to/from file (or a piped program) */
	COPY_OLD_FE = 1,			/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE = 2				/* to/from frontend (3.0 protocol) */
} CopyDest;
65 
/*
 * Represents the end-of-line terminator type of the input
 */
typedef enum EolType
{
	EOL_UNKNOWN,				/* terminator style not determined yet */
	EOL_NL,						/* newline */
	EOL_CR,						/* carriage return */
	EOL_CRNL					/* carriage return followed by newline */
} EolType;
76 
77 /*
78  * This struct contains all the state variables used throughout a COPY
79  * operation. For simplicity, we use the same struct for all variants of COPY,
80  * even though some fields are used in only some cases.
81  *
82  * Multi-byte encodings: all supported client-side encodings encode multi-byte
83  * characters by having the first byte's high bit set. Subsequent bytes of the
84  * character can have the high bit not set. When scanning data in such an
85  * encoding to look for a match to a single-byte (ie ASCII) character, we must
86  * use the full pg_encoding_mblen() machinery to skip over multibyte
87  * characters, else we might find a false match to a trailing byte. In
88  * supported server encodings, there is no possibility of a false match, and
89  * it's faster to make useless comparisons to trailing bytes than it is to
90  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
91  * when we have to do it the hard way.
92  */
93 typedef struct CopyStateData
94 {
95  /* low-level state data */
96  CopyDest copy_dest; /* type of copy source/destination */
97  FILE *copy_file; /* used if copy_dest == COPY_FILE */
98  StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
99  * dest == COPY_NEW_FE in COPY FROM */
100  bool fe_eof; /* true if detected end of copy data */
101  EolType eol_type; /* EOL type of input */
102  int file_encoding; /* file or remote side's character encoding */
103  bool need_transcoding; /* file encoding diff from server? */
104  bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
105 
106  /* parameters from the COPY command */
107  Relation rel; /* relation to copy to or from */
108  QueryDesc *queryDesc; /* executable query to copy from */
109  List *attnumlist; /* integer list of attnums to copy */
110  char *filename; /* filename, or NULL for STDIN/STDOUT */
111  bool is_program; /* is 'filename' a program to popen? */
112  bool binary; /* binary format? */
113  bool oids; /* include OIDs? */
114  bool freeze; /* freeze rows on loading? */
115  bool csv_mode; /* Comma Separated Value format? */
116  bool header_line; /* CSV header line? */
117  char *null_print; /* NULL marker string (server encoding!) */
118  int null_print_len; /* length of same */
119  char *null_print_client; /* same converted to file encoding */
120  char *delim; /* column delimiter (must be 1 byte) */
121  char *quote; /* CSV quote char (must be 1 byte) */
122  char *escape; /* CSV escape char (must be 1 byte) */
123  List *force_quote; /* list of column names */
124  bool force_quote_all; /* FORCE_QUOTE *? */
125  bool *force_quote_flags; /* per-column CSV FQ flags */
126  List *force_notnull; /* list of column names */
127  bool *force_notnull_flags; /* per-column CSV FNN flags */
128  List *force_null; /* list of column names */
129  bool *force_null_flags; /* per-column CSV FN flags */
130  bool convert_selectively; /* do selective binary conversion? */
131  List *convert_select; /* list of column names (can be NIL) */
132  bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
133 
134  /* these are just for error messages, see CopyFromErrorCallback */
135  const char *cur_relname; /* table name for error messages */
136  int cur_lineno; /* line number for error messages */
137  const char *cur_attname; /* current att for error messages */
138  const char *cur_attval; /* current att value for error messages */
139 
140  /*
141  * Working state for COPY TO/FROM
142  */
143  MemoryContext copycontext; /* per-copy execution context */
144 
145  /*
146  * Working state for COPY TO
147  */
148  FmgrInfo *out_functions; /* lookup info for output functions */
149  MemoryContext rowcontext; /* per-row evaluation context */
150 
151  /*
152  * Working state for COPY FROM
153  */
158  FmgrInfo *in_functions; /* array of input functions for each attrs */
159  Oid *typioparams; /* array of element types for in_functions */
160  int *defmap; /* array of default att numbers */
161  ExprState **defexprs; /* array of default att expressions */
162  bool volatile_defexprs; /* is any of defexprs volatile? */
164 
166  int num_dispatch; /* Number of entries in the above array */
167  int num_partitions; /* Number of members in the following arrays */
168  ResultRelInfo *partitions; /* Per partition result relation */
171 
172  /*
173  * These variables are used to reduce overhead in textual COPY FROM.
174  *
175  * attribute_buf holds the separated, de-escaped text for each field of
176  * the current line. The CopyReadAttributes functions return arrays of
177  * pointers into this buffer. We avoid palloc/pfree overhead by re-using
178  * the buffer on each cycle.
179  */
181 
182  /* field raw data pointers found by COPY FROM */
183 
185  char **raw_fields;
186 
187  /*
188  * Similarly, line_buf holds the whole input line being processed. The
189  * input cycle is first to read the whole line into line_buf, convert it
190  * to server encoding there, and then extract the individual attribute
191  * fields into attribute_buf. line_buf is preserved unmodified so that we
192  * can display it in error messages if appropriate.
193  */
195  bool line_buf_converted; /* converted to server encoding? */
196  bool line_buf_valid; /* contains the row being processed? */
197 
198  /*
199  * Finally, raw_buf holds raw data read from the data source (file or
200  * client connection). CopyReadLine parses this data sufficiently to
201  * locate line boundaries, then transfers the data to line_buf and
202  * converts it. Note: we guarantee that there is a \0 at
203  * raw_buf[raw_buf_len].
204  */
205 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
206  char *raw_buf;
207  int raw_buf_index; /* next byte to process */
208  int raw_buf_len; /* total # of bytes stored */
209 } CopyStateData;
210 
211 /* DestReceiver for COPY (query) TO */
212 typedef struct
213 {
214  DestReceiver pub; /* publicly-known function pointers */
215  CopyState cstate; /* CopyStateData for the command */
216  uint64 processed; /* # of tuples processed */
217 } DR_copy;
218 
219 
220 /*
221  * These macros centralize code used to process line_buf and raw_buf buffers.
222  * They are macros because they often do continue/break control and to avoid
223  * function call overhead in tight COPY loops.
224  *
225  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
226  * prevent the continue/break processing from working. We end the "if (1)"
227  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
228  * any "else" in the calling code, and to avoid any compiler warnings about
229  * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
230  */
231 
/*
 * If fewer than (extralen + 1) bytes remain in the buffer and EOF has not
 * been hit, rewind to the character fetched at the top of the loop, request
 * more data, and restart the enclosing loop.  The rewind keeps that character
 * in the buffer even if there is more than one read-ahead.
 *
 * Relies on the caller's locals raw_buf_ptr, prev_raw_ptr, copy_buf_len,
 * hit_eof and need_data, and must be used inside a loop.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr;	/* undo fetch */ \
		continue; \
	} \
} else ((void) 0)
246 
/*
 * Counterpart of the macro above for the case where EOF has already been
 * hit: when fewer than (extralen + 1) bytes remain, set result to true,
 * consume the rest of the buffer (only if extralen is nonzero) and break out
 * of the enclosing loop.
 *
 * Relies on the caller's locals raw_buf_ptr, copy_buf_len, hit_eof and
 * result.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (copy_buf_len <= raw_buf_ptr + (extralen) && hit_eof) \
	{ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		break; \
	} \
} else ((void) 0)
260 
/*
 * Append the already-scanned bytes raw_buf[raw_buf_index .. raw_buf_ptr) to
 * line_buf and advance raw_buf_index past them; must do this to be sure
 * there is some room in raw_buf.  Does nothing when no bytes are pending.
 *
 * Relies on the caller's locals cstate and raw_buf_ptr.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (cstate->raw_buf_index < raw_buf_ptr) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   &cstate->raw_buf[cstate->raw_buf_index], \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
276 
/*
 * Discard any read-ahead by repositioning just past the character fetched at
 * the top of the loop, then jump to the caller's not_end_of_copy label.
 *
 * Relies on the caller's locals raw_buf_ptr and prev_raw_ptr.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = 1 + prev_raw_ptr; \
	goto not_end_of_copy; \
} else ((void) 0)
284 
/*
 * Fixed 11-byte signature: "PGCOPY", newline, 0xFF, CR, LF, NUL.
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
286 
287 
288 /* non-export function prototypes */
289 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
290  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
291  List *options);
292 static void EndCopy(CopyState cstate);
293 static void ClosePipeToProgram(CopyState cstate);
294 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
295  Oid queryRelId, const char *filename, bool is_program,
296  List *attnamelist, List *options);
297 static void EndCopyTo(CopyState cstate);
298 static uint64 DoCopyTo(CopyState cstate);
299 static uint64 CopyTo(CopyState cstate);
300 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
301  Datum *values, bool *nulls);
302 static uint64 CopyFrom(CopyState cstate);
303 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
304  CommandId mycid, int hi_options,
305  ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
306  BulkInsertState bistate,
307  int nBufferedTuples, HeapTuple *bufferedTuples,
308  int firstBufferedLineNo);
309 static bool CopyReadLine(CopyState cstate);
310 static bool CopyReadLineText(CopyState cstate);
311 static int CopyReadAttributesText(CopyState cstate);
312 static int CopyReadAttributesCSV(CopyState cstate);
314  int column_no, FmgrInfo *flinfo,
315  Oid typioparam, int32 typmod,
316  bool *isnull);
317 static void CopyAttributeOutText(CopyState cstate, char *string);
318 static void CopyAttributeOutCSV(CopyState cstate, char *string,
319  bool use_quote, bool single_attr);
320 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
321  List *attnamelist);
322 static char *limit_printout_length(const char *str);
323 
324 /* Low-level communications functions */
325 static void SendCopyBegin(CopyState cstate);
326 static void ReceiveCopyBegin(CopyState cstate);
327 static void SendCopyEnd(CopyState cstate);
328 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
329 static void CopySendString(CopyState cstate, const char *str);
330 static void CopySendChar(CopyState cstate, char c);
331 static void CopySendEndOfRow(CopyState cstate);
332 static int CopyGetData(CopyState cstate, void *databuf,
333  int minread, int maxread);
334 static void CopySendInt32(CopyState cstate, int32 val);
335 static bool CopyGetInt32(CopyState cstate, int32 *val);
336 static void CopySendInt16(CopyState cstate, int16 val);
337 static bool CopyGetInt16(CopyState cstate, int16 *val);
338 
339 
340 /*
341  * Send copy start/stop messages for frontend copies. These have changed
342  * in past protocol redesigns.
343  */
344 static void
346 {
348  {
349  /* new way */
351  int natts = list_length(cstate->attnumlist);
352  int16 format = (cstate->binary ? 1 : 0);
353  int i;
354 
355  pq_beginmessage(&buf, 'H');
356  pq_sendbyte(&buf, format); /* overall format */
357  pq_sendint(&buf, natts, 2);
358  for (i = 0; i < natts; i++)
359  pq_sendint(&buf, format, 2); /* per-column formats */
360  pq_endmessage(&buf);
361  cstate->copy_dest = COPY_NEW_FE;
362  }
363  else
364  {
365  /* old way */
366  if (cstate->binary)
367  ereport(ERROR,
368  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
369  errmsg("COPY BINARY is not supported to stdout or from stdin")));
370  pq_putemptymessage('H');
371  /* grottiness needed for old COPY OUT protocol */
372  pq_startcopyout();
373  cstate->copy_dest = COPY_OLD_FE;
374  }
375 }
376 
377 static void
379 {
381  {
382  /* new way */
384  int natts = list_length(cstate->attnumlist);
385  int16 format = (cstate->binary ? 1 : 0);
386  int i;
387 
388  pq_beginmessage(&buf, 'G');
389  pq_sendbyte(&buf, format); /* overall format */
390  pq_sendint(&buf, natts, 2);
391  for (i = 0; i < natts; i++)
392  pq_sendint(&buf, format, 2); /* per-column formats */
393  pq_endmessage(&buf);
394  cstate->copy_dest = COPY_NEW_FE;
395  cstate->fe_msgbuf = makeLongStringInfo();
396  }
397  else
398  {
399  /* old way */
400  if (cstate->binary)
401  ereport(ERROR,
402  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
403  errmsg("COPY BINARY is not supported to stdout or from stdin")));
404  pq_putemptymessage('G');
405  /* any error in old protocol will make us lose sync */
406  pq_startmsgread();
407  cstate->copy_dest = COPY_OLD_FE;
408  }
409  /* We *must* flush here to ensure FE knows it can send. */
410  pq_flush();
411 }
412 
413 static void
415 {
416  if (cstate->copy_dest == COPY_NEW_FE)
417  {
418  /* Shouldn't have any unsent data */
419  Assert(cstate->fe_msgbuf->len == 0);
420  /* Send Copy Done message */
421  pq_putemptymessage('c');
422  }
423  else
424  {
425  CopySendData(cstate, "\\.", 2);
426  /* Need to flush out the trailer (this also appends a newline) */
427  CopySendEndOfRow(cstate);
428  pq_endcopyout(false);
429  }
430 }
431 
432 /*----------
433  * CopySendData sends output data to the destination (file or frontend)
434  * CopySendString does the same for null-terminated strings
435  * CopySendChar does the same for single characters
436  * CopySendEndOfRow does the appropriate thing at end of each data row
437  * (data is not actually flushed except by CopySendEndOfRow)
438  *
439  * NB: no data conversion is applied by these functions
440  *----------
441  */
442 static void
443 CopySendData(CopyState cstate, const void *databuf, int datasize)
444 {
445  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
446 }
447 
448 static void
449 CopySendString(CopyState cstate, const char *str)
450 {
451  appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
452 }
453 
454 static void
455 CopySendChar(CopyState cstate, char c)
456 {
458 }
459 
460 static void
462 {
463  StringInfo fe_msgbuf = cstate->fe_msgbuf;
464 
465  switch (cstate->copy_dest)
466  {
467  case COPY_FILE:
468  if (!cstate->binary)
469  {
470  /* Default line termination depends on platform */
471 #ifndef WIN32
472  CopySendChar(cstate, '\n');
473 #else
474  CopySendString(cstate, "\r\n");
475 #endif
476  }
477 
478  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
479  cstate->copy_file) != 1 ||
480  ferror(cstate->copy_file))
481  {
482  if (cstate->is_program)
483  {
484  if (errno == EPIPE)
485  {
486  /*
487  * The pipe will be closed automatically on error at
488  * the end of transaction, but we might get a better
489  * error message from the subprocess' exit code than
490  * just "Broken Pipe"
491  */
492  ClosePipeToProgram(cstate);
493 
494  /*
495  * If ClosePipeToProgram() didn't throw an error, the
496  * program terminated normally, but closed the pipe
497  * first. Restore errno, and throw an error.
498  */
499  errno = EPIPE;
500  }
501  ereport(ERROR,
503  errmsg("could not write to COPY program: %m")));
504  }
505  else
506  ereport(ERROR,
508  errmsg("could not write to COPY file: %m")));
509  }
510  break;
511  case COPY_OLD_FE:
512  /* The FE/BE protocol uses \n as newline for all platforms */
513  if (!cstate->binary)
514  CopySendChar(cstate, '\n');
515 
516  if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
517  {
518  /* no hope of recovering connection sync, so FATAL */
519  ereport(FATAL,
520  (errcode(ERRCODE_CONNECTION_FAILURE),
521  errmsg("connection lost during COPY to stdout")));
522  }
523  break;
524  case COPY_NEW_FE:
525  /* The FE/BE protocol uses \n as newline for all platforms */
526  if (!cstate->binary)
527  CopySendChar(cstate, '\n');
528 
529  /* Dump the accumulated row as one CopyData message */
530  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
531  break;
532  }
533 
534  resetStringInfo(fe_msgbuf);
535 }
536 
537 /*
538  * CopyGetData reads data from the source (file or frontend)
539  *
540  * We attempt to read at least minread, and at most maxread, bytes from
541  * the source. The actual number of bytes read is returned; if this is
542  * less than minread, EOF was detected.
543  *
544  * Note: when copying from the frontend, we expect a proper EOF mark per
545  * protocol; if the frontend simply drops the connection, we raise error.
546  * It seems unwise to allow the COPY IN to complete normally in that case.
547  *
548  * NB: no data conversion is applied here.
549  */
550 static int
551 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
552 {
553  int bytesread = 0;
554 
555  switch (cstate->copy_dest)
556  {
557  case COPY_FILE:
558  bytesread = fread(databuf, 1, maxread, cstate->copy_file);
559  if (ferror(cstate->copy_file))
560  ereport(ERROR,
562  errmsg("could not read from COPY file: %m")));
563  break;
564  case COPY_OLD_FE:
565 
566  /*
567  * We cannot read more than minread bytes (which in practice is 1)
568  * because old protocol doesn't have any clear way of separating
569  * the COPY stream from following data. This is slow, but not any
570  * slower than the code path was originally, and we don't care
571  * much anymore about the performance of old protocol.
572  */
573  if (pq_getbytes((char *) databuf, minread))
574  {
575  /* Only a \. terminator is legal EOF in old protocol */
576  ereport(ERROR,
577  (errcode(ERRCODE_CONNECTION_FAILURE),
578  errmsg("unexpected EOF on client connection with an open transaction")));
579  }
580  bytesread = minread;
581  break;
582  case COPY_NEW_FE:
583  while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
584  {
585  int avail;
586 
587  while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
588  {
589  /* Try to receive another message */
590  int mtype;
591 
592  readmessage:
594  pq_startmsgread();
595  mtype = pq_getbyte();
596  if (mtype == EOF)
597  ereport(ERROR,
598  (errcode(ERRCODE_CONNECTION_FAILURE),
599  errmsg("unexpected EOF on client connection with an open transaction")));
600  if (pq_getmessage(cstate->fe_msgbuf, 0))
601  ereport(ERROR,
602  (errcode(ERRCODE_CONNECTION_FAILURE),
603  errmsg("unexpected EOF on client connection with an open transaction")));
605  switch (mtype)
606  {
607  case 'd': /* CopyData */
608  break;
609  case 'c': /* CopyDone */
610  /* COPY IN correctly terminated by frontend */
611  cstate->fe_eof = true;
612  return bytesread;
613  case 'f': /* CopyFail */
614  ereport(ERROR,
615  (errcode(ERRCODE_QUERY_CANCELED),
616  errmsg("COPY from stdin failed: %s",
617  pq_getmsgstring(cstate->fe_msgbuf))));
618  break;
619  case 'H': /* Flush */
620  case 'S': /* Sync */
621 
622  /*
623  * Ignore Flush/Sync for the convenience of client
624  * libraries (such as libpq) that may send those
625  * without noticing that the command they just
626  * sent was COPY.
627  */
628  goto readmessage;
629  default:
630  ereport(ERROR,
631  (errcode(ERRCODE_PROTOCOL_VIOLATION),
632  errmsg("unexpected message type 0x%02X during COPY from stdin",
633  mtype)));
634  break;
635  }
636  }
637  avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
638  if (avail > maxread)
639  avail = maxread;
640  pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
641  databuf = (void *) ((char *) databuf + avail);
642  maxread -= avail;
643  bytesread += avail;
644  }
645  break;
646  }
647 
648  return bytesread;
649 }
650 
651 
652 /*
653  * These functions do apply some data conversion
654  */
655 
656 /*
657  * CopySendInt32 sends an int32 in network byte order
658  */
659 static void
661 {
662  uint32 buf;
663 
664  buf = htonl((uint32) val);
665  CopySendData(cstate, &buf, sizeof(buf));
666 }
667 
668 /*
669  * CopyGetInt32 reads an int32 that appears in network byte order
670  *
671  * Returns true if OK, false if EOF
672  */
673 static bool
675 {
676  uint32 buf;
677 
678  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
679  {
680  *val = 0; /* suppress compiler warning */
681  return false;
682  }
683  *val = (int32) ntohl(buf);
684  return true;
685 }
686 
687 /*
688  * CopySendInt16 sends an int16 in network byte order
689  */
690 static void
692 {
693  uint16 buf;
694 
695  buf = htons((uint16) val);
696  CopySendData(cstate, &buf, sizeof(buf));
697 }
698 
699 /*
700  * CopyGetInt16 reads an int16 that appears in network byte order
701  */
702 static bool
704 {
705  uint16 buf;
706 
707  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
708  {
709  *val = 0; /* suppress compiler warning */
710  return false;
711  }
712  *val = (int16) ntohs(buf);
713  return true;
714 }
715 
716 
717 /*
718  * CopyLoadRawBuf loads some more data into raw_buf
719  *
720  * Returns TRUE if able to obtain at least one more byte, else FALSE.
721  *
722  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
723  * down to the start of the buffer and then we load more data after that.
724  * This case is used only when a frontend multibyte character crosses a
725  * bufferload boundary.
726  */
727 static bool
729 {
730  int nbytes;
731  int inbytes;
732 
733  if (cstate->raw_buf_index < cstate->raw_buf_len)
734  {
735  /* Copy down the unprocessed data */
736  nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
737  memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
738  nbytes);
739  }
740  else
741  nbytes = 0; /* no data need be saved */
742 
743  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
744  1, RAW_BUF_SIZE - nbytes);
745  nbytes += inbytes;
746  cstate->raw_buf[nbytes] = '\0';
747  cstate->raw_buf_index = 0;
748  cstate->raw_buf_len = nbytes;
749  return (inbytes > 0);
750 }
751 
752 
753 /*
754  * DoCopy executes the SQL COPY statement
755  *
756  * Either unload or reload contents of table <relation>, depending on <from>.
757  * (<from> = TRUE means we are inserting into the table.) In the "TO" case
758  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
759  * or DELETE query.
760  *
761  * If <pipe> is false, transfer is between the table and the file named
762  * <filename>. Otherwise, transfer is between the table and our regular
763  * input/output stream. The latter could be either stdin/stdout or a
764  * socket, depending on whether we're running under Postmaster control.
765  *
766  * Do not allow a Postgres user without superuser privilege to read from
767  * or write to a file.
768  *
769  * Do not allow the copy if user doesn't have proper permission to access
770  * the table or the specifically requested columns.
771  */
772 void
773 DoCopy(ParseState *pstate, const CopyStmt *stmt,
774  int stmt_location, int stmt_len,
775  uint64 *processed)
776 {
777  CopyState cstate;
778  bool is_from = stmt->is_from;
779  bool pipe = (stmt->filename == NULL);
780  Relation rel;
781  Oid relid;
782  RawStmt *query = NULL;
783  List *range_table = NIL;
784 
785  /* Disallow COPY to/from file or program except to superusers. */
786  if (!pipe && !superuser())
787  {
788  if (stmt->is_program)
789  ereport(ERROR,
790  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
791  errmsg("must be superuser to COPY to or from an external program"),
792  errhint("Anyone can COPY to stdout or from stdin. "
793  "psql's \\copy command also works for anyone.")));
794  else
795  ereport(ERROR,
796  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
797  errmsg("must be superuser to COPY to or from a file"),
798  errhint("Anyone can COPY to stdout or from stdin. "
799  "psql's \\copy command also works for anyone.")));
800  }
801 
802  if (stmt->relation)
803  {
804  TupleDesc tupDesc;
805  AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
806  List *attnums;
807  ListCell *cur;
808  RangeTblEntry *rte;
809 
810  Assert(!stmt->query);
811 
812  /* Open and lock the relation, using the appropriate lock type. */
813  rel = heap_openrv(stmt->relation,
814  (is_from ? RowExclusiveLock : AccessShareLock));
815 
816  relid = RelationGetRelid(rel);
817 
818  rte = makeNode(RangeTblEntry);
819  rte->rtekind = RTE_RELATION;
820  rte->relid = RelationGetRelid(rel);
821  rte->relkind = rel->rd_rel->relkind;
822  rte->requiredPerms = required_access;
823  range_table = list_make1(rte);
824 
825  tupDesc = RelationGetDescr(rel);
826  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
827  foreach(cur, attnums)
828  {
829  int attno = lfirst_int(cur) -
831 
832  if (is_from)
833  rte->insertedCols = bms_add_member(rte->insertedCols, attno);
834  else
835  rte->selectedCols = bms_add_member(rte->selectedCols, attno);
836  }
837  ExecCheckRTPerms(range_table, true);
838 
839  /*
840  * Permission check for row security policies.
841  *
842  * check_enable_rls will ereport(ERROR) if the user has requested
843  * something invalid and will otherwise indicate if we should enable
844  * RLS (returns RLS_ENABLED) or not for this COPY statement.
845  *
846  * If the relation has a row security policy and we are to apply it
847  * then perform a "query" copy and allow the normal query processing
848  * to handle the policies.
849  *
850  * If RLS is not enabled for this, then just fall through to the
851  * normal non-filtering relation handling.
852  */
853  if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
854  {
856  ColumnRef *cr;
857  ResTarget *target;
858  RangeVar *from;
859  List *targetList = NIL;
860 
861  if (is_from)
862  ereport(ERROR,
863  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
864  errmsg("COPY FROM not supported with row-level security"),
865  errhint("Use INSERT statements instead.")));
866 
867  /*
868  * Build target list
869  *
870  * If no columns are specified in the attribute list of the COPY
871  * command, then the target list is 'all' columns. Therefore, '*'
872  * should be used as the target list for the resulting SELECT
873  * statement.
874  *
875  * In the case that columns are specified in the attribute list,
876  * create a ColumnRef and ResTarget for each column and add them
877  * to the target list for the resulting SELECT statement.
878  */
879  if (!stmt->attlist)
880  {
881  cr = makeNode(ColumnRef);
883  cr->location = -1;
884 
885  target = makeNode(ResTarget);
886  target->name = NULL;
887  target->indirection = NIL;
888  target->val = (Node *) cr;
889  target->location = -1;
890 
891  targetList = list_make1(target);
892  }
893  else
894  {
895  ListCell *lc;
896 
897  foreach(lc, stmt->attlist)
898  {
899  /*
900  * Build the ColumnRef for each column. The ColumnRef
901  * 'fields' property is a String 'Value' node (see
902  * nodes/value.h) that corresponds to the column name
903  * respectively.
904  */
905  cr = makeNode(ColumnRef);
906  cr->fields = list_make1(lfirst(lc));
907  cr->location = -1;
908 
909  /* Build the ResTarget and add the ColumnRef to it. */
910  target = makeNode(ResTarget);
911  target->name = NULL;
912  target->indirection = NIL;
913  target->val = (Node *) cr;
914  target->location = -1;
915 
916  /* Add each column to the SELECT statement's target list */
917  targetList = lappend(targetList, target);
918  }
919  }
920 
921  /*
922  * Build RangeVar for from clause, fully qualified based on the
923  * relation which we have opened and locked.
924  */
926  RelationGetRelationName(rel), -1);
927 
928  /* Build query */
929  select = makeNode(SelectStmt);
930  select->targetList = targetList;
931  select->fromClause = list_make1(from);
932 
933  query = makeNode(RawStmt);
934  query->stmt = (Node *) select;
935  query->stmt_location = stmt_location;
936  query->stmt_len = stmt_len;
937 
938  /*
939  * Close the relation for now, but keep the lock on it to prevent
940  * changes between now and when we start the query-based COPY.
941  *
942  * We'll reopen it later as part of the query-based COPY.
943  */
944  heap_close(rel, NoLock);
945  rel = NULL;
946  }
947  }
948  else
949  {
950  Assert(stmt->query);
951 
952  query = makeNode(RawStmt);
953  query->stmt = stmt->query;
954  query->stmt_location = stmt_location;
955  query->stmt_len = stmt_len;
956 
957  relid = InvalidOid;
958  rel = NULL;
959  }
960 
961  if (is_from)
962  {
963  Assert(rel);
964 
965  /* check read-only transaction and parallel mode */
966  if (XactReadOnly && !rel->rd_islocaltemp)
967  PreventCommandIfReadOnly("COPY FROM");
968  PreventCommandIfParallelMode("COPY FROM");
969 
970  cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
971  stmt->attlist, stmt->options);
972  cstate->range_table = range_table;
973  *processed = CopyFrom(cstate); /* copy from file to database */
974  EndCopyFrom(cstate);
975  }
976  else
977  {
978  cstate = BeginCopyTo(pstate, rel, query, relid,
979  stmt->filename, stmt->is_program,
980  stmt->attlist, stmt->options);
981  *processed = DoCopyTo(cstate); /* copy from database to file */
982  EndCopyTo(cstate);
983  }
984 
985  /*
986  * Close the relation. If reading, we can release the AccessShareLock we
987  * got; if writing, we should hold the lock until end of transaction to
988  * ensure that updates will be committed before lock is released.
989  */
990  if (rel != NULL)
991  heap_close(rel, (is_from ? NoLock : AccessShareLock));
992 }
993 
994 /*
995  * Process the statement option list for COPY.
996  *
997  * Scan the options list (a list of DefElem) and transpose the information
998  * into cstate, applying appropriate error checking.
999  *
1000  * cstate is assumed to be filled with zeroes initially.
1001  *
1002  * This is exported so that external users of the COPY API can sanity-check
1003  * a list of options. In that usage, cstate should be passed as NULL
1004  * (since external users don't know sizeof(CopyStateData)) and the collected
1005  * data is just leaked until CurrentMemoryContext is reset.
1006  *
1007  * Note that additional checking, such as whether column names listed in FORCE
1008  * QUOTE actually exist, has to be applied later. This just checks for
1009  * self-consistency of the options list.
      *
      * Arguments as used by the visible body:
      *   pstate  - only passed to parser_errposition() when reporting an error
      *   cstate  - receives the parsed options; when NULL a zeroed
      *             CopyStateData is palloc0'd here instead
      *   is_from - true for COPY FROM; FORCE_QUOTE is rejected when it is set,
      *             FORCE_NOT_NULL and FORCE_NULL are rejected when it is clear
      *   options - list of DefElem nodes, each matched by defname
      *
      * Every problem is reported with ereport(ERROR, ...).  After the option
      * loop, omitted options get defaults: delimiter "," (CSV) or tab, null
      * string "" (CSV) or "\N", and in CSV mode quote '"' and escape = quote.
      *
      * NOTE(review): the embedded listing numbers skip 1012 and 1194, so the
      * line holding the function name and first parameter, and one statement
      * in the "encoding" branch, are not visible in this extract.  The name is
      * presumably ProcessCopyOptions, going by the call
      * ProcessCopyOptions(pstate, cstate, is_from, options) further down in
      * this file -- confirm against the real source.
1010  */
1011 void
1013  CopyState cstate,
1014  bool is_from,
1015  List *options)
1016 {
1017  bool format_specified = false;
1018  ListCell *option;
1019 
1020  /* Support external use for option sanity checking */
1021  if (cstate == NULL)
1022  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1023 
      /* -1 marks "no ENCODING option seen yet"; tested by the encoding branch */
1024  cstate->file_encoding = -1;
1025 
1026  /* Extract options from the statement node tree */
1027  foreach(option, options)
1028  {
1029  DefElem *defel = castNode(DefElem, lfirst(option));
1030 
1031  if (strcmp(defel->defname, "format") == 0)
1032  {
1033  char *fmt = defGetString(defel);
1034 
1035  if (format_specified)
1036  ereport(ERROR,
1037  (errcode(ERRCODE_SYNTAX_ERROR),
1038  errmsg("conflicting or redundant options"),
1039  parser_errposition(pstate, defel->location)));
1040  format_specified = true;
1041  if (strcmp(fmt, "text") == 0)
1042  /* default format */ ;
1043  else if (strcmp(fmt, "csv") == 0)
1044  cstate->csv_mode = true;
1045  else if (strcmp(fmt, "binary") == 0)
1046  cstate->binary = true;
1047  else
1048  ereport(ERROR,
1049  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050  errmsg("COPY format \"%s\" not recognized", fmt),
1051  parser_errposition(pstate, defel->location)));
1052  }
1053  else if (strcmp(defel->defname, "oids") == 0)
1054  {
      /*
       * NOTE(review): the stored value doubles as the "already seen" flag,
       * so a repeated option is only detected when the earlier occurrence
       * yielded true.  "freeze" and "header" below use the same pattern.
       */
1055  if (cstate->oids)
1056  ereport(ERROR,
1057  (errcode(ERRCODE_SYNTAX_ERROR),
1058  errmsg("conflicting or redundant options"),
1059  parser_errposition(pstate, defel->location)));
1060  cstate->oids = defGetBoolean(defel);
1061  }
1062  else if (strcmp(defel->defname, "freeze") == 0)
1063  {
1064  if (cstate->freeze)
1065  ereport(ERROR,
1066  (errcode(ERRCODE_SYNTAX_ERROR),
1067  errmsg("conflicting or redundant options"),
1068  parser_errposition(pstate, defel->location)));
1069  cstate->freeze = defGetBoolean(defel);
1070  }
1071  else if (strcmp(defel->defname, "delimiter") == 0)
1072  {
1073  if (cstate->delim)
1074  ereport(ERROR,
1075  (errcode(ERRCODE_SYNTAX_ERROR),
1076  errmsg("conflicting or redundant options"),
1077  parser_errposition(pstate, defel->location)));
1078  cstate->delim = defGetString(defel);
1079  }
1080  else if (strcmp(defel->defname, "null") == 0)
1081  {
1082  if (cstate->null_print)
1083  ereport(ERROR,
1084  (errcode(ERRCODE_SYNTAX_ERROR),
1085  errmsg("conflicting or redundant options"),
1086  parser_errposition(pstate, defel->location)));
1087  cstate->null_print = defGetString(defel);
1088  }
1089  else if (strcmp(defel->defname, "header") == 0)
1090  {
1091  if (cstate->header_line)
1092  ereport(ERROR,
1093  (errcode(ERRCODE_SYNTAX_ERROR),
1094  errmsg("conflicting or redundant options"),
1095  parser_errposition(pstate, defel->location)));
1096  cstate->header_line = defGetBoolean(defel);
1097  }
1098  else if (strcmp(defel->defname, "quote") == 0)
1099  {
1100  if (cstate->quote)
1101  ereport(ERROR,
1102  (errcode(ERRCODE_SYNTAX_ERROR),
1103  errmsg("conflicting or redundant options"),
1104  parser_errposition(pstate, defel->location)));
1105  cstate->quote = defGetString(defel);
1106  }
1107  else if (strcmp(defel->defname, "escape") == 0)
1108  {
1109  if (cstate->escape)
1110  ereport(ERROR,
1111  (errcode(ERRCODE_SYNTAX_ERROR),
1112  errmsg("conflicting or redundant options"),
1113  parser_errposition(pstate, defel->location)));
1114  cstate->escape = defGetString(defel);
1115  }
1116  else if (strcmp(defel->defname, "force_quote") == 0)
1117  {
1118  if (cstate->force_quote || cstate->force_quote_all)
1119  ereport(ERROR,
1120  (errcode(ERRCODE_SYNTAX_ERROR),
1121  errmsg("conflicting or redundant options"),
1122  parser_errposition(pstate, defel->location)));
      /* FORCE_QUOTE accepts either "*" (all columns) or a column list */
1123  if (defel->arg && IsA(defel->arg, A_Star))
1124  cstate->force_quote_all = true;
1125  else if (defel->arg && IsA(defel->arg, List))
1126  cstate->force_quote = castNode(List, defel->arg);
1127  else
1128  ereport(ERROR,
1129  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1130  errmsg("argument to option \"%s\" must be a list of column names",
1131  defel->defname),
1132  parser_errposition(pstate, defel->location)));
1133  }
1134  else if (strcmp(defel->defname, "force_not_null") == 0)
1135  {
1136  if (cstate->force_notnull)
1137  ereport(ERROR,
1138  (errcode(ERRCODE_SYNTAX_ERROR),
1139  errmsg("conflicting or redundant options"),
1140  parser_errposition(pstate, defel->location)));
1141  if (defel->arg && IsA(defel->arg, List))
1142  cstate->force_notnull = castNode(List, defel->arg);
1143  else
1144  ereport(ERROR,
1145  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146  errmsg("argument to option \"%s\" must be a list of column names",
1147  defel->defname),
1148  parser_errposition(pstate, defel->location)));
1149  }
1150  else if (strcmp(defel->defname, "force_null") == 0)
1151  {
      /*
       * NOTE(review): unlike every other duplicate-option check in this
       * loop, this ereport carries no parser_errposition() -- confirm
       * whether the omission is intentional.
       */
1152  if (cstate->force_null)
1153  ereport(ERROR,
1154  (errcode(ERRCODE_SYNTAX_ERROR),
1155  errmsg("conflicting or redundant options")));
1156  if (defel->arg && IsA(defel->arg, List))
1157  cstate->force_null = castNode(List, defel->arg);
1158  else
1159  ereport(ERROR,
1160  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1161  errmsg("argument to option \"%s\" must be a list of column names",
1162  defel->defname),
1163  parser_errposition(pstate, defel->location)));
1164  }
1165  else if (strcmp(defel->defname, "convert_selectively") == 0)
1166  {
1167  /*
1168  * Undocumented, not-accessible-from-SQL option: convert only the
1169  * named columns to binary form, storing the rest as NULLs. It's
1170  * allowed for the column list to be NIL.
1171  */
1172  if (cstate->convert_selectively)
1173  ereport(ERROR,
1174  (errcode(ERRCODE_SYNTAX_ERROR),
1175  errmsg("conflicting or redundant options"),
1176  parser_errposition(pstate, defel->location)));
1177  cstate->convert_selectively = true;
1178  if (defel->arg == NULL || IsA(defel->arg, List))
1179  cstate->convert_select = castNode(List, defel->arg);
1180  else
1181  ereport(ERROR,
1182  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1183  errmsg("argument to option \"%s\" must be a list of column names",
1184  defel->defname),
1185  parser_errposition(pstate, defel->location)));
1186  }
1187  else if (strcmp(defel->defname, "encoding") == 0)
1188  {
1189  if (cstate->file_encoding >= 0)
1190  ereport(ERROR,
1191  (errcode(ERRCODE_SYNTAX_ERROR),
1192  errmsg("conflicting or redundant options"),
1193  parser_errposition(pstate, defel->location)));
      /*
       * NOTE(review): listing line 1194 is absent here; presumably it
       * assigns cstate->file_encoding from the option value, which the
       * test below then validates -- verify in the real source.
       */
1195  if (cstate->file_encoding < 0)
1196  ereport(ERROR,
1197  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1198  errmsg("argument to option \"%s\" must be a valid encoding name",
1199  defel->defname),
1200  parser_errposition(pstate, defel->location)));
1201  }
1202  else
1203  ereport(ERROR,
1204  (errcode(ERRCODE_SYNTAX_ERROR),
1205  errmsg("option \"%s\" not recognized",
1206  defel->defname),
1207  parser_errposition(pstate, defel->location)));
1208  }
1209 
1210  /*
1211  * Check for incompatible options (must do these two before inserting
1212  * defaults)
1213  */
1214  if (cstate->binary && cstate->delim)
1215  ereport(ERROR,
1216  (errcode(ERRCODE_SYNTAX_ERROR),
1217  errmsg("cannot specify DELIMITER in BINARY mode")));
1218 
1219  if (cstate->binary && cstate->null_print)
1220  ereport(ERROR,
1221  (errcode(ERRCODE_SYNTAX_ERROR),
1222  errmsg("cannot specify NULL in BINARY mode")));
1223 
1224  /* Set defaults for omitted options */
1225  if (!cstate->delim)
1226  cstate->delim = cstate->csv_mode ? "," : "\t";
1227 
1228  if (!cstate->null_print)
1229  cstate->null_print = cstate->csv_mode ? "" : "\\N";
1230  cstate->null_print_len = strlen(cstate->null_print);
1231 
      /*
       * quote/escape are defaulted only in CSV mode, so the "available only
       * in CSV mode" checks below still see NULL when the user gave neither.
       */
1232  if (cstate->csv_mode)
1233  {
1234  if (!cstate->quote)
1235  cstate->quote = "\"";
1236  if (!cstate->escape)
1237  cstate->escape = cstate->quote;
1238  }
1239 
1240  /* Only single-byte delimiter strings are supported. */
1241  if (strlen(cstate->delim) != 1)
1242  ereport(ERROR,
1243  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1244  errmsg("COPY delimiter must be a single one-byte character")));
1245 
1246  /* Disallow end-of-line characters */
1247  if (strchr(cstate->delim, '\r') != NULL ||
1248  strchr(cstate->delim, '\n') != NULL)
1249  ereport(ERROR,
1250  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1251  errmsg("COPY delimiter cannot be newline or carriage return")));
1252 
1253  if (strchr(cstate->null_print, '\r') != NULL ||
1254  strchr(cstate->null_print, '\n') != NULL)
1255  ereport(ERROR,
1256  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1257  errmsg("COPY null representation cannot use newline or carriage return")));
1258 
1259  /*
1260  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1261  * backslash because it would be ambiguous. We can't allow the other
1262  * cases because data characters matching the delimiter must be
1263  * backslashed, and certain backslash combinations are interpreted
1264  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1265  * more than strictly necessary, but seems best for consistency and
1266  * future-proofing. Likewise we disallow all digits though only octal
1267  * digits are actually dangerous.
1268  */
1269  if (!cstate->csv_mode &&
1270  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1271  cstate->delim[0]) != NULL)
1272  ereport(ERROR,
1273  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1274  errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1275 
1276  /* Check header */
1277  if (!cstate->csv_mode && cstate->header_line)
1278  ereport(ERROR,
1279  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1280  errmsg("COPY HEADER available only in CSV mode")));
1281 
1282  /* Check quote */
1283  if (!cstate->csv_mode && cstate->quote != NULL)
1284  ereport(ERROR,
1285  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1286  errmsg("COPY quote available only in CSV mode")));
1287 
1288  if (cstate->csv_mode && strlen(cstate->quote) != 1)
1289  ereport(ERROR,
1290  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1291  errmsg("COPY quote must be a single one-byte character")));
1292 
1293  if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1294  ereport(ERROR,
1295  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1296  errmsg("COPY delimiter and quote must be different")));
1297 
1298  /* Check escape */
1299  if (!cstate->csv_mode && cstate->escape != NULL)
1300  ereport(ERROR,
1301  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1302  errmsg("COPY escape available only in CSV mode")));
1303 
1304  if (cstate->csv_mode && strlen(cstate->escape) != 1)
1305  ereport(ERROR,
1306  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1307  errmsg("COPY escape must be a single one-byte character")));
1308 
1309  /* Check force_quote */
1310  if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1311  ereport(ERROR,
1312  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1313  errmsg("COPY force quote available only in CSV mode")));
1314  if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1315  ereport(ERROR,
1316  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317  errmsg("COPY force quote only available using COPY TO")));
1318 
1319  /* Check force_notnull */
1320  if (!cstate->csv_mode && cstate->force_notnull != NIL)
1321  ereport(ERROR,
1322  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1323  errmsg("COPY force not null available only in CSV mode")));
1324  if (cstate->force_notnull != NIL && !is_from)
1325  ereport(ERROR,
1326  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327  errmsg("COPY force not null only available using COPY FROM")));
1328 
1329  /* Check force_null */
1330  if (!cstate->csv_mode && cstate->force_null != NIL)
1331  ereport(ERROR,
1332  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1333  errmsg("COPY force null available only in CSV mode")));
1334 
1335  if (cstate->force_null != NIL && !is_from)
1336  ereport(ERROR,
1337  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1338  errmsg("COPY force null only available using COPY FROM")));
1339 
1340  /* Don't allow the delimiter to appear in the null string. */
1341  if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1342  ereport(ERROR,
1343  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1344  errmsg("COPY delimiter must not appear in the NULL specification")));
1345 
1346  /* Don't allow the CSV quote char to appear in the null string. */
1347  if (cstate->csv_mode &&
1348  strchr(cstate->null_print, cstate->quote[0]) != NULL)
1349  ereport(ERROR,
1350  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1351  errmsg("CSV quote character must not appear in the NULL specification")));
1352 }
1353 
1354 /*
1355  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1356  *
1357  * Iff <binary>, unload or reload in the binary format, as opposed to the
1358  * more wasteful but more robust and portable text format.
1359  *
1360  * Iff <oids>, unload or reload the format that includes OID information.
1361  * On input, we accept OIDs whether or not the table has an OID column,
1362  * but silently drop them if it does not. On output, we report an error
1363  * if the user asks for OIDs in a table that has none (not providing an
1364  * OID column might seem friendlier, but could seriously confuse programs).
1365  *
1366  * If in the text format, delimit columns with delimiter <delim> and print
1367  * NULL values as <null_print>.
      *
      * Flow of the visible body:
      *  - the state struct is palloc0'd, then the remaining work runs with
      *    cstate->copycontext as the current memory context; the previous
      *    context is restored just before cstate is returned;
      *  - the option list is validated and stored by ProcessCopyOptions();
      *  - with a relation (rel != NULL) raw_query must be NULL and the tuple
      *    descriptor is the relation's; for COPY FROM into a partitioned
      *    table the tuple-routing fields of cstate are filled in as well;
      *  - without a relation is_from must be false: a copy of raw_query is
      *    analyzed, rewritten and planned, a QueryDesc is created and
      *    ExecutorStart() is called, and the tuple descriptor is taken from
      *    that QueryDesc.  If queryRelId is not InvalidOid it must appear in
      *    the plan's relationOids;
      *  - attnamelist becomes cstate->attnumlist via CopyGetAttnums(), and the
      *    FORCE_QUOTE / FORCE_NOT_NULL / FORCE_NULL / convert_selectively
      *    column lists become bool arrays indexed by (attnum - 1);
      *  - copy_dest is initialised to COPY_FILE.
      *
      * NOTE(review): the embedded listing numbers skip 1370, 1390, 1392, 1425,
      * 1484, 1488, 1503, 1558-1559, 1562, 1567-1568, 1684, 1693 and 1695, so
      * the function-name line and several statements are not visible in this
      * extract.  The name is presumably BeginCopy, going by the call
      * BeginCopy(pstate, false, rel, query, queryRelId, attnamelist, options)
      * later in this file -- confirm against the real source.
1368  */
1369 static CopyState
1371  bool is_from,
1372  Relation rel,
1373  RawStmt *raw_query,
1374  Oid queryRelId,
1375  List *attnamelist,
1376  List *options)
1377 {
1378  CopyState cstate;
1379  TupleDesc tupDesc;
1380  int num_phys_attrs;
1381  MemoryContext oldcontext;
1382 
1383  /* Allocate workspace and zero all fields */
1384  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1385 
1386  /*
1387  * We allocate everything used by a cstate in a new memory context. This
1388  * avoids memory leaks during repeated use of COPY in a query.
      *
      * NOTE(review): the statement that creates the context (listing lines
      * 1390-1392) is only partly visible below.
1389  */
1391  "COPY",
1393 
1394  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1395 
1396  /* Extract options from the statement node tree */
1397  ProcessCopyOptions(pstate, cstate, is_from, options);
1398 
1399  /* Process the source/target relation or query */
1400  if (rel)
1401  {
1402  Assert(!raw_query);
1403 
1404  cstate->rel = rel;
1405 
1406  tupDesc = RelationGetDescr(cstate->rel);
1407 
1408  /* Don't allow COPY w/ OIDs to or from a table without them */
1409  if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1410  ereport(ERROR,
1411  (errcode(ERRCODE_UNDEFINED_COLUMN),
1412  errmsg("table \"%s\" does not have OIDs",
1413  RelationGetRelationName(cstate->rel))));
1414 
1415  /* Initialize state for CopyFrom tuple routing. */
1416  if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1417  {
1418  PartitionDispatch *partition_dispatch_info;
1419  ResultRelInfo *partitions;
1420  TupleConversionMap **partition_tupconv_maps;
1421  TupleTableSlot *partition_tuple_slot;
1422  int num_parted,
1423  num_partitions;
1424 
      /*
       * NOTE(review): listing line 1425 (the start of the call whose output
       * arguments follow) is absent from this extract.
       */
1426  &partition_dispatch_info,
1427  &partitions,
1428  &partition_tupconv_maps,
1429  &partition_tuple_slot,
1430  &num_parted, &num_partitions);
1431  cstate->partition_dispatch_info = partition_dispatch_info;
1432  cstate->num_dispatch = num_parted;
1433  cstate->partitions = partitions;
1434  cstate->num_partitions = num_partitions;
1435  cstate->partition_tupconv_maps = partition_tupconv_maps;
1436  cstate->partition_tuple_slot = partition_tuple_slot;
1437  }
1438  }
1439  else
1440  {
1441  List *rewritten;
1442  Query *query;
1443  PlannedStmt *plan;
1444  DestReceiver *dest;
1445 
1446  Assert(!is_from);
1447  cstate->rel = NULL;
1448 
1449  /* Don't allow COPY w/ OIDs from a query */
1450  if (cstate->oids)
1451  ereport(ERROR,
1452  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1453  errmsg("COPY (query) WITH OIDS is not supported")));
1454 
1455  /*
1456  * Run parse analysis and rewrite. Note this also acquires sufficient
1457  * locks on the source table(s).
1458  *
1459  * Because the parser and planner tend to scribble on their input, we
1460  * make a preliminary copy of the source querytree. This prevents
1461  * problems in the case that the COPY is in a portal or plpgsql
1462  * function and is executed repeatedly. (See also the same hack in
1463  * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1464  */
1465  rewritten = pg_analyze_and_rewrite((RawStmt *) copyObject(raw_query),
1466  pstate->p_sourcetext, NULL, 0);
1467 
1468  /* check that we got back something we can work with */
1469  if (rewritten == NIL)
1470  {
1471  ereport(ERROR,
1472  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1473  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1474  }
1475  else if (list_length(rewritten) > 1)
1476  {
1477  ListCell *lc;
1478 
1479  /* examine queries to determine which error message to issue */
1480  foreach(lc, rewritten)
1481  {
1482  Query *q = castNode(Query, lfirst(lc));
1483 
      /*
       * NOTE(review): listing lines 1484 and 1488, which presumably hold
       * the conditions guarding the next two ereport calls, are absent.
       * The second message says "for the COPY" while its siblings say
       * "for COPY" -- confirm whether the wording difference is intended.
       */
1485  ereport(ERROR,
1486  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1487  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1489  ereport(ERROR,
1490  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1491  errmsg("DO ALSO rules are not supported for the COPY")));
1492  }
1493 
      /* fallback when no query in the list triggered an error above */
1494  ereport(ERROR,
1495  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1496  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1497  }
1498 
1499  query = castNode(Query, linitial(rewritten));
1500 
1501  /* The grammar allows SELECT INTO, but we don't support that */
      /* NOTE(review): second half of this condition (line 1503) is absent */
1502  if (query->utilityStmt != NULL &&
1504  ereport(ERROR,
1505  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1506  errmsg("COPY (SELECT INTO) is not supported")));
1507 
1508  Assert(query->utilityStmt == NULL);
1509 
1510  /*
1511  * Similarly the grammar doesn't enforce the presence of a RETURNING
1512  * clause, but this is required here.
1513  */
1514  if (query->commandType != CMD_SELECT &&
1515  query->returningList == NIL)
1516  {
1517  Assert(query->commandType == CMD_INSERT ||
1518  query->commandType == CMD_UPDATE ||
1519  query->commandType == CMD_DELETE);
1520 
1521  ereport(ERROR,
1522  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1523  errmsg("COPY query must have a RETURNING clause")));
1524  }
1525 
1526  /* plan the query */
1527  plan = pg_plan_query(query, 0, NULL);
1528 
1529  /*
1530  * With row level security and a user using "COPY relation TO", we
1531  * have to convert the "COPY relation TO" to a query-based COPY (eg:
1532  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1533  * in any RLS clauses.
1534  *
1535  * When this happens, we are passed in the relid of the originally
1536  * found relation (which we have locked). As the planner will look up
1537  * the relation again, we double-check here to make sure it found the
1538  * same one that we have locked.
1539  */
1540  if (queryRelId != InvalidOid)
1541  {
1542  /*
1543  * Note that with RLS involved there may be multiple relations,
1544  * and while the one we need is almost certainly first, we don't
1545  * make any guarantees of that in the planner, so check the whole
1546  * list and make sure we find the original relation.
1547  */
1548  if (!list_member_oid(plan->relationOids, queryRelId))
1549  ereport(ERROR,
1550  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1551  errmsg("relation referenced by COPY statement has changed")));
1552  }
1553 
1554  /*
1555  * Use a snapshot with an updated command ID to ensure this query sees
1556  * results of any previously executed queries.
      *
      * NOTE(review): the statements implementing this (listing lines
      * 1558-1559) are absent from this extract.
1557  */
1560 
1561  /* Create dest receiver for COPY OUT */
      /* NOTE(review): the assignment to dest (listing line 1562) is absent */
1563  ((DR_copy *) dest)->cstate = cstate;
1564 
1565  /* Create a QueryDesc requesting no output */
      /* NOTE(review): two argument lines (1567-1568) of this call are absent */
1566  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1569  dest, NULL, 0);
1570 
1571  /*
1572  * Call ExecutorStart to prepare the plan for execution.
1573  *
1574  * ExecutorStart computes a result tupdesc for us
1575  */
1576  ExecutorStart(cstate->queryDesc, 0);
1577 
1578  tupDesc = cstate->queryDesc->tupDesc;
1579  }
1580 
1581  /* Generate or convert list of attributes to process */
1582  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1583 
1584  num_phys_attrs = tupDesc->natts;
1585 
1586  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1587  cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1588  if (cstate->force_quote_all)
1589  {
1590  int i;
1591 
1592  for (i = 0; i < num_phys_attrs; i++)
1593  cstate->force_quote_flags[i] = true;
1594  }
1595  else if (cstate->force_quote)
1596  {
1597  List *attnums;
1598  ListCell *cur;
1599 
1600  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1601 
1602  foreach(cur, attnums)
1603  {
1604  int attnum = lfirst_int(cur);
1605 
      /* a forced column must also be one of the columns being copied */
1606  if (!list_member_int(cstate->attnumlist, attnum))
1607  ereport(ERROR,
1608  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1609  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1610  NameStr(tupDesc->attrs[attnum - 1]->attname))));
1611  cstate->force_quote_flags[attnum - 1] = true;
1612  }
1613  }
1614 
1615  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1616  cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1617  if (cstate->force_notnull)
1618  {
1619  List *attnums;
1620  ListCell *cur;
1621 
1622  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1623 
1624  foreach(cur, attnums)
1625  {
1626  int attnum = lfirst_int(cur);
1627 
1628  if (!list_member_int(cstate->attnumlist, attnum))
1629  ereport(ERROR,
1630  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1631  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1632  NameStr(tupDesc->attrs[attnum - 1]->attname))));
1633  cstate->force_notnull_flags[attnum - 1] = true;
1634  }
1635  }
1636 
1637  /* Convert FORCE_NULL name list to per-column flags, check validity */
1638  cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1639  if (cstate->force_null)
1640  {
1641  List *attnums;
1642  ListCell *cur;
1643 
1644  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1645 
1646  foreach(cur, attnums)
1647  {
1648  int attnum = lfirst_int(cur);
1649 
1650  if (!list_member_int(cstate->attnumlist, attnum))
1651  ereport(ERROR,
1652  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1653  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1654  NameStr(tupDesc->attrs[attnum - 1]->attname))));
1655  cstate->force_null_flags[attnum - 1] = true;
1656  }
1657  }
1658 
1659  /* Convert convert_selectively name list to per-column flags */
      /* unlike the arrays above, this one is only allocated when requested */
1660  if (cstate->convert_selectively)
1661  {
1662  List *attnums;
1663  ListCell *cur;
1664 
1665  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1666 
1667  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1668 
1669  foreach(cur, attnums)
1670  {
1671  int attnum = lfirst_int(cur);
1672 
1673  if (!list_member_int(cstate->attnumlist, attnum))
1674  ereport(ERROR,
1675  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1676  errmsg_internal("selected column \"%s\" not referenced by COPY",
1677  NameStr(tupDesc->attrs[attnum - 1]->attname))));
1678  cstate->convert_select_flags[attnum - 1] = true;
1679  }
1680  }
1681 
1682  /* Use client encoding when ENCODING option is not specified. */
      /* NOTE(review): the statement guarded by this test (line 1684) is absent */
1683  if (cstate->file_encoding < 0)
1685 
1686  /*
1687  * Set up encoding conversion info. Even if the file and server encodings
1688  * are the same, we must apply pg_any_to_server() to validate data in
1689  * multibyte encodings.
      *
      * NOTE(review): the rest of this expression and the statement after it
      * (listing lines 1693 and 1695) are absent from this extract.
1690  */
1691  cstate->need_transcoding =
1692  (cstate->file_encoding != GetDatabaseEncoding() ||
1694  /* See Multibyte encoding comment above */
1696 
1697  cstate->copy_dest = COPY_FILE; /* default */
1698 
1699  MemoryContextSwitchTo(oldcontext);
1700 
1701  return cstate;
1702 }
1703 
1704 /*
1705  * Closes the pipe to an external program, checking the pclose() return code.
      *
      * Must only be called when cstate->is_program is set (asserted).  The
      * stream in cstate->copy_file is closed with ClosePipeStream(); a result
      * of -1 raises "could not close pipe to external command", any other
      * non-zero result raises ERRCODE_EXTERNAL_ROUTINE_EXCEPTION naming
      * cstate->filename, with wait_result_to_str() supplying the detail text.
      * A zero result returns normally.
      *
      * NOTE(review): the embedded listing numbers skip 1708 and 1717, so the
      * function-name line and the first argument line of the first ereport
      * are not visible in this extract.  The name is presumably
      * ClosePipeToProgram, going by the call ClosePipeToProgram(cstate) later
      * in this file -- confirm against the real source.
1706  */
1707 static void
1709 {
1710  int pclose_rc;
1711 
1712  Assert(cstate->is_program);
1713 
1714  pclose_rc = ClosePipeStream(cstate->copy_file);
      /* -1: the close itself failed, reported with %m */
1715  if (pclose_rc == -1)
1716  ereport(ERROR,
1718  errmsg("could not close pipe to external command: %m")));
      /* any other non-zero value is reported as a failure of the program */
1719  else if (pclose_rc != 0)
1720  ereport(ERROR,
1721  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1722  errmsg("program \"%s\" failed",
1723  cstate->filename),
1724  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1725 }
1726 
1727 /*
1728  * Release resources allocated in a cstate for COPY TO/FROM.
      *
      * For a program target/source the pipe is closed through
      * ClosePipeToProgram().  Otherwise, when a filename was recorded, the file
      * is closed with FreeFile() and a non-zero result raises "could not close
      * file".  With no filename nothing is closed here.  Finally the cstate
      * struct itself is pfree'd, so the caller must not use it afterwards.
      *
      * NOTE(review): the embedded listing numbers skip 1731, 1741 and 1746, so
      * the function-name line, the first argument line of the ereport, and
      * one statement just before pfree() are not visible in this extract.
      * The name is presumably EndCopy, going by the call EndCopy(cstate) later
      * in this file -- confirm against the real source.
1729  */
1730 static void
1732 {
1733  if (cstate->is_program)
1734  {
1735  ClosePipeToProgram(cstate);
1736  }
1737  else
1738  {
      /* short-circuit: FreeFile() is only attempted when a filename exists */
1739  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1740  ereport(ERROR,
1742  errmsg("could not close file \"%s\": %m",
1743  cstate->filename)));
1744  }
1745 
1747  pfree(cstate);
1748 }
1749 
1750 /*
1751  * Setup CopyState to read tuples from a table or a query for COPY TO.
      *
      * A non-NULL rel whose relkind is not RELKIND_RELATION is rejected with a
      * relkind-specific ERRCODE_WRONG_OBJECT_TYPE error.  The common state is
      * then built by BeginCopy() with is_from = false, and the output target
      * is set up while cstate->copycontext is the current memory context:
      *  - filename == NULL: output goes to stdout (is_program must be false);
      *  - is_program: filename is run through OpenPipeStream() for writing;
      *  - otherwise: filename must be an absolute path; it is opened with
      *    AllocateFile() while umask(S_IWGRP | S_IWOTH) is in effect, and the
      *    opened file must not be a directory.
      * The previous memory context is restored before cstate is returned.
      *
      * NOTE(review): the embedded listing numbers skip 1754, 1773, 1779, 1785,
      * 1796, 1812, 1825, 1852 and 1862, so the function-name line and several
      * argument/statement lines are not visible in this extract.  The name is
      * presumably BeginCopyTo, going by the call BeginCopyTo(pstate, rel,
      * query, relid, ...) earlier in this file -- confirm against the real
      * source.
1752  */
1753 static CopyState
1755  Relation rel,
1756  RawStmt *query,
1757  Oid queryRelId,
1758  const char *filename,
1759  bool is_program,
1760  List *attnamelist,
1761  List *options)
1762 {
1763  CopyState cstate;
1764  bool pipe = (filename == NULL);
1765  MemoryContext oldcontext;
1766 
      /* only plain tables can be copied directly; pick the best message */
1767  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1768  {
1769  if (rel->rd_rel->relkind == RELKIND_VIEW)
1770  ereport(ERROR,
1771  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1772  errmsg("cannot copy from view \"%s\"",
1774  errhint("Try the COPY (SELECT ...) TO variant.")));
1775  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1776  ereport(ERROR,
1777  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1778  errmsg("cannot copy from materialized view \"%s\"",
1780  errhint("Try the COPY (SELECT ...) TO variant.")));
1781  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1782  ereport(ERROR,
1783  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1784  errmsg("cannot copy from foreign table \"%s\"",
1786  errhint("Try the COPY (SELECT ...) TO variant.")));
1787  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1788  ereport(ERROR,
1789  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1790  errmsg("cannot copy from sequence \"%s\"",
1791  RelationGetRelationName(rel))));
1792  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1793  ereport(ERROR,
1794  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1795  errmsg("cannot copy from partitioned table \"%s\"",
1797  errhint("Try the COPY (SELECT ...) TO variant.")));
1798  else
1799  ereport(ERROR,
1800  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1801  errmsg("cannot copy from non-table relation \"%s\"",
1802  RelationGetRelationName(rel))));
1803  }
1804 
1805  cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1806  options);
1807  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1808 
1809  if (pipe)
1810  {
1811  Assert(!is_program); /* the grammar does not allow this */
      /* NOTE(review): listing line 1812 is absent from this extract */
1813  cstate->copy_file = stdout;
1814  }
1815  else
1816  {
      /* keep a private copy of the name inside the copy context */
1817  cstate->filename = pstrdup(filename);
1818  cstate->is_program = is_program;
1819 
1820  if (is_program)
1821  {
1822  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1823  if (cstate->copy_file == NULL)
1824  ereport(ERROR,
1826  errmsg("could not execute command \"%s\": %m",
1827  cstate->filename)));
1828  }
1829  else
1830  {
1831  mode_t oumask; /* Pre-existing umask value */
1832  struct stat st;
1833 
1834  /*
1835  * Prevent write to relative path ... too easy to shoot oneself in
1836  * the foot by overwriting a database file ...
1837  */
1838  if (!is_absolute_path(filename))
1839  ereport(ERROR,
1840  (errcode(ERRCODE_INVALID_NAME),
1841  errmsg("relative path not allowed for COPY to file")));
1842 
      /* tighten the umask only around the open, then put it back */
1843  oumask = umask(S_IWGRP | S_IWOTH);
1844  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1845  umask(oumask);
1846  if (cstate->copy_file == NULL)
1847  {
1848  /* copy errno because ereport subfunctions might change it */
1849  int save_errno = errno;
1850 
      /* the hint is attached only for ENOENT and EACCES */
1851  ereport(ERROR,
1853  errmsg("could not open file \"%s\" for writing: %m",
1854  cstate->filename),
1855  (save_errno == ENOENT || save_errno == EACCES) ?
1856  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1857  "You may want a client-side facility such as psql's \\copy.") : 0));
1858  }
1859 
1860  if (fstat(fileno(cstate->copy_file), &st))
1861  ereport(ERROR,
1863  errmsg("could not stat file \"%s\": %m",
1864  cstate->filename)));
1865 
1866  if (S_ISDIR(st.st_mode))
1867  ereport(ERROR,
1868  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1869  errmsg("\"%s\" is a directory", cstate->filename)));
1870  }
1871  }
1872 
1873  MemoryContextSwitchTo(oldcontext);
1874 
1875  return cstate;
1876 }
1877 
1878 /*
1879  * This intermediate routine exists mainly to localize the effects of setjmp
1880  * so we don't need to plaster a lot of variables with "volatile".
      *
      * Runs CopyTo(cstate) inside PG_TRY and returns the value it produced.
      * When no filename is set and whereToSendOutput is DestRemote, the call
      * is bracketed by SendCopyBegin() and SendCopyEnd().  If an error
      * propagates, pq_endcopyout(true) is called and the error is re-thrown.
      *
      * NOTE(review): the embedded listing numbers skip 1883, so the
      * function-name line is not visible in this extract.  The name is
      * presumably DoCopyTo, going by the call DoCopyTo(cstate) earlier in this
      * file -- confirm against the real source.
1881  */
1882 static uint64
1884 {
1885  bool pipe = (cstate->filename == NULL);
1886  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1887  uint64 processed;
1888 
1889  PG_TRY();
1890  {
1891  if (fe_copy)
1892  SendCopyBegin(cstate);
1893 
1894  processed = CopyTo(cstate);
1895 
1896  if (fe_copy)
1897  SendCopyEnd(cstate);
1898  }
1899  PG_CATCH();
1900  {
1901  /*
1902  * Make sure we turn off old-style COPY OUT mode upon error. It is
1903  * okay to do this in all cases, since it does nothing if the mode is
1904  * not on.
1905  */
1906  pq_endcopyout(true);
1907  PG_RE_THROW();
1908  }
1909  PG_END_TRY();
1910 
1911  return processed;
1912 }
1913 
1914 /*
1915  * Clean up storage and release resources for COPY TO.
      *
      * When the copy was driven by a query (cstate->queryDesc != NULL) the
      * executor is shut down with ExecutorFinish() and ExecutorEnd() and the
      * QueryDesc is freed.  In all cases EndCopy() is then called to release
      * the remaining state.
      *
      * NOTE(review): the embedded listing numbers skip 1918 and 1926, so the
      * function-name line and one statement after FreeQueryDesc() are not
      * visible in this extract.  The name is presumably EndCopyTo, going by
      * the call EndCopyTo(cstate) earlier in this file -- confirm against the
      * real source.
1916  */
1917 static void
1919 {
1920  if (cstate->queryDesc != NULL)
1921  {
1922  /* Close down the query and free resources. */
1923  ExecutorFinish(cstate->queryDesc);
1924  ExecutorEnd(cstate->queryDesc);
1925  FreeQueryDesc(cstate->queryDesc);
1927  }
1928 
1929  /* Clean up storage */
1930  EndCopy(cstate);
1931 }
1932 
1933 /*
1934  * Copy from relation or query TO file.
      *
      * The tuple descriptor is taken from cstate->rel when a relation is being
      * copied, otherwise from cstate->queryDesc.  An output function is looked
      * up for every column in cstate->attnumlist, then the binary header or the
      * optional text header line is emitted.  After that either the relation is
      * scanned and each tuple is handed to CopyOneRowTo(), or the row count is
      * read back from the DR_copy destination receiver.  A binary copy is
      * finished off with a trailer.
      *
      * Returns the number of rows processed.
      *
      * NOTE(review): this listing omits several source lines of the function
      * (the line carrying the function name and parameter list among them), so
      * the statements at those points cannot be seen here.
1935  */
1936 static uint64
1938 {
1939  TupleDesc tupDesc;
1940  int num_phys_attrs;
1941  Form_pg_attribute *attr;
1942  ListCell *cur;
1943  uint64 processed;
1944 
1945  if (cstate->rel)
1946  tupDesc = RelationGetDescr(cstate->rel);
1947  else
1948  tupDesc = cstate->queryDesc->tupDesc;
1949  attr = tupDesc->attrs;
1950  num_phys_attrs = tupDesc->natts;
1951  cstate->null_print_client = cstate->null_print; /* default */
1952 
1953  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1954  cstate->fe_msgbuf = makeLongStringInfo();
1955 
1956  /* Get info about the columns we need to process. */
      /*
       * The array has one slot per physical attribute and is indexed by
       * attnum - 1; only the slots for columns in attnumlist are filled in.
       */
1957  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1958  foreach(cur, cstate->attnumlist)
1959  {
1960  int attnum = lfirst_int(cur);
1961  Oid out_func_oid;
1962  bool isvarlena;
1963 
1964  if (cstate->binary)
1965  getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1966  &out_func_oid,
1967  &isvarlena);
1968  else
1969  getTypeOutputInfo(attr[attnum - 1]->atttypid,
1970  &out_func_oid,
1971  &isvarlena);
1972  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1973  }
1974 
1975  /*
1976  * Create a temporary memory context that we can reset once per row to
1977  * recover palloc'd memory. This avoids any problems with leaks inside
1978  * datatype output routines, and should be faster than retail pfree's
1979  * anyway. (We don't need a whole econtext as CopyFrom does.)
1980  */
      /*
       * NOTE(review): the call this string argument belongs to is cut off in
       * this listing (lines 1981 and 1983 are missing).  Presumably it creates
       * cstate->rowcontext, which CopyOneRowTo() resets for every row --
       * confirm against the full source.
       */
1982  "COPY TO",
1984 
1985  if (cstate->binary)
1986  {
1987  /* Generate header for a binary copy */
1988  int32 tmp;
1989 
1990  /* Signature */
1991  CopySendData(cstate, BinarySignature, 11);
1992  /* Flags field */
      /* bit 16 is set when OIDs are included in the output */
1993  tmp = 0;
1994  if (cstate->oids)
1995  tmp |= (1 << 16);
1996  CopySendInt32(cstate, tmp);
1997  /* No header extension */
1998  tmp = 0;
1999  CopySendInt32(cstate, tmp);
2000  }
2001  else
2002  {
2003  /*
2004  * For non-binary copy, we need to convert null_print to file
2005  * encoding, because it will be sent directly with CopySendString.
2006  */
2007  if (cstate->need_transcoding)
2008  cstate->null_print_client = pg_server_to_any(cstate->null_print,
2009  cstate->null_print_len,
2010  cstate->file_encoding);
2011 
2012  /* if a header has been requested send the line */
2013  if (cstate->header_line)
2014  {
2015  bool hdr_delim = false;
2016 
2017  foreach(cur, cstate->attnumlist)
2018  {
2019  int attnum = lfirst_int(cur);
2020  char *colname;
2021 
      /* emit the delimiter before every column name except the first */
2022  if (hdr_delim)
2023  CopySendChar(cstate, cstate->delim[0]);
2024  hdr_delim = true;
2025 
2026  colname = NameStr(attr[attnum - 1]->attname);
2027 
      /*
       * Column names always go through the CSV output routine here; the
       * last argument tells it whether the column list has exactly one
       * entry.
       */
2028  CopyAttributeOutCSV(cstate, colname, false,
2029  list_length(cstate->attnumlist) == 1);
2030  }
2031 
2032  CopySendEndOfRow(cstate);
2033  }
2034  }
2035 
2036  if (cstate->rel)
2037  {
2038  Datum *values;
2039  bool *nulls;
2040  HeapScanDesc scandesc;
2041  HeapTuple tuple;
2042 
2043  values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2044  nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2045 
2046  scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2047 
2048  processed = 0;
2049  while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2050  {
      /* NOTE(review): listing line 2051 is missing at this point */
2052 
2053  /* Deconstruct the tuple ... faster than repeated heap_getattr */
2054  heap_deform_tuple(tuple, tupDesc, values, nulls);
2055 
2056  /* Format and send the data */
2057  CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2058  processed++;
2059  }
2060 
2061  heap_endscan(scandesc);
2062 
2063  pfree(values);
2064  pfree(nulls);
2065  }
2066  else
2067  {
2068  /* run the plan --- the dest receiver will send tuples */
      /*
       * NOTE(review): the statement that actually runs the plan (listing line
       * 2069) is not visible here; only the row count is read back from the
       * DR_copy receiver below.
       */
2070  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2071  }
2072 
2073  if (cstate->binary)
2074  {
2075  /* Generate trailer for a binary copy */
2076  CopySendInt16(cstate, -1);
2077  /* Need to flush out the trailer */
2078  CopySendEndOfRow(cstate);
2079  }
2080 
      /* NOTE(review): listing line 2081 is missing at this point */
2082 
2083  return processed;
2084 }
2085 
2086 /*
2087  * Emit one row during CopyTo().
      *
      * cstate:   copy state; supplies the column list, output functions,
      *           format flags and the per-row memory context
      * tupleOid: OID of the row, sent only when cstate->oids is set
      * values:   column values, indexed by attnum - 1
      * nulls:    null flags, indexed by attnum - 1
      *
      * All work is done inside cstate->rowcontext, which is reset on entry,
      * and the caller's memory context is restored before returning.
2088  */
2089 static void
2090 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2091 {
2092  bool need_delim = false;
2093  FmgrInfo *out_functions = cstate->out_functions;
2094  MemoryContext oldcontext;
2095  ListCell *cur;
2096  char *string;
2097 
2098  MemoryContextReset(cstate->rowcontext);
2099  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2100 
2101  if (cstate->binary)
2102  {
2103  /* Binary per-tuple header */
2104  CopySendInt16(cstate, list_length(cstate->attnumlist));
2105  /* Send OID if wanted --- note attnumlist doesn't include it */
2106  if (cstate->oids)
2107  {
2108  /* Hack --- assume Oid is same size as int32 */
2109  CopySendInt32(cstate, sizeof(int32));
2110  CopySendInt32(cstate, tupleOid);
2111  }
2112  }
2113  else
2114  {
2115  /* Text format has no per-tuple header, but send OID if wanted */
2116  /* Assume digits don't need any quoting or encoding conversion */
2117  if (cstate->oids)
2118  {
      /*
       * NOTE(review): the first half of this statement (listing line 2119)
       * is missing; presumably it assigns the text form of the OID to
       * "string", which is sent on the next line -- confirm.
       */
2120  ObjectIdGetDatum(tupleOid)));
2121  CopySendString(cstate, string);
2122  need_delim = true;
2123  }
2124  }
2125 
2126  foreach(cur, cstate->attnumlist)
2127  {
2128  int attnum = lfirst_int(cur);
2129  Datum value = values[attnum - 1];
2130  bool isnull = nulls[attnum - 1];
2131 
      /* text formats: a delimiter goes before every field except the first */
2132  if (!cstate->binary)
2133  {
2134  if (need_delim)
2135  CopySendChar(cstate, cstate->delim[0]);
2136  need_delim = true;
2137  }
2138 
2139  if (isnull)
2140  {
      /* a null is the null marker string in text, a length word of -1 in binary */
2141  if (!cstate->binary)
2142  CopySendString(cstate, cstate->null_print_client);
2143  else
2144  CopySendInt32(cstate, -1);
2145  }
2146  else
2147  {
2148  if (!cstate->binary)
2149  {
2150  string = OutputFunctionCall(&out_functions[attnum - 1],
2151  value);
2152  if (cstate->csv_mode)
2153  CopyAttributeOutCSV(cstate, string,
2154  cstate->force_quote_flags[attnum - 1],
2155  list_length(cstate->attnumlist) == 1);
2156  else
2157  CopyAttributeOutText(cstate, string);
2158  }
2159  else
2160  {
2161  bytea *outputbytes;
2162 
      /* binary field: 32-bit payload length, then the payload without its varlena header */
2163  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2164  value);
2165  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2166  CopySendData(cstate, VARDATA(outputbytes),
2167  VARSIZE(outputbytes) - VARHDRSZ);
2168  }
2169  }
2170  }
2171 
2172  CopySendEndOfRow(cstate);
2173 
2174  MemoryContextSwitchTo(oldcontext);
2175 }
2176 
2177 
2178 /*
2179  * error context callback for COPY FROM
2180  *
2181  * The argument for the error context must be CopyState.
      *
      * Adds one errcontext() line naming the relation and the current input
      * line number.  When a column is being processed its name is added too,
      * and in text mode the offending value (or the whole input line) is
      * appended after being shortened by limit_printout_length().
      *
      * NOTE(review): the line carrying the function name and its parameter
      * list is missing from this listing; the body reads the argument as
      * "arg".
2182  */
2183 void
2185 {
2186  CopyState cstate = (CopyState) arg;
2187 
2188  if (cstate->binary)
2189  {
2190  /* can't usefully display the data */
2191  if (cstate->cur_attname)
2192  errcontext("COPY %s, line %d, column %s",
2193  cstate->cur_relname, cstate->cur_lineno,
2194  cstate->cur_attname);
2195  else
2196  errcontext("COPY %s, line %d",
2197  cstate->cur_relname, cstate->cur_lineno);
2198  }
2199  else
2200  {
2201  if (cstate->cur_attname && cstate->cur_attval)
2202  {
2203  /* error is relevant to a particular column */
2204  char *attval;
2205 
      /* attval is a palloc'd, possibly truncated copy; freed once reported */
2206  attval = limit_printout_length(cstate->cur_attval);
2207  errcontext("COPY %s, line %d, column %s: \"%s\"",
2208  cstate->cur_relname, cstate->cur_lineno,
2209  cstate->cur_attname, attval);
2210  pfree(attval);
2211  }
2212  else if (cstate->cur_attname)
2213  {
2214  /* error is relevant to a particular column, value is NULL */
2215  errcontext("COPY %s, line %d, column %s: null input",
2216  cstate->cur_relname, cstate->cur_lineno,
2217  cstate->cur_attname);
2218  }
2219  else
2220  {
2221  /*
2222  * Error is relevant to a particular line.
2223  *
2224  * If line_buf still contains the correct line, and it's already
2225  * transcoded, print it. If it's still in a foreign encoding, it's
2226  * quite likely that the error is precisely a failure to do
2227  * encoding conversion (ie, bad data). We dare not try to convert
2228  * it, and at present there's no way to regurgitate it without
2229  * conversion. So we have to punt and just report the line number.
2230  */
2231  if (cstate->line_buf_valid &&
2232  (cstate->line_buf_converted || !cstate->need_transcoding))
2233  {
2234  char *lineval;
2235 
2236  lineval = limit_printout_length(cstate->line_buf.data);
2237  errcontext("COPY %s, line %d: \"%s\"",
2238  cstate->cur_relname, cstate->cur_lineno, lineval);
2239  pfree(lineval);
2240  }
2241  else
2242  {
2243  errcontext("COPY %s, line %d",
2244  cstate->cur_relname, cstate->cur_lineno);
2245  }
2246  }
2247  }
2248 }
2249 
/*
 * Return a palloc'd copy of "str" that is short enough to embed in a
 * message about COPY data.
 *
 * Input of at most MAX_COPY_DATA_DISPLAY bytes is duplicated unchanged.
 * Longer input is cut at the byte length chosen by pg_mbcliplen() for a
 * limit of MAX_COPY_DATA_DISPLAY, and "..." is appended to show that
 * truncation happened.
 *
 * Using the sprintf "precision" limit would look simpler, but some versions
 * of glibc have a bug/misfeature that makes vsnprintf fail (return -1) when
 * asked to truncate a string containing byte sequences that are invalid in
 * the current encoding.  Hence the truncation is done by hand here.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Too long: let the encoding-aware helper pick the cut point */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the three dots and the terminator */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		strcpy(clipped + clip_len, "...");

		return clipped;
	}

	/* Short enough already, so a plain copy will do */
	return pstrdup(str);
}
2284 
2285 /*
2286  * Copy FROM file to relation.
      *
      * Outline of what the visible code does:
      *  - rejects targets that are neither a plain nor a partitioned table
      *    (subject to a trigger test whose second half is cut off here);
      *  - chooses heap_insert options (skip FSM, skip WAL, frozen);
      *  - builds a ResultRelInfo and an executor state, opens the indexes;
      *  - decides whether rows can be buffered and written in batches through
      *    CopyFromInsertBatch() or must be inserted one at a time;
      *  - loops over NextCopyFrom(), forms each tuple, routes it to a
      *    partition when partition_dispatch_info is set, fires row triggers,
      *    checks constraints and inserts the tuple;
      *  - flushes any remaining batch, fires statement-level AFTER triggers,
      *    releases resources, and syncs the heap if WAL was skipped.
      *
      * Returns the number of rows that reached the insertion step, i.e. rows
      * not suppressed by a BEFORE ROW trigger.
      *
      * NOTE(review): this listing omits several source lines of the function
      * (the line carrying the function name and parameter list among them);
      * each gap is marked below.
2287  */
2288 static uint64
2290 {
2291  HeapTuple tuple;
2292  TupleDesc tupDesc;
2293  Datum *values;
2294  bool *nulls;
2295  ResultRelInfo *resultRelInfo;
2296  ResultRelInfo *saved_resultRelInfo = NULL;
2297  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2298  ExprContext *econtext;
2299  TupleTableSlot *myslot;
2300  MemoryContext oldcontext = CurrentMemoryContext;
2301 
2302  ErrorContextCallback errcallback;
2303  CommandId mycid = GetCurrentCommandId(true);
2304  int hi_options = 0; /* start with default heap_insert options */
2305  BulkInsertState bistate;
2306  uint64 processed = 0;
2307  bool useHeapMultiInsert;
2308  int nBufferedTuples = 0;
2309  int prev_leaf_part_index = -1;
2310 
2311 #define MAX_BUFFERED_TUPLES 1000
2312  HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2313  Size bufferedTuplesSize = 0;
2314  int firstBufferedLineNo = 0;
2315 
2316  Assert(cstate->rel);
2317 
2318  /*
2319  * The target must be a plain relation or have an INSTEAD OF INSERT row
2320  * trigger. (Currently, such triggers are only allowed on views, so we
2321  * only hint about them in the view case.)
2322  */
2323  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2324  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2325  !(cstate->rel->trigdesc &&
      /*
       * NOTE(review): the line completing this condition (listing line 2326)
       * is missing; going by the comment above it presumably tests for an
       * INSTEAD OF INSERT row trigger -- confirm.
       */
2327  {
2328  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2329  ereport(ERROR,
2330  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2331  errmsg("cannot copy to view \"%s\"",
2332  RelationGetRelationName(cstate->rel)),
2333  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2334  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2335  ereport(ERROR,
2336  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2337  errmsg("cannot copy to materialized view \"%s\"",
2338  RelationGetRelationName(cstate->rel))));
2339  else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2340  ereport(ERROR,
2341  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2342  errmsg("cannot copy to foreign table \"%s\"",
2343  RelationGetRelationName(cstate->rel))));
2344  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2345  ereport(ERROR,
2346  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2347  errmsg("cannot copy to sequence \"%s\"",
2348  RelationGetRelationName(cstate->rel))));
2349  else
2350  ereport(ERROR,
2351  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2352  errmsg("cannot copy to non-table relation \"%s\"",
2353  RelationGetRelationName(cstate->rel))));
2354  }
2355 
2356  tupDesc = RelationGetDescr(cstate->rel);
2357 
2358  /*----------
2359  * Check to see if we can avoid writing WAL
2360  *
2361  * If archive logging/streaming is not enabled *and* either
2362  * - table was created in same transaction as this COPY
2363  * - data is being written to relfilenode created in this transaction
2364  * then we can skip writing WAL. It's safe because if the transaction
2365  * doesn't commit, we'll discard the table (or the new relfilenode file).
2366  * If it does commit, we'll have done the heap_sync at the bottom of this
2367  * routine first.
2368  *
2369  * As mentioned in comments in utils/rel.h, the in-same-transaction test
2370  * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2371  * can be cleared before the end of the transaction. The exact case is
2372  * when a relation sets a new relfilenode twice in same transaction, yet
2373  * the second one fails in an aborted subtransaction, e.g.
2374  *
2375  * BEGIN;
2376  * TRUNCATE t;
2377  * SAVEPOINT save;
2378  * TRUNCATE t;
2379  * ROLLBACK TO save;
2380  * COPY ...
2381  *
2382  * Also, if the target file is new-in-transaction, we assume that checking
2383  * FSM for free space is a waste of time, even if we must use WAL because
2384  * of archiving. This could possibly be wrong, but it's unlikely.
2385  *
2386  * The comments for heap_insert and RelationGetBufferForTuple specify that
2387  * skipping WAL logging is only safe if we ensure that our tuples do not
2388  * go into pages containing tuples from any other transactions --- but this
2389  * must be the case if we have a new table or new relfilenode, so we need
2390  * no additional work to enforce that.
2391  *----------
2392  */
2393  /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2394  if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
      /*
       * NOTE(review): the second half of this condition (listing line 2395)
       * is missing; presumably it is the newRelfilenodeSubid test named in
       * the comment above -- confirm.
       */
2396  {
2397  hi_options |= HEAP_INSERT_SKIP_FSM;
2398  if (!XLogIsNeeded())
2399  hi_options |= HEAP_INSERT_SKIP_WAL;
2400  }
2401 
2402  /*
2403  * Optimize if new relfilenode was created in this subxact or one of its
2404  * committed children and we won't see those rows later as part of an
2405  * earlier scan or command. This ensures that if this subtransaction
2406  * aborts then the frozen rows won't be visible after xact cleanup. Note
2407  * that the stronger test of exactly which subtransaction created it is
2408  * crucial for correctness of this optimisation.
2409  */
2410  if (cstate->freeze)
2411  {
      /* NOTE(review): the condition guarding this error (listing line 2412) is missing */
2413  ereport(ERROR,
2414  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2415  errmsg("cannot perform FREEZE because of prior transaction activity")));
2416 
2417  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
      /* NOTE(review): the rest of this condition (listing line 2418) is missing */
2419  ereport(ERROR,
2420  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2421  errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2422 
2423  hi_options |= HEAP_INSERT_FROZEN;
2424  }
2425 
2426  /*
2427  * We need a ResultRelInfo so we can use the regular executor's
2428  * index-entry-making machinery. (There used to be a huge amount of code
2429  * here that basically duplicated execUtils.c ...)
2430  */
2431  resultRelInfo = makeNode(ResultRelInfo);
2432  InitResultRelInfo(resultRelInfo,
2433  cstate->rel,
2434  1, /* dummy rangetable index */
2435  NULL,
2436  0);
2437 
2438  ExecOpenIndices(resultRelInfo, false);
2439 
2440  estate->es_result_relations = resultRelInfo;
2441  estate->es_num_result_relations = 1;
2442  estate->es_result_relation_info = resultRelInfo;
2443  estate->es_range_table = cstate->range_table;
2444 
2445  /* Set up a tuple slot too */
2446  myslot = ExecInitExtraTupleSlot(estate);
2447  ExecSetSlotDescriptor(myslot, tupDesc);
2448  /* Triggers might need a slot as well */
2449  estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2450 
2451  /*
2452  * It's more efficient to prepare a bunch of tuples for insertion, and
2453  * insert them in one heap_multi_insert() call, than call heap_insert()
2454  * separately for every tuple. However, we can't do that if there are
2455  * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2456  * expressions. Such triggers or expressions might query the table we're
2457  * inserting to, and act differently if the tuples that have already been
2458  * processed and prepared for insertion are not there. We also can't do
2459  * it if the table is partitioned.
2460  */
2461  if ((resultRelInfo->ri_TrigDesc != NULL &&
2462  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2463  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2464  cstate->partition_dispatch_info != NULL ||
2465  cstate->volatile_defexprs)
2466  {
2467  useHeapMultiInsert = false;
2468  }
2469  else
2470  {
2471  useHeapMultiInsert = true;
2472  bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2473  }
2474 
2475  /* Prepare to catch AFTER triggers. */
      /*
       * NOTE(review): the statement belonging to the comment above (listing
       * line 2476) is missing; AfterTriggerEndQuery() is called near the end
       * of the function.
       */
2477 
2478  /*
2479  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2480  * should do this for COPY, since it's not really an "INSERT" statement as
2481  * such. However, executing these triggers maintains consistency with the
2482  * EACH ROW triggers that we already fire on COPY.
2483  */
2484  ExecBSInsertTriggers(estate, resultRelInfo);
2485 
2486  values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2487  nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2488 
2489  bistate = GetBulkInsertState();
2490  econtext = GetPerTupleExprContext(estate);
2491 
2492  /* Set up callback to identify error line number */
2493  errcallback.callback = CopyFromErrorCallback;
2494  errcallback.arg = (void *) cstate;
2495  errcallback.previous = error_context_stack;
2496  error_context_stack = &errcallback;
2497 
2498  for (;;)
2499  {
2500  TupleTableSlot *slot,
2501  *oldslot;
2502  bool skip_tuple;
2503  Oid loaded_oid = InvalidOid;
2504 
      /* NOTE(review): listing line 2505 is missing at this point */
2506 
2507  if (nBufferedTuples == 0)
2508  {
2509  /*
2510  * Reset the per-tuple exprcontext. We can only do this if the
2511  * tuple buffer is empty. (Calling the context the per-tuple
2512  * memory context is a bit of a misnomer now.)
2513  */
2514  ResetPerTupleExprContext(estate);
2515  }
2516 
2517  /* Switch into its memory context */
      /*
       * NOTE(review): the statement that performs this switch (listing line
       * 2518) is missing; the code switches back to oldcontext after the
       * tuple has been formed.
       */
2519 
      /* NextCopyFrom() returning false ends the loop */
2520  if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2521  break;
2522 
2523  /* And now we can form the input tuple. */
2524  tuple = heap_form_tuple(tupDesc, values, nulls);
2525 
2526  if (loaded_oid != InvalidOid)
2527  HeapTupleSetOid(tuple, loaded_oid);
2528 
2529  /*
2530  * Constraints might reference the tableoid column, so initialize
2531  * t_tableOid before evaluating them.
2532  */
2533  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2534 
2535  /* Triggers and stuff need to be invoked in query context. */
2536  MemoryContextSwitchTo(oldcontext);
2537 
2538  /* Place tuple in tuple slot --- but slot shouldn't free it */
2539  slot = myslot;
2540  ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2541 
2542  /* Determine the partition to heap_insert the tuple into */
      /* oldslot keeps the slot as it was before any partition conversion; it is passed to ExecConstraints() below */
2543  oldslot = slot;
2544  if (cstate->partition_dispatch_info)
2545  {
2546  int leaf_part_index;
2547  TupleConversionMap *map;
2548 
2549  /*
2550  * Away we go ... If we end up not finding a partition after all,
2551  * ExecFindPartition() does not return and errors out instead.
2552  * Otherwise, the returned value is to be used as an index into
2553  * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2554  * will get us the ResultRelInfo and TupleConversionMap for the
2555  * partition, respectively.
2556  */
2557  leaf_part_index = ExecFindPartition(resultRelInfo,
2558  cstate->partition_dispatch_info,
2559  slot,
2560  estate);
2561  Assert(leaf_part_index >= 0 &&
2562  leaf_part_index < cstate->num_partitions);
2563 
2564  /*
2565  * If this tuple is mapped to a partition that is not same as the
2566  * previous one, we'd better make sure the bulk insert mechanism
2567  * gets a new buffer.
2568  */
2569  if (prev_leaf_part_index != leaf_part_index)
2570  {
2571  ReleaseBulkInsertStatePin(bistate);
2572  prev_leaf_part_index = leaf_part_index;
2573  }
2574 
2575  /*
2576  * Save the old ResultRelInfo and switch to the one corresponding
2577  * to the selected partition.
2578  */
2579  saved_resultRelInfo = resultRelInfo;
2580  resultRelInfo = cstate->partitions + leaf_part_index;
2581 
2582  /* We do not yet have a way to insert into a foreign partition */
2583  if (resultRelInfo->ri_FdwRoutine)
2584  ereport(ERROR,
2585  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2586  errmsg("cannot route inserted tuples to a foreign table")));
2587 
2588  /*
2589  * For ExecInsertIndexTuples() to work on the partition's indexes
2590  */
2591  estate->es_result_relation_info = resultRelInfo;
2592 
2593  /*
2594  * We might need to convert from the parent rowtype to the
2595  * partition rowtype.
2596  */
2597  map = cstate->partition_tupconv_maps[leaf_part_index];
2598  if (map)
2599  {
2600  Relation partrel = resultRelInfo->ri_RelationDesc;
2601 
2602  tuple = do_convert_tuple(tuple, map);
2603 
2604  /*
2605  * We must use the partition's tuple descriptor from this
2606  * point on. Use a dedicated slot from this point on until
2607  * we're finished dealing with the partition.
2608  */
2609  slot = cstate->partition_tuple_slot;
2610  Assert(slot != NULL);
2611  ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2612  ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2613  }
2614 
2615  tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2616  }
2617 
2618  skip_tuple = false;
2619 
2620  /* BEFORE ROW INSERT Triggers */
2621  if (resultRelInfo->ri_TrigDesc &&
2622  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2623  {
2624  slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2625 
2626  if (slot == NULL) /* "do nothing" */
2627  skip_tuple = true;
2628  else /* trigger might have changed tuple */
2629  tuple = ExecMaterializeSlot(slot);
2630  }
2631 
2632  if (!skip_tuple)
2633  {
2634  if (resultRelInfo->ri_TrigDesc &&
2635  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2636  {
2637  /* Pass the data to the INSTEAD ROW INSERT trigger */
2638  ExecIRInsertTriggers(estate, resultRelInfo, slot);
2639  }
2640  else
2641  {
2642  /* Check the constraints of the tuple */
2643  if (cstate->rel->rd_att->constr ||
2644  resultRelInfo->ri_PartitionCheck)
2645  ExecConstraints(resultRelInfo, slot, oldslot, estate);
2646 
2647  if (useHeapMultiInsert)
2648  {
2649  /* Add this tuple to the tuple buffer */
2650  if (nBufferedTuples == 0)
2651  firstBufferedLineNo = cstate->cur_lineno;
2652  bufferedTuples[nBufferedTuples++] = tuple;
2653  bufferedTuplesSize += tuple->t_len;
2654 
2655  /*
2656  * If the buffer filled up, flush it. Also flush if the
2657  * total size of all the tuples in the buffer becomes
2658  * large, to avoid using large amounts of memory for the
2659  * buffer when the tuples are exceptionally wide.
2660  */
2661  if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2662  bufferedTuplesSize > 65535)
2663  {
2664  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2665  resultRelInfo, myslot, bistate,
2666  nBufferedTuples, bufferedTuples,
2667  firstBufferedLineNo);
2668  nBufferedTuples = 0;
2669  bufferedTuplesSize = 0;
2670  }
2671  }
2672  else
2673  {
2674  List *recheckIndexes = NIL;
2675 
2676  /* OK, store the tuple and create index entries for it */
2677  heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2678  hi_options, bistate);
2679 
2680  if (resultRelInfo->ri_NumIndices > 0)
2681  recheckIndexes = ExecInsertIndexTuples(slot,
2682  &(tuple->t_self),
2683  estate,
2684  false,
2685  NULL,
2686  NIL);
2687 
2688  /* AFTER ROW INSERT Triggers */
2689  ExecARInsertTriggers(estate, resultRelInfo, tuple,
2690  recheckIndexes);
2691 
2692  list_free(recheckIndexes);
2693  }
2694  }
2695 
2696  /*
2697  * We count only tuples not suppressed by a BEFORE INSERT trigger;
2698  * this is the same definition used by execMain.c for counting
2699  * tuples inserted by an INSERT command.
2700  */
2701  processed++;
2702 
      /*
       * Undo the switch to a partition's ResultRelInfo made above.
       *
       * NOTE(review): this restore sits inside the !skip_tuple branch, so
       * when a BEFORE ROW trigger suppresses a routed tuple, resultRelInfo
       * appears to stay pointed at the partition for the next iteration --
       * confirm whether that is intended.
       */
2703  if (saved_resultRelInfo)
2704  {
2705  resultRelInfo = saved_resultRelInfo;
2706  estate->es_result_relation_info = resultRelInfo;
2707  }
2708  }
2709  }
2710 
2711  /* Flush any remaining buffered tuples */
2712  if (nBufferedTuples > 0)
2713  CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2714  resultRelInfo, myslot, bistate,
2715  nBufferedTuples, bufferedTuples,
2716  firstBufferedLineNo);
2717 
2718  /* Done, clean up */
2719  error_context_stack = errcallback.previous;
2720 
2721  FreeBulkInsertState(bistate);
2722 
2723  MemoryContextSwitchTo(oldcontext);
2724 
2725  /*
2726  * In the old protocol, tell pqcomm that we can process normal protocol
2727  * messages again.
2728  */
2729  if (cstate->copy_dest == COPY_OLD_FE)
2730  pq_endmsgread();
2731 
2732  /* Execute AFTER STATEMENT insertion triggers */
2733  ExecASInsertTriggers(estate, resultRelInfo);
2734 
2735  /* Handle queued AFTER triggers */
2736  AfterTriggerEndQuery(estate);
2737 
2738  pfree(values);
2739  pfree(nulls);
2740 
2741  ExecResetTupleTable(estate->es_tupleTable, false);
2742 
2743  ExecCloseIndices(resultRelInfo);
2744 
2745  /* Close all the partitioned tables, leaf partitions, and their indices */
2746  if (cstate->partition_dispatch_info)
2747  {
2748  int i;
2749 
2750  /*
2751  * Remember cstate->partition_dispatch_info[0] corresponds to the root
2752  * partitioned table, which we must not try to close, because it is
2753  * the main target table of COPY that will be closed eventually by
2754  * DoCopy(). Also, tupslot is NULL for the root partitioned table.
2755  */
2756  for (i = 1; i < cstate->num_dispatch; i++)
2757  {
      /* NOTE(review): the declaration of "pd" (listing line 2758) is missing */
2759 
2760  heap_close(pd->reldesc, NoLock);
      /* NOTE(review): listing line 2761 is missing at this point */
2762  }
2763  for (i = 0; i < cstate->num_partitions; i++)
2764  {
2765  ResultRelInfo *resultRelInfo = cstate->partitions + i;
2766 
2767  ExecCloseIndices(resultRelInfo);
2768  heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2769  }
2770 
2771  /* Release the standalone partition tuple descriptor */
      /* NOTE(review): the statement belonging to the comment above (listing line 2772) is missing */
2773  }
2774 
2775  FreeExecutorState(estate);
2776 
2777  /*
2778  * If we skipped writing WAL, then we need to sync the heap (but not
2779  * indexes since those use WAL anyway)
2780  */
2781  if (hi_options & HEAP_INSERT_SKIP_WAL)
2782  heap_sync(cstate->rel);
2783 
2784  return processed;
2785 }
2786 
2787 /*
2788  * A subroutine of CopyFrom, to write the current batch of buffered heap
2789  * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2790  * triggers.
      *
      * bufferedTuples holds nBufferedTuples tuples; firstBufferedLineNo is the
      * input line number of the first one and is used to set cstate->cur_lineno
      * while each tuple's indexes and triggers are processed.  cur_lineno is
      * put back to its value on entry before returning.
      *
      * NOTE(review): the first line of the signature (function name and
      * leading parameters) is missing from this listing; the body uses
      * cstate, estate and mycid, which match the arguments CopyFrom passes.
2791  */
2792 static void
2794  int hi_options, ResultRelInfo *resultRelInfo,
2795  TupleTableSlot *myslot, BulkInsertState bistate,
2796  int nBufferedTuples, HeapTuple *bufferedTuples,
2797  int firstBufferedLineNo)
2798 {
2799  MemoryContext oldcontext;
2800  int i;
2801  int save_cur_lineno;
2802 
2803  /*
2804  * Print error context information correctly, if one of the operations
2805  * below fails.
2806  */
      /*
       * With line_buf_valid cleared, CopyFromErrorCallback reports a
       * line-level error by line number only, without the buffered line text.
       */
2807  cstate->line_buf_valid = false;
2808  save_cur_lineno = cstate->cur_lineno;
2809 
2810  /*
2811  * heap_multi_insert leaks memory, so switch to short-lived memory context
2812  * before calling it.
2813  */
2814  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2815  heap_multi_insert(cstate->rel,
2816  bufferedTuples,
2817  nBufferedTuples,
2818  mycid,
2819  hi_options,
2820  bistate);
2821  MemoryContextSwitchTo(oldcontext);
2822 
2823  /*
2824  * If there are any indexes, update them for all the inserted tuples, and
2825  * run AFTER ROW INSERT triggers.
2826  */
2827  if (resultRelInfo->ri_NumIndices > 0)
2828  {
2829  for (i = 0; i < nBufferedTuples; i++)
2830  {
2831  List *recheckIndexes;
2832 
      /* make error context point at the input line of this tuple */
2833  cstate->cur_lineno = firstBufferedLineNo + i;
2834  ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2835  recheckIndexes =
2836  ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2837  estate, false, NULL, NIL);
2838  ExecARInsertTriggers(estate, resultRelInfo,
2839  bufferedTuples[i],
2840  recheckIndexes);
2841  list_free(recheckIndexes);
2842  }
2843  }
2844 
2845  /*
2846  * There are no indexes, but see if we need to run AFTER ROW INSERT
2847  * triggers anyway.
2848  */
2849  else if (resultRelInfo->ri_TrigDesc != NULL &&
2850  resultRelInfo->ri_TrigDesc->trig_insert_after_row)
2851  {
2852  for (i = 0; i < nBufferedTuples; i++)
2853  {
2854  cstate->cur_lineno = firstBufferedLineNo + i;
2855  ExecARInsertTriggers(estate, resultRelInfo,
2856  bufferedTuples[i],
2857  NIL);
2858  }
2859  }
2860 
2861  /* reset cur_lineno to where we were */
2862  cstate->cur_lineno = save_cur_lineno;
2863 }
2864 
2865 /*
2866  * Setup to read tuples from a file for COPY FROM.
2867  *
2868  * 'rel': Used as a template for the tuples
2869  * 'filename': Name of server-local file to read
2870  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2871  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2872  *
2873  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2874  */
2875 CopyState
2877  Relation rel,
2878  const char *filename,
2879  bool is_program,
2880  List *attnamelist,
2881  List *options)
2882 {
2883  CopyState cstate;
2884  bool pipe = (filename == NULL);
2885  TupleDesc tupDesc;
2886  Form_pg_attribute *attr;
2887  AttrNumber num_phys_attrs,
2888  num_defaults;
2889  FmgrInfo *in_functions;
2890  Oid *typioparams;
2891  int attnum;
2892  Oid in_func_oid;
2893  int *defmap;
2894  ExprState **defexprs;
2895  MemoryContext oldcontext;
2896  bool volatile_defexprs;
2897 
2898  cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
2899  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2900 
2901  /* Initialize state variables */
2902  cstate->fe_eof = false;
2903  cstate->eol_type = EOL_UNKNOWN;
2904  cstate->cur_relname = RelationGetRelationName(cstate->rel);
2905  cstate->cur_lineno = 0;
2906  cstate->cur_attname = NULL;
2907  cstate->cur_attval = NULL;
2908 
2909  /* Set up variables to avoid per-attribute overhead. */
2911  initLongStringInfo(&cstate->line_buf);
2912  cstate->line_buf_converted = false;
2913  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
2914  cstate->raw_buf_index = cstate->raw_buf_len = 0;
2915 
2916  tupDesc = RelationGetDescr(cstate->rel);
2917  attr = tupDesc->attrs;
2918  num_phys_attrs = tupDesc->natts;
2919  num_defaults = 0;
2920  volatile_defexprs = false;
2921 
2922  /*
2923  * Pick up the required catalog information for each attribute in the
2924  * relation, including the input function, the element type (to pass to
2925  * the input function), and info about defaults and constraints. (Which
2926  * input function we use depends on text/binary format choice.)
2927  */
2928  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2929  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
2930  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
2931  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
2932 
2933  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
2934  {
2935  /* We don't need info for dropped attributes */
2936  if (attr[attnum - 1]->attisdropped)
2937  continue;
2938 
2939  /* Fetch the input function and typioparam info */
2940  if (cstate->binary)
2941  getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
2942  &in_func_oid, &typioparams[attnum - 1]);
2943  else
2944  getTypeInputInfo(attr[attnum - 1]->atttypid,
2945  &in_func_oid, &typioparams[attnum - 1]);
2946  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
2947 
2948  /* Get default info if needed */
2949  if (!list_member_int(cstate->attnumlist, attnum))
2950  {
2951  /* attribute is NOT to be copied from input */
2952  /* use default value if one exists */
2953  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
2954  attnum);
2955 
2956  if (defexpr != NULL)
2957  {
2958  /* Run the expression through planner */
2959  defexpr = expression_planner(defexpr);
2960 
2961  /* Initialize executable expression in copycontext */
2962  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
2963  defmap[num_defaults] = attnum - 1;
2964  num_defaults++;
2965 
2966  /*
2967  * If a default expression looks at the table being loaded,
2968  * then it could give the wrong answer when using
2969  * multi-insert. Since database access can be dynamic this is
2970  * hard to test for exactly, so we use the much wider test of
2971  * whether the default expression is volatile. We allow for
2972  * the special case of when the default expression is the
2973  * nextval() of a sequence which in this specific case is
2974  * known to be safe for use with the multi-insert
2975  * optimisation. Hence we use this special case function
2976  * checker rather than the standard check for
2977  * contain_volatile_functions().
2978  */
2979  if (!volatile_defexprs)
2980  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
2981  }
2982  }
2983  }
2984 
2985  /* We keep those variables in cstate. */
2986  cstate->in_functions = in_functions;
2987  cstate->typioparams = typioparams;
2988  cstate->defmap = defmap;
2989  cstate->defexprs = defexprs;
2990  cstate->volatile_defexprs = volatile_defexprs;
2991  cstate->num_defaults = num_defaults;
2992  cstate->is_program = is_program;
2993 
2994  if (pipe)
2995  {
2996  Assert(!is_program); /* the grammar does not allow this */
2998  ReceiveCopyBegin(cstate);
2999  else
3000  cstate->copy_file = stdin;
3001  }
3002  else
3003  {
3004  cstate->filename = pstrdup(filename);
3005 
3006  if (cstate->is_program)
3007  {
3008  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3009  if (cstate->copy_file == NULL)
3010  ereport(ERROR,
3012  errmsg("could not execute command \"%s\": %m",
3013  cstate->filename)));
3014  }
3015  else
3016  {
3017  struct stat st;
3018 
3019  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3020  if (cstate->copy_file == NULL)
3021  {
3022  /* copy errno because ereport subfunctions might change it */
3023  int save_errno = errno;
3024 
3025  ereport(ERROR,
3027  errmsg("could not open file \"%s\" for reading: %m",
3028  cstate->filename),
3029  (save_errno == ENOENT || save_errno == EACCES) ?
3030  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3031  "You may want a client-side facility such as psql's \\copy.") : 0));
3032  }
3033 
3034  if (fstat(fileno(cstate->copy_file), &st))
3035  ereport(ERROR,
3037  errmsg("could not stat file \"%s\": %m",
3038  cstate->filename)));
3039 
3040  if (S_ISDIR(st.st_mode))
3041  ereport(ERROR,
3042  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3043  errmsg("\"%s\" is a directory", cstate->filename)));
3044  }
3045  }
3046 
3047  if (!cstate->binary)
3048  {
3049  /* must rely on user to tell us... */
3050  cstate->file_has_oids = cstate->oids;
3051  }
3052  else
3053  {
3054  /* Read and verify binary header */
3055  char readSig[11];
3056  int32 tmp;
3057 
3058  /* Signature */
3059  if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3060  memcmp(readSig, BinarySignature, 11) != 0)
3061  ereport(ERROR,
3062  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3063  errmsg("COPY file signature not recognized")));
3064  /* Flags field */
3065  if (!CopyGetInt32(cstate, &tmp))
3066  ereport(ERROR,
3067  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3068  errmsg("invalid COPY file header (missing flags)")));
3069  cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3070  tmp &= ~(1 << 16);
3071  if ((tmp >> 16) != 0)
3072  ereport(ERROR,
3073  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3074  errmsg("unrecognized critical flags in COPY file header")));
3075  /* Header extension length */
3076  if (!CopyGetInt32(cstate, &tmp) ||
3077  tmp < 0)
3078  ereport(ERROR,
3079  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3080  errmsg("invalid COPY file header (missing length)")));
3081  /* Skip extension header, if present */
3082  while (tmp-- > 0)
3083  {
3084  if (CopyGetData(cstate, readSig, 1, 1) != 1)
3085  ereport(ERROR,
3086  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3087  errmsg("invalid COPY file header (wrong length)")));
3088  }
3089  }
3090 
3091  if (cstate->file_has_oids && cstate->binary)
3092  {
3094  &in_func_oid, &cstate->oid_typioparam);
3095  fmgr_info(in_func_oid, &cstate->oid_in_function);
3096  }
3097 
3098  /* create workspace for CopyReadAttributes results */
3099  if (!cstate->binary)
3100  {
3101  AttrNumber attr_count = list_length(cstate->attnumlist);
3102  int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3103 
3104  cstate->max_fields = nfields;
3105  cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3106  }
3107 
3108  MemoryContextSwitchTo(oldcontext);
3109 
3110  return cstate;
3111 }
3112 
3113 /*
3114  * Read raw fields in the next line for COPY FROM in text or csv mode.
3115  * Return false if no more lines.
3116  *
3117  * An internal temporary buffer is returned via 'fields'. It is valid until
3118  * the next call of the function. Since the function returns all raw fields
3119  * in the input file, 'nfields' could be different from the number of columns
3120  * in the relation.
3121  *
3122  * NOTE: force_not_null option are not applied to the returned fields.
3123  */
3124 bool
3125 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3126 {
3127  int fldct;
3128  bool done;
3129 
3130  /* only available for text or csv input */
3131  Assert(!cstate->binary);
3132 
3133  /* on input just throw the header line away */
3134  if (cstate->cur_lineno == 0 && cstate->header_line)
3135  {
3136  cstate->cur_lineno++;
3137  if (CopyReadLine(cstate))
3138  return false; /* done */
3139  }
3140 
3141  cstate->cur_lineno++;
3142 
3143  /* Actually read the line into memory here */
3144  done = CopyReadLine(cstate);
3145 
3146  /*
3147  * EOF at start of line means we're done. If we see EOF after some
3148  * characters, we act as though it was newline followed by EOF, ie,
3149  * process the line and then exit loop on next iteration.
3150  */
3151  if (done && cstate->line_buf.len == 0)
3152  return false;
3153 
3154  /* Parse the line into de-escaped field values */
3155  if (cstate->csv_mode)
3156  fldct = CopyReadAttributesCSV(cstate);
3157  else
3158  fldct = CopyReadAttributesText(cstate);
3159 
3160  *fields = cstate->raw_fields;
3161  *nfields = fldct;
3162  return true;
3163 }
3164 
3165 /*
3166  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3167  *
3168  * 'econtext' is used to evaluate default expression for each columns not
3169  * read from the file. It can be NULL when no default values are used, i.e.
3170  * when all columns are read from the file.
3171  *
3172  * 'values' and 'nulls' arrays must be the same length as columns of the
3173  * relation passed to BeginCopyFrom. This function fills the arrays.
3174  * Oid of the tuple is returned with 'tupleOid' separately.
3175  */
3176 bool
3178  Datum *values, bool *nulls, Oid *tupleOid)
3179 {
3180  TupleDesc tupDesc;
3181  Form_pg_attribute *attr;
3182  AttrNumber num_phys_attrs,
3183  attr_count,
3184  num_defaults = cstate->num_defaults;
3185  FmgrInfo *in_functions = cstate->in_functions;
3186  Oid *typioparams = cstate->typioparams;
3187  int i;
3188  int nfields;
3189  bool isnull;
3190  bool file_has_oids = cstate->file_has_oids;
3191  int *defmap = cstate->defmap;
3192  ExprState **defexprs = cstate->defexprs;
3193 
3194  tupDesc = RelationGetDescr(cstate->rel);
3195  attr = tupDesc->attrs;
3196  num_phys_attrs = tupDesc->natts;
3197  attr_count = list_length(cstate->attnumlist);
3198  nfields = file_has_oids ? (attr_count + 1) : attr_count;
3199 
3200  /* Initialize all values for row to NULL */
3201  MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3202  MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3203 
3204  if (!cstate->binary)
3205  {
3206  char **field_strings;
3207  ListCell *cur;
3208  int fldct;
3209  int fieldno;
3210  char *string;
3211 
3212  /* read raw fields in the next line */
3213  if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3214  return false;
3215 
3216  /* check for overflowing fields */
3217  if (nfields > 0 && fldct > nfields)
3218  ereport(ERROR,
3219  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3220  errmsg("extra data after last expected column")));
3221 
3222  fieldno = 0;
3223 
3224  /* Read the OID field if present */
3225  if (file_has_oids)
3226  {
3227  if (fieldno >= fldct)
3228  ereport(ERROR,
3229  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3230  errmsg("missing data for OID column")));
3231  string = field_strings[fieldno++];
3232 
3233  if (string == NULL)
3234  ereport(ERROR,
3235  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3236  errmsg("null OID in COPY data")));
3237  else if (cstate->oids && tupleOid != NULL)
3238  {
3239  cstate->cur_attname = "oid";
3240  cstate->cur_attval = string;
3242  CStringGetDatum(string)));
3243  if (*tupleOid == InvalidOid)
3244  ereport(ERROR,
3245  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3246  errmsg("invalid OID in COPY data")));
3247  cstate->cur_attname = NULL;
3248  cstate->cur_attval = NULL;
3249  }
3250  }
3251 
3252  /* Loop to read the user attributes on the line. */
3253  foreach(cur, cstate->attnumlist)
3254  {
3255  int attnum = lfirst_int(cur);
3256  int m = attnum - 1;
3257 
3258  if (fieldno >= fldct)
3259  ereport(ERROR,
3260  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3261  errmsg("missing data for column \"%s\"",
3262  NameStr(attr[m]->attname))));
3263  string = field_strings[fieldno++];
3264 
3265  if (cstate->convert_select_flags &&
3266  !cstate->convert_select_flags[m])
3267  {
3268  /* ignore input field, leaving column as NULL */
3269  continue;
3270  }
3271 
3272  if (cstate->csv_mode)
3273  {
3274  if (string == NULL &&
3275  cstate->force_notnull_flags[m])
3276  {
3277  /*
3278  * FORCE_NOT_NULL option is set and column is NULL -
3279  * convert it to the NULL string.
3280  */
3281  string = cstate->null_print;
3282  }
3283  else if (string != NULL && cstate->force_null_flags[m]
3284  && strcmp(string, cstate->null_print) == 0)
3285  {
3286  /*
3287  * FORCE_NULL option is set and column matches the NULL
3288  * string. It must have been quoted, or otherwise the
3289  * string would already have been set to NULL. Convert it
3290  * to NULL as specified.
3291  */
3292  string = NULL;
3293  }
3294  }
3295 
3296  cstate->cur_attname = NameStr(attr[m]->attname);
3297  cstate->cur_attval = string;
3298  values[m] = InputFunctionCall(&in_functions[m],
3299  string,
3300  typioparams[m],
3301  attr[m]->atttypmod);
3302  if (string != NULL)
3303  nulls[m] = false;
3304  cstate->cur_attname = NULL;
3305  cstate->cur_attval = NULL;
3306  }
3307 
3308  Assert(fieldno == nfields);
3309  }
3310  else
3311  {
3312  /* binary */
3313  int16 fld_count;
3314  ListCell *cur;
3315 
3316  cstate->cur_lineno++;
3317 
3318  if (!CopyGetInt16(cstate, &fld_count))
3319  {
3320  /* EOF detected (end of file, or protocol-level EOF) */
3321  return false;
3322  }
3323 
3324  if (fld_count == -1)
3325  {
3326  /*
3327  * Received EOF marker. In a V3-protocol copy, wait for the
3328  * protocol-level EOF, and complain if it doesn't come
3329  * immediately. This ensures that we correctly handle CopyFail,
3330  * if client chooses to send that now.
3331  *
3332  * Note that we MUST NOT try to read more data in an old-protocol
3333  * copy, since there is no protocol-level EOF marker then. We
3334  * could go either way for copy from file, but choose to throw
3335  * error if there's data after the EOF marker, for consistency
3336  * with the new-protocol case.
3337  */
3338  char dummy;
3339 
3340  if (cstate->copy_dest != COPY_OLD_FE &&
3341  CopyGetData(cstate, &dummy, 1, 1) > 0)
3342  ereport(ERROR,
3343  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3344  errmsg("received copy data after EOF marker")));
3345  return false;
3346  }
3347 
3348  if (fld_count != attr_count)
3349  ereport(ERROR,
3350  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3351  errmsg("row field count is %d, expected %d",
3352  (int) fld_count, attr_count)));
3353 
3354  if (file_has_oids)
3355  {
3356  Oid loaded_oid;
3357 
3358  cstate->cur_attname = "oid";
3359  loaded_oid =
3361  0,
3362  &cstate->oid_in_function,
3363  cstate->oid_typioparam,
3364  -1,
3365  &isnull));
3366  if (isnull || loaded_oid == InvalidOid)
3367  ereport(ERROR,
3368  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3369  errmsg("invalid OID in COPY data")));
3370  cstate->cur_attname = NULL;
3371  if (cstate->oids && tupleOid != NULL)
3372  *tupleOid = loaded_oid;
3373  }
3374 
3375  i = 0;
3376  foreach(cur, cstate->attnumlist)
3377  {
3378  int attnum = lfirst_int(cur);
3379  int m = attnum - 1;
3380 
3381  cstate->cur_attname = NameStr(attr[m]->attname);
3382  i++;
3383  values[m] = CopyReadBinaryAttribute(cstate,
3384  i,
3385  &in_functions[m],
3386  typioparams[m],
3387  attr[m]->atttypmod,
3388  &nulls[m]);
3389  cstate->cur_attname = NULL;
3390  }
3391  }
3392 
3393  /*
3394  * Now compute and insert any defaults available for the columns not
3395  * provided by the input data. Anything not processed here or above will
3396  * remain NULL.
3397  */
3398  for (i = 0; i < num_defaults; i++)
3399  {
3400  /*
3401  * The caller must supply econtext and have switched into the
3402  * per-tuple memory context in it.
3403  */
3404  Assert(econtext != NULL);
3406 
3407  values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3408  &nulls[defmap[i]]);
3409  }
3410 
3411  return true;
3412 }
3413 
3414 /*
3415  * Clean up storage and release resources for COPY FROM.
3416  */
3417 void
3419 {
3420  /* No COPY FROM related resources except memory. */
3421 
3422  EndCopy(cstate);
3423 }
3424 
3425 /*
3426  * Read the next input line and stash it in line_buf, with conversion to
3427  * server encoding.
3428  *
3429  * Result is true if read was terminated by EOF, false if terminated
3430  * by newline. The terminating newline or EOF marker is not included
3431  * in the final value of line_buf.
3432  */
3433 static bool
3435 {
3436  bool result;
3437 
3438  resetStringInfo(&cstate->line_buf);
3439  cstate->line_buf_valid = true;
3440 
3441  /* Mark that encoding conversion hasn't occurred yet */
3442  cstate->line_buf_converted = false;
3443 
3444  /* Parse data and transfer into line_buf */
3445  result = CopyReadLineText(cstate);
3446 
3447  if (result)
3448  {
3449  /*
3450  * Reached EOF. In protocol version 3, we should ignore anything
3451  * after \. up to the protocol end of copy data. (XXX maybe better
3452  * not to treat \. as special?)
3453  */
3454  if (cstate->copy_dest == COPY_NEW_FE)
3455  {
3456  do
3457  {
3458  cstate->raw_buf_index = cstate->raw_buf_len;
3459  } while (CopyLoadRawBuf(cstate));
3460  }
3461  }
3462  else
3463  {
3464  /*
3465  * If we didn't hit EOF, then we must have transferred the EOL marker
3466  * to line_buf along with the data. Get rid of it.
3467  */
3468  switch (cstate->eol_type)
3469  {
3470  case EOL_NL:
3471  Assert(cstate->line_buf.len >= 1);
3472  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3473  cstate->line_buf.len--;
3474  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3475  break;
3476  case EOL_CR:
3477  Assert(cstate->line_buf.len >= 1);
3478  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3479  cstate->line_buf.len--;
3480  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3481  break;
3482  case EOL_CRNL:
3483  Assert(cstate->line_buf.len >= 2);
3484  Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3485  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3486  cstate->line_buf.len -= 2;
3487  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3488  break;
3489  case EOL_UNKNOWN:
3490  /* shouldn't get here */
3491  Assert(false);
3492  break;
3493  }
3494  }
3495 
3496  /* Done reading the line. Convert it to server encoding. */
3497  if (cstate->need_transcoding)
3498  {
3499  char *cvt;
3500 
3501  cvt = pg_any_to_server(cstate->line_buf.data,
3502  cstate->line_buf.len,
3503  cstate->file_encoding);
3504  if (cvt != cstate->line_buf.data)
3505  {
3506  /* transfer converted data back to line_buf */
3507  resetStringInfo(&cstate->line_buf);
3508  appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3509  pfree(cvt);
3510  }
3511  }
3512 
3513  /* Now it's safe to use the buffer in error messages */
3514  cstate->line_buf_converted = true;
3515 
3516  return result;
3517 }
3518 
3519 /*
3520  * CopyReadLineText - inner loop of CopyReadLine for text mode
3521  */
3522 static bool
3524 {
3525  char *copy_raw_buf;
3526  int raw_buf_ptr;
3527  int copy_buf_len;
3528  bool need_data = false;
3529  bool hit_eof = false;
3530  bool result = false;
3531  char mblen_str[2];
3532 
3533  /* CSV variables */
3534  bool first_char_in_line = true;
3535  bool in_quote = false,
3536  last_was_esc = false;
3537  char quotec = '\0';
3538  char escapec = '\0';
3539 
3540  if (cstate->csv_mode)
3541  {
3542  quotec = cstate->quote[0];
3543  escapec = cstate->escape[0];
3544  /* ignore special escape processing if it's the same as quotec */
3545  if (quotec == escapec)
3546  escapec = '\0';
3547  }
3548 
3549  mblen_str[1] = '\0';
3550 
3551  /*
3552  * The objective of this loop is to transfer the entire next input line
3553  * into line_buf. Hence, we only care for detecting newlines (\r and/or
3554  * \n) and the end-of-copy marker (\.).
3555  *
3556  * In CSV mode, \r and \n inside a quoted field are just part of the data
3557  * value and are put in line_buf. We keep just enough state to know if we
3558  * are currently in a quoted field or not.
3559  *
3560  * These four characters, and the CSV escape and quote characters, are
3561  * assumed the same in frontend and backend encodings.
3562  *
3563  * For speed, we try to move data from raw_buf to line_buf in chunks
3564  * rather than one character at a time. raw_buf_ptr points to the next
3565  * character to examine; any characters from raw_buf_index to raw_buf_ptr
3566  * have been determined to be part of the line, but not yet transferred to
3567  * line_buf.
3568  *
3569  * For a little extra speed within the loop, we copy raw_buf and
3570  * raw_buf_len into local variables.
3571  */
3572  copy_raw_buf = cstate->raw_buf;
3573  raw_buf_ptr = cstate->raw_buf_index;
3574  copy_buf_len = cstate->raw_buf_len;
3575 
3576  for (;;)
3577  {
3578  int prev_raw_ptr;
3579  char c;
3580 
3581  /*
3582  * Load more data if needed. Ideally we would just force four bytes
3583  * of read-ahead and avoid the many calls to
3584  * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3585  * does not allow us to read too far ahead or we might read into the
3586  * next data, so we read-ahead only as far we know we can. One
3587  * optimization would be to read-ahead four byte here if
3588  * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3589  * considering the size of the buffer.
3590  */
3591  if (raw_buf_ptr >= copy_buf_len || need_data)
3592  {
3594 
3595  /*
3596  * Try to read some more data. This will certainly reset
3597  * raw_buf_index to zero, and raw_buf_ptr must go with it.
3598  */
3599  if (!CopyLoadRawBuf(cstate))
3600  hit_eof = true;
3601  raw_buf_ptr = 0;
3602  copy_buf_len = cstate->raw_buf_len;
3603 
3604  /*
3605  * If we are completely out of data, break out of the loop,
3606  * reporting EOF.
3607  */
3608  if (copy_buf_len <= 0)
3609  {
3610  result = true;
3611  break;
3612  }
3613  need_data = false;
3614  }
3615 
3616  /* OK to fetch a character */
3617  prev_raw_ptr = raw_buf_ptr;
3618  c = copy_raw_buf[raw_buf_ptr++];
3619 
3620  if (cstate->csv_mode)
3621  {
3622  /*
3623  * If character is '\\' or '\r', we may need to look ahead below.
3624  * Force fetch of the next character if we don't already have it.
3625  * We need to do this before changing CSV state, in case one of
3626  * these characters is also the quote or escape character.
3627  *
3628  * Note: old-protocol does not like forced prefetch, but it's OK
3629  * here since we cannot validly be at EOF.
3630  */
3631  if (c == '\\' || c == '\r')
3632  {
3634  }
3635 
3636  /*
3637  * Dealing with quotes and escapes here is mildly tricky. If the
3638  * quote char is also the escape char, there's no problem - we
3639  * just use the char as a toggle. If they are different, we need
3640  * to ensure that we only take account of an escape inside a
3641  * quoted field and immediately preceding a quote char, and not
3642  * the second in an escape-escape sequence.
3643  */
3644  if (in_quote && c == escapec)
3645  last_was_esc = !last_was_esc;
3646  if (c == quotec && !last_was_esc)
3647  in_quote = !in_quote;
3648  if (c != escapec)
3649  last_was_esc = false;
3650 
3651  /*
3652  * Updating the line count for embedded CR and/or LF chars is
3653  * necessarily a little fragile - this test is probably about the
3654  * best we can do. (XXX it's arguable whether we should do this
3655  * at all --- is cur_lineno a physical or logical count?)
3656  */
3657  if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3658  cstate->cur_lineno++;
3659  }
3660 
3661  /* Process \r */
3662  if (c == '\r' && (!cstate->csv_mode || !in_quote))
3663  {
3664  /* Check for \r\n on first line, _and_ handle \r\n. */
3665  if (cstate->eol_type == EOL_UNKNOWN ||
3666  cstate->eol_type == EOL_CRNL)
3667  {
3668  /*
3669  * If need more data, go back to loop top to load it.
3670  *
3671  * Note that if we are at EOF, c will wind up as '\0' because
3672  * of the guaranteed pad of raw_buf.
3673  */
3675 
3676  /* get next char */
3677  c = copy_raw_buf[raw_buf_ptr];
3678 
3679  if (c == '\n')
3680  {
3681  raw_buf_ptr++; /* eat newline */
3682  cstate->eol_type = EOL_CRNL; /* in case not set yet */
3683  }
3684  else
3685  {
3686  /* found \r, but no \n */
3687  if (cstate->eol_type == EOL_CRNL)
3688  ereport(ERROR,
3689  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3690  !cstate->csv_mode ?
3691  errmsg("literal carriage return found in data") :
3692  errmsg("unquoted carriage return found in data"),
3693  !cstate->csv_mode ?
3694  errhint("Use \"\\r\" to represent carriage return.") :
3695  errhint("Use quoted CSV field to represent carriage return.")));
3696 
3697  /*
3698  * if we got here, it is the first line and we didn't find
3699  * \n, so don't consume the peeked character
3700  */
3701  cstate->eol_type = EOL_CR;
3702  }
3703  }
3704  else if (cstate->eol_type == EOL_NL)
3705  ereport(ERROR,
3706  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3707  !cstate->csv_mode ?
3708  errmsg("literal carriage return found in data") :
3709  errmsg("unquoted carriage return found in data"),
3710  !cstate->csv_mode ?
3711  errhint("Use \"\\r\" to represent carriage return.") :
3712  errhint("Use quoted CSV field to represent carriage return.")));
3713  /* If reach here, we have found the line terminator */
3714  break;
3715  }
3716 
3717  /* Process \n */
3718  if (c == '\n' && (!cstate->csv_mode || !in_quote))
3719  {
3720  if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3721  ereport(ERROR,
3722  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3723  !cstate->csv_mode ?
3724  errmsg("literal newline found in data") :
3725  errmsg("unquoted newline found in data"),
3726  !cstate->csv_mode ?
3727  errhint("Use \"\\n\" to represent newline.") :
3728  errhint("Use quoted CSV field to represent newline.")));
3729  cstate->eol_type = EOL_NL; /* in case not set yet */
3730  /* If reach here, we have found the line terminator */
3731  break;
3732  }
3733 
3734  /*
3735  * In CSV mode, we only recognize \. alone on a line. This is because
3736  * \. is a valid CSV data value.
3737  */
3738  if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3739  {
3740  char c2;
3741 
3744 
3745  /* -----
3746  * get next character
3747  * Note: we do not change c so if it isn't \., we can fall
3748  * through and continue processing for file encoding.
3749  * -----
3750  */
3751  c2 = copy_raw_buf[raw_buf_ptr];
3752 
3753  if (c2 == '.')
3754  {
3755  raw_buf_ptr++; /* consume the '.' */
3756 
3757  /*
3758  * Note: if we loop back for more data here, it does not
3759  * matter that the CSV state change checks are re-executed; we
3760  * will come back here with no important state changed.
3761  */
3762  if (cstate->eol_type == EOL_CRNL)
3763  {
3764  /* Get the next character */
3766  /* if hit_eof, c2 will become '\0' */
3767  c2 = copy_raw_buf[raw_buf_ptr++];
3768 
3769  if (c2 == '\n')
3770  {
3771  if (!cstate->csv_mode)
3772  ereport(ERROR,
3773  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3774  errmsg("end-of-copy marker does not match previous newline style")));
3775  else
3777  }
3778  else if (c2 != '\r')
3779  {
3780  if (!cstate->csv_mode)
3781  ereport(ERROR,
3782  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3783  errmsg("end-of-copy marker corrupt")));
3784  else
3786  }
3787  }
3788 
3789  /* Get the next character */
3791  /* if hit_eof, c2 will become '\0' */
3792  c2 = copy_raw_buf[raw_buf_ptr++];
3793 
3794  if (c2 != '\r' && c2 != '\n')
3795  {
3796  if (!cstate->csv_mode)
3797  ereport(ERROR,
3798  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3799  errmsg("end-of-copy marker corrupt")));
3800  else
3802  }
3803 
3804  if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3805  (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3806  (cstate->eol_type == EOL_CR && c2 != '\r'))
3807  {
3808  ereport(ERROR,
3809  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3810  errmsg("end-of-copy marker does not match previous newline style")));
3811  }
3812 
3813  /*
3814  * Transfer only the data before the \. into line_buf, then
3815  * discard the data and the \. sequence.
3816  */
3817  if (prev_raw_ptr > cstate->raw_buf_index)
3819  cstate->raw_buf + cstate->raw_buf_index,
3820  prev_raw_ptr - cstate->raw_buf_index);
3821  cstate->raw_buf_index = raw_buf_ptr;
3822  result = true; /* report EOF */
3823  break;
3824  }
3825  else if (!cstate->csv_mode)
3826 
3827  /*
3828  * If we are here, it means we found a backslash followed by
3829  * something other than a period. In non-CSV mode, anything
3830  * after a backslash is special, so we skip over that second
3831  * character too. If we didn't do that \\. would be
3832  * considered an eof-of copy, while in non-CSV mode it is a
3833  * literal backslash followed by a period. In CSV mode,
3834  * backslashes are not special, so we want to process the
3835  * character after the backslash just like a normal character,
3836  * so we don't increment in those cases.
3837  */
3838  raw_buf_ptr++;
3839  }
3840 
3841  /*
3842  * This label is for CSV cases where \. appears at the start of a
3843  * line, but there is more text after it, meaning it was a data value.
3844  * We are more strict for \. in CSV mode because \. could be a data
3845  * value, while in non-CSV mode, \. cannot be a data value.
3846  */
3847 not_end_of_copy:
3848 
3849  /*
3850  * Process all bytes of a multi-byte character as a group.
3851  *
3852  * We only support multi-byte sequences where the first byte has the
3853  * high-bit set, so as an optimization we can avoid this block
3854  * entirely if it is not set.
3855  */
3856  if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3857  {
3858  int mblen;
3859 
3860  mblen_str[0] = c;
3861  /* All our encodings only read the first byte to get the length */
3862  mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3864  IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3865  raw_buf_ptr += mblen - 1;
3866  }
3867  first_char_in_line = false;
3868  } /* end of outer loop */
3869 
3870  /*
3871  * Transfer any still-uncopied data to line_buf.
3872  */
3874 
3875  return result;
3876 }
3877 
/*
 * Return decimal value for a hexadecimal digit
 *
 * 'hex' must already be known to be a hexadecimal digit (callers check with
 * isxdigit() first); no validation is done here.  Letters are accepted in
 * either case.
 */
static int
GetDecimalFromHex(char hex)
{
	/* cast to unsigned char first, as the <ctype.h> functions require */
	if (isdigit((unsigned char) hex))
		return hex - '0';
	else
		return tolower((unsigned char) hex) - 'a' + 10;
}
3889 
3890 /*
3891  * Parse the current line into separate attributes (fields),
3892  * performing de-escaping as needed.
3893  *
3894  * The input is in line_buf. We use attribute_buf to hold the result
3895  * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3896  * string, or NULL when the input matches the null marker string.
3897  * This array is expanded as necessary.
3898  *
3899  * (Note that the caller cannot check for nulls since the returned
3900  * string would be the post-de-escaping equivalent, which may look
3901  * the same as some valid data string.)
3902  *
3903  * delim is the column delimiter string (must be just one byte for now).
3904  * null_print is the null marker string. Note that this is compared to
3905  * the pre-de-escaped input string.
3906  *
3907  * The return value is the number of fields actually read.
3908  */
3909 static int
3911 {
3912  char delimc = cstate->delim[0];
3913  int fieldno;
3914  char *output_ptr;
3915  char *cur_ptr;
3916  char *line_end_ptr;
3917 
3918  /*
3919  * We need a special case for zero-column tables: check that the input
3920  * line is empty, and return.
3921  */
3922  if (cstate->max_fields <= 0)
3923  {
3924  if (cstate->line_buf.len != 0)
3925  ereport(ERROR,
3926  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3927  errmsg("extra data after last expected column")));
3928  return 0;
3929  }
3930 
3931  resetStringInfo(&cstate->attribute_buf);
3932 
3933  /*
3934  * The de-escaped attributes will certainly not be longer than the input
3935  * data line, so we can just force attribute_buf to be large enough and
3936  * then transfer data without any checks for enough space. We need to do
3937  * it this way because enlarging attribute_buf mid-stream would invalidate
3938  * pointers already stored into cstate->raw_fields[].
3939  */
3940  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
3941  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
3942  output_ptr = cstate->attribute_buf.data;
3943 
3944  /* set pointer variables for loop */
3945  cur_ptr = cstate->line_buf.data;
3946  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
3947 
3948  /* Outer loop iterates over fields */
3949  fieldno = 0;
3950  for (;;)
3951  {
3952  bool found_delim = false;
3953  char *start_ptr;
3954  char *end_ptr;
3955  int input_len;
3956  bool saw_non_ascii = false;
3957 
3958  /* Make sure there is enough space for the next value */
3959  if (fieldno >= cstate->max_fields)
3960  {
3961  cstate->max_fields *= 2;
3962  cstate->raw_fields =
3963  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
3964  }
3965 
3966  /* Remember start of field on both input and output sides */
3967  start_ptr = cur_ptr;
3968  cstate->raw_fields[fieldno] = output_ptr;
3969 
3970  /*
3971  * Scan data for field.
3972  *
3973  * Note that in this loop, we are scanning to locate the end of field
3974  * and also speculatively performing de-escaping. Once we find the
3975  * end-of-field, we can match the raw field contents against the null
3976  * marker string. Only after that comparison fails do we know that
3977  * de-escaping is actually the right thing to do; therefore we *must
3978  * not* throw any syntax errors before we've done the null-marker
3979  * check.
3980  */
3981  for (;;)
3982  {
3983  char c;
3984 
3985  end_ptr = cur_ptr;
3986  if (cur_ptr >= line_end_ptr)
3987  break;
3988  c = *cur_ptr++;
3989  if (c == delimc)
3990  {
3991  found_delim = true;
3992  break;
3993  }
3994  if (c == '\\')
3995  {
3996  if (cur_ptr >= line_end_ptr)
3997  break;
3998  c = *cur_ptr++;
3999  switch (c)
4000  {
4001  case '0':
4002  case '1':
4003  case '2':
4004  case '3':
4005  case '4':
4006  case '5':
4007  case '6':
4008  case '7':
4009  {
4010  /* handle \013 */
4011  int val;
4012 
4013  val = OCTVALUE(c);
4014  if (cur_ptr < line_end_ptr)
4015  {
4016  c = *cur_ptr;
4017  if (ISOCTAL(c))
4018  {
4019  cur_ptr++;
4020  val = (val << 3) + OCTVALUE(c);
4021  if (cur_ptr < line_end_ptr)
4022  {
4023  c = *cur_ptr;
4024  if (ISOCTAL(c))
4025  {
4026  cur_ptr++;
4027  val = (val << 3) + OCTVALUE(c);
4028  }
4029  }
4030  }
4031  }
4032  c = val & 0377;
4033  if (c == '\0' || IS_HIGHBIT_SET(c))
4034  saw_non_ascii = true;
4035  }
4036  break;
4037  case 'x':
4038  /* Handle \x3F */
4039  if (cur_ptr < line_end_ptr)
4040  {
4041  char hexchar = *cur_ptr;
4042 
4043  if (isxdigit((unsigned char) hexchar))
4044  {
4045  int val = GetDecimalFromHex(hexchar);
4046 
4047  cur_ptr++;
4048  if (cur_ptr < line_end_ptr)
4049  {
4050  hexchar = *cur_ptr;
4051  if (isxdigit((unsigned char) hexchar))
4052  {
4053  cur_ptr++;
4054  val = (val << 4) + GetDecimalFromHex(hexchar);
4055  }
4056  }
4057  c = val & 0xff;
4058  if (c == '\0' || IS_HIGHBIT_SET(c))
4059  saw_non_ascii = true;
4060  }
4061  }
4062  break;
4063  case 'b':
4064  c = '\b';
4065  break;
4066  case 'f':
4067  c = '\f';
4068  break;
4069  case 'n':
4070  c = '\n';
4071  break;
4072  case 'r':
4073  c = '\r';
4074  break;
4075  case 't':
4076  c = '\t';
4077  break;
4078  case 'v':
4079  c = '\v';
4080  break;
4081 
4082  /*
4083  * in all other cases, take the char after '\'
4084  * literally
4085  */
4086  }
4087  }
4088 
4089  /* Add c to output string */
4090  *output_ptr++ = c;
4091  }
4092 
4093  /* Check whether raw input matched null marker */
4094  input_len = end_ptr - start_ptr;
4095  if (input_len == cstate->null_print_len &&
4096  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4097  cstate->raw_fields[fieldno] = NULL;
4098  else
4099  {
4100  /*
4101  * At this point we know the field is supposed to contain data.
4102  *
4103  * If we de-escaped any non-7-bit-ASCII chars, make sure the
4104  * resulting string is valid data for the db encoding.
4105  */
4106  if (saw_non_ascii)
4107  {
4108  char *fld = cstate->raw_fields[fieldno];
4109 
4110  pg_verifymbstr(fld, output_ptr - fld, false);
4111  }
4112  }
4113 
4114  /* Terminate attribute value in output area */
4115  *output_ptr++ = '\0';
4116 
4117  fieldno++;
4118  /* Done if we hit EOL instead of a delim */
4119  if (!found_delim)
4120  break;
4121  }
4122 
4123  /* Clean up state of attribute_buf */
4124  output_ptr--;
4125  Assert(*output_ptr == '\0');
4126  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4127 
4128  return fieldno;
4129 }
4130 
4131 /*
4132  * Parse the current line into separate attributes (fields),
4133  * performing de-escaping as needed. This has exactly the same API as
4134  * CopyReadAttributesText, except we parse the fields according to
4135  * "standard" (i.e. common) CSV usage.
4136  */
4137 static int
4139 {
4140  char delimc = cstate->delim[0];
4141  char quotec = cstate->quote[0];
4142  char escapec = cstate->escape[0];
4143  int fieldno;
4144  char *output_ptr;
4145  char *cur_ptr;
4146  char *line_end_ptr;
4147 
4148  /*
4149  * We need a special case for zero-column tables: check that the input
4150  * line is empty, and return.
4151  */
4152  if (cstate->max_fields <= 0)
4153  {
4154  if (cstate->line_buf.len != 0)
4155  ereport(ERROR,
4156  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4157  errmsg("extra data after last expected column")));
4158  return 0;
4159  }
4160 
4161  resetStringInfo(&cstate->attribute_buf);
4162 
4163  /*
4164  * The de-escaped attributes will certainly not be longer than the input
4165  * data line, so we can just force attribute_buf to be large enough and
4166  * then transfer data without any checks for enough space. We need to do
4167  * it this way because enlarging attribute_buf mid-stream would invalidate
4168  * pointers already stored into cstate->raw_fields[].
4169  */
4170  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4171  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4172  output_ptr = cstate->attribute_buf.data;
4173 
4174  /* set pointer variables for loop */
4175  cur_ptr = cstate->line_buf.data;
4176  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4177 
4178  /* Outer loop iterates over fields */
4179  fieldno = 0;
4180  for (;;)
4181  {
4182  bool found_delim = false;
4183  bool saw_quote = false;
4184  char *start_ptr;
4185  char *end_ptr;
4186  int input_len;
4187 
4188  /* Make sure there is enough space for the next value */
4189  if (fieldno >= cstate->max_fields)
4190  {
4191  cstate->max_fields *= 2;
4192  cstate->raw_fields =
4193  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4194  }
4195 
4196  /* Remember start of field on both input and output sides */
4197  start_ptr = cur_ptr;
4198  cstate->raw_fields[fieldno] = output_ptr;
4199 
4200  /*
4201  * Scan data for field,
4202  *
4203  * The loop starts in "not quote" mode and then toggles between that
4204  * and "in quote" mode. The loop exits normally if it is in "not
4205  * quote" mode and a delimiter or line end is seen.
4206  */
4207  for (;;)
4208  {
4209  char c;
4210 
4211  /* Not in quote */
4212  for (;;)
4213  {
4214  end_ptr = cur_ptr;
4215  if (cur_ptr >= line_end_ptr)
4216  goto endfield;
4217  c = *cur_ptr++;
4218  /* unquoted field delimiter */
4219  if (c == delimc)
4220  {
4221  found_delim = true;
4222  goto endfield;
4223  }
4224  /* start of quoted field (or part of field) */
4225  if (c == quotec)
4226  {
4227  saw_quote = true;
4228  break;
4229  }
4230  /* Add c to output string */
4231  *output_ptr++ = c;
4232  }
4233 
4234  /* In quote */
4235  for (;;)
4236  {
4237  end_ptr = cur_ptr;
4238  if (cur_ptr >= line_end_ptr)
4239  ereport(ERROR,
4240  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4241  errmsg("unterminated CSV quoted field")));
4242 
4243  c = *cur_ptr++;
4244 
4245  /* escape within a quoted field */
4246  if (c == escapec)
4247  {
4248  /*
4249  * peek at the next char if available, and escape it if it
4250  * is an escape char or a quote char
4251  */
4252  if (cur_ptr < line_end_ptr)
4253  {
4254  char nextc = *cur_ptr;
4255 
4256  if (nextc == escapec || nextc == quotec)
4257  {
4258  *output_ptr++ = nextc;
4259  cur_ptr++;
4260  continue;
4261  }
4262  }
4263  }
4264 
4265  /*
4266  * end of quoted field. Must do this test after testing for
4267  * escape in case quote char and escape char are the same
4268  * (which is the common case).
4269  */
4270  if (c == quotec)
4271  break;
4272 
4273  /* Add c to output string */
4274  *output_ptr++ = c;
4275  }
4276  }
4277 endfield:
4278 
4279  /* Terminate attribute value in output area */
4280  *output_ptr++ = '\0';
4281 
4282  /* Check whether raw input matched null marker */
4283  input_len = end_ptr - start_ptr;
4284  if (!saw_quote && input_len == cstate->null_print_len &&
4285  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4286  cstate->raw_fields[fieldno] = NULL;
4287 
4288  fieldno++;
4289  /* Done if we hit EOL instead of a delim */
4290  if (!found_delim)
4291  break;
4292  }
4293 
4294  /* Clean up state of attribute_buf */
4295  output_ptr--;
4296  Assert(*output_ptr == '\0');
4297  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4298 
4299  return fieldno;
4300 }
4301 
4302 
4303 /*
4304  * Read a binary attribute
4305  */
4306 static Datum
4308  int column_no, FmgrInfo *flinfo,
4309  Oid typioparam, int32 typmod,
4310  bool *isnull)
4311 {
4312  int32 fld_size;
4313  Datum result;
4314 
4315  if (!CopyGetInt32(cstate, &fld_size))
4316  ereport(ERROR,
4317  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4318  errmsg("unexpected EOF in COPY data")));
4319  if (fld_size == -1)
4320  {
4321  *isnull = true;
4322  return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4323  }
4324  if (fld_size < 0)
4325  ereport(ERROR,
4326  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4327  errmsg("invalid field size")));
4328 
4329  /* reset attribute_buf to empty, and load raw data in it */
4330  resetStringInfo(&cstate->attribute_buf);
4331 
4332  enlargeStringInfo(&cstate->attribute_buf, fld_size);
4333  if (CopyGetData(cstate, cstate->attribute_buf.data,
4334  fld_size, fld_size) != fld_size)
4335  ereport(ERROR,
4336  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4337  errmsg("unexpected EOF in COPY data")));
4338 
4339  cstate->attribute_buf.len = fld_size;
4340  cstate->attribute_buf.data[fld_size] = '\0';
4341 
4342  /* Call the column type's binary input converter */
4343  result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4344  typioparam, typmod);
4345 
4346  /* Trouble if it didn't eat the whole buffer */
4347  if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4348  ereport(ERROR,
4349  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4350  errmsg("incorrect binary data format")));
4351 
4352  *isnull = false;
4353  return result;
4354 }
4355 
4356 /*
4357  * Send text representation of one attribute, with conversion and escaping
4358  */
4359 #define DUMPSOFAR() \
4360  do { \
4361  if (ptr > start) \
4362  CopySendData(cstate, start, ptr - start); \
4363  } while (0)
4364 
4365 static void
4366 CopyAttributeOutText(CopyState cstate, char *string)
4367 {
4368  char *ptr;
4369  char *start;
4370  char c;
4371  char delimc = cstate->delim[0];
4372 
4373  if (cstate->need_transcoding)
4374  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4375  else
4376  ptr = string;
4377 
4378  /*
4379  * We have to grovel through the string searching for control characters
4380  * and instances of the delimiter character. In most cases, though, these
4381  * are infrequent. To avoid overhead from calling CopySendData once per
4382  * character, we dump out all characters between escaped characters in a
4383  * single call. The loop invariant is that the data from "start" to "ptr"
4384  * can be sent literally, but hasn't yet been.
4385  *
4386  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4387  * in valid backend encodings, extra bytes of a multibyte character never
4388  * look like ASCII. This loop is sufficiently performance-critical that
4389  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4390  * of the normal safe-encoding path.
4391  */
4392  if (cstate->encoding_embeds_ascii)
4393  {
4394  start = ptr;
4395  while ((c = *ptr) != '\0')
4396  {
4397  if ((unsigned char) c < (unsigned char) 0x20)
4398  {
4399  /*
4400  * \r and \n must be escaped, the others are traditional. We
4401  * prefer to dump these using the C-like notation, rather than
4402  * a backslash and the literal character, because it makes the
4403  * dump file a bit more proof against Microsoftish data
4404  * mangling.
4405  */
4406  switch (c)
4407  {
4408  case '\b':
4409  c = 'b';
4410  break;
4411  case '\f':
4412  c = 'f';
4413  break;
4414  case '\n':
4415  c = 'n';
4416  break;
4417  case '\r':
4418  c = 'r';
4419  break;
4420  case '\t':
4421  c = 't';
4422  break;
4423  case '\v':
4424  c = 'v';
4425  break;
4426  default:
4427  /* If it's the delimiter, must backslash it */
4428  if (c == delimc)
4429  break;
4430  /* All ASCII control chars are length 1 */
4431  ptr++;
4432  continue; /* fall to end of loop */
4433  }
4434  /* if we get here, we need to convert the control char */
4435  DUMPSOFAR();
4436  CopySendChar(cstate, '\\');
4437  CopySendChar(cstate, c);
4438  start = ++ptr; /* do not include char in next run */
4439  }
4440  else if (c == '\\' || c == delimc)
4441  {
4442  DUMPSOFAR();
4443  CopySendChar(cstate, '\\');
4444  start = ptr++; /* we include char in next run */
4445  }
4446  else if (IS_HIGHBIT_SET(c))
4447  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4448  else
4449  ptr++;
4450  }
4451  }
4452  else
4453  {
4454  start = ptr;
4455  while ((c = *ptr) != '\0')
4456  {
4457  if ((unsigned char) c < (unsigned char) 0x20)
4458  {
4459  /*
4460  * \r and \n must be escaped, the others are traditional. We
4461  * prefer to dump these using the C-like notation, rather than
4462  * a backslash and the literal character, because it makes the
4463  * dump file a bit more proof against Microsoftish data
4464  * mangling.
4465  */
4466  switch (c)
4467  {
4468  case '\b':
4469  c = 'b';
4470  break;
4471  case '\f':
4472  c = 'f';
4473  break;
4474  case '\n':
4475  c = 'n';
4476  break;
4477  case '\r':
4478  c = 'r';
4479  break;
4480  case '\t':
4481  c = 't';
4482  break;
4483  case '\v':
4484  c = 'v';
4485  break;
4486  default:
4487  /* If it's the delimiter, must backslash it */
4488  if (c == delimc)
4489  break;
4490  /* All ASCII control chars are length 1 */
4491  ptr++;
4492  continue; /* fall to end of loop */
4493  }
4494  /* if we get here, we need to convert the control char */
4495  DUMPSOFAR();
4496  CopySendChar(cstate, '\\');
4497  CopySendChar(cstate, c);
4498  start = ++ptr; /* do not include char in next run */
4499  }
4500  else if (c == '\\' || c == delimc)
4501  {
4502  DUMPSOFAR();
4503  CopySendChar(cstate, '\\');
4504  start = ptr++; /* we include char in next run */
4505  }
4506  else
4507  ptr++;
4508  }
4509  }
4510 
4511  DUMPSOFAR();
4512 }
4513 
4514 /*
4515  * Send text representation of one attribute, with conversion and
4516  * CSV-style escaping
4517  */
4518 static void
4519 CopyAttributeOutCSV(CopyState cstate, char *string,
4520  bool use_quote, bool single_attr)
4521 {
4522  char *ptr;
4523  char *start;
4524  char c;
4525  char delimc = cstate->delim[0];
4526  char quotec = cstate->quote[0];
4527  char escapec = cstate->escape[0];
4528 
4529  /* force quoting if it matches null_print (before conversion!) */
4530  if (!use_quote && strcmp(string, cstate->null_print) == 0)
4531  use_quote = true;
4532 
4533  if (cstate->need_transcoding)
4534  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4535  else
4536  ptr = string;
4537 
4538  /*
4539  * Make a preliminary pass to discover if it needs quoting
4540  */
4541  if (!use_quote)
4542  {
4543  /*
4544  * Because '\.' can be a data value, quote it if it appears alone on a
4545  * line so it is not interpreted as the end-of-data marker.
4546  */
4547  if (single_attr && strcmp(ptr, "\\.") == 0)
4548  use_quote = true;
4549  else
4550  {
4551  char *tptr = ptr;
4552 
4553  while ((c = *tptr) != '\0')
4554  {
4555  if (c == delimc || c == quotec || c == '\n' || c == '\r')
4556  {
4557  use_quote = true;
4558  break;
4559  }
4560  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4561  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4562  else
4563  tptr++;
4564  }
4565  }
4566  }
4567 
4568  if (use_quote)
4569  {
4570  CopySendChar(cstate, quotec);
4571 
4572  /*
4573  * We adopt the same optimization strategy as in CopyAttributeOutText
4574  */
4575  start = ptr;
4576  while ((c = *ptr) != '\0')
4577  {
4578  if (c == quotec || c == escapec)
4579  {
4580  DUMPSOFAR();
4581  CopySendChar(cstate, escapec);
4582  start = ptr; /* we include char in next run */
4583  }
4584  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4585  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4586  else
4587  ptr++;
4588  }
4589  DUMPSOFAR();
4590 
4591  CopySendChar(cstate, quotec);
4592  }
4593  else
4594  {
4595  /* If it doesn't need quoting, we can just dump it as-is */
4596  CopySendString(cstate, ptr);
4597  }
4598 }
4599 
4600 /*
4601  * CopyGetAttnums - build an integer list of attnums to be copied
4602  *
4603  * The input attnamelist is either the user-specified column list,
4604  * or NIL if there was none (in which case we want all the non-dropped
4605  * columns).
4606  *
4607  * rel can be NULL ... it's only used for error reports.
4608  */
4609 static List *
4610 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4611 {
4612  List *attnums = NIL;
4613 
4614  if (attnamelist == NIL)
4615  {
4616  /* Generate default column list */
4617  Form_pg_attribute *attr = tupDesc->attrs;
4618  int attr_count = tupDesc->natts;
4619  int i;
4620 
4621  for (i = 0; i < attr_count; i++)
4622  {
4623  if (attr[i]->attisdropped)
4624  continue;
4625  attnums = lappend_int(attnums, i + 1);
4626  }
4627  }
4628  else
4629  {
4630  /* Validate the user-supplied list and extract attnums */
4631  ListCell *l;
4632 
4633  foreach(l, attnamelist)
4634  {
4635  char *name = strVal(lfirst(l));
4636  int attnum;
4637  int i;
4638 
4639  /* Lookup column name */
4640  attnum = InvalidAttrNumber;
4641  for (i = 0; i < tupDesc->natts; i++)
4642  {
4643  if (tupDesc->attrs[i]->attisdropped)
4644  continue;
4645  if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
4646  {
4647  attnum = tupDesc->attrs[i]->attnum;
4648  break;
4649  }
4650  }
4651  if (attnum == InvalidAttrNumber)
4652  {
4653  if (rel != NULL)
4654  ereport(ERROR,
4655  (errcode(ERRCODE_UNDEFINED_COLUMN),
4656  errmsg("column \"%s\" of relation \"%s\" does not exist",
4657  name, RelationGetRelationName(rel))));
4658  else
4659  ereport(ERROR,
4660  (errcode(ERRCODE_UNDEFINED_COLUMN),
4661  errmsg("column \"%s\" does not exist",
4662  name)));
4663  }
4664  /* Check for duplicates */
4665  if (list_member_int(attnums, attnum))
4666  ereport(ERROR,
4667  (errcode(ERRCODE_DUPLICATE_COLUMN),
4668  errmsg("column \"%s\" specified more than once",
4669  name)));
4670  attnums = lappend_int(attnums, attnum);
4671  }
4672  }
4673 
4674  return attnums;
4675 }
4676 
4677 
4678 /*
4679  * copy_dest_startup --- executor startup
4680  */
4681 static void
4682 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4683 {
4684  /* no-op */
4685 }
4686 
4687 /*
4688  * copy_dest_receive --- receive one tuple
4689  */
4690 static bool
4692 {
4693  DR_copy *myState = (DR_copy *) self;
4694  CopyState cstate = myState->cstate;
4695 
4696  /* Make sure the tuple is fully deconstructed */
4697  slot_getallattrs(slot);
4698 
4699  /* And send the data */
4700  CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4701  myState->processed++;
4702 
4703  return true;
4704 }
4705 
4706 /*
4707  * copy_dest_shutdown --- executor end
4708  */
4709 static void
4711 {
4712  /* no-op */
4713 }
4714 
4715 /*
4716  * copy_dest_destroy --- release DestReceiver object
4717  */
4718 static void
4720 {
4721  pfree(self);
4722 }
4723 
4724 /*
4725  * CreateCopyDestReceiver -- create a suitable DestReceiver object
4726  */
4727 DestReceiver *
4729 {
4730  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4731 
4732  self->pub.receiveSlot = copy_dest_receive;
4733  self->pub.rStartup = copy_dest_startup;
4734  self->pub.rShutdown = copy_dest_shutdown;
4735  self->pub.rDestroy = copy_dest_destroy;
4736  self->pub.mydest = DestCopyOut;
4737 
4738  self->cstate = NULL; /* will be set later */
4739  self->processed = 0;
4740 
4741  return (DestReceiver *) self;
4742 }
signed short int16
Definition: c.h:252
List * indirection
Definition: parsenodes.h:432
bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
Definition: copy.c:3125
int ri_NumIndices
Definition: execnodes.h:336
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:408
static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
Definition: copy.c:4307
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:320
static int GetDecimalFromHex(char hex)
Definition: copy.c:3882
Definition: fmgr.h:53
#define MAX_COPY_DATA_DISPLAY
bool csv_mode
Definition: copy.c:115
static struct @76 value
static void SendCopyEnd(CopyState cstate)
Definition: copy.c:414
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1221
Relation ri_RelationDesc
Definition: execnodes.h:335
List * range_table
Definition: copy.c:163
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options)
Definition: copy.c:1370
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:776
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
static bool CopyReadLineText(CopyState cstate)
Definition: copy.c:3523
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:122
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate)
Definition: execTuples.c:852
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:994
Node * val
Definition: parsenodes.h:433
static bool CopyGetInt32(CopyState cstate, int32 *val)
Definition: copy.c:674
int errhint(const char *fmt,...)
Definition: elog.c:987
int pg_char_to_encoding(const char *name)
Definition: encnames.c:475
char ** raw_fields
Definition: copy.c:185
Definition: copy.c:72
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2600
#define VARDATA(PTR)
Definition: postgres.h:305
static void EndCopy(CopyState cstate)
Definition: copy.c:1731
bool binary
Definition: copy.c:112
#define pq_flush()
Definition: libpq.h:39
void CopyFromErrorCallback(void *arg)
Definition: copy.c:2184
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1581
void PreventCommandIfParallelMode(const char *cmdname)
Definition: utility.c:253
List * ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:271
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
List * attlist
Definition: parsenodes.h:1836
List * fromClause
Definition: parsenodes.h:1426
#define ISOCTAL(c)
Definition: copy.c:52
#define OCTVALUE(c)
Definition: copy.c:53
#define ResetPerTupleExprContext(estate)
Definition: executor.h:347
#define RelationGetDescr(relation)
Definition: rel.h:425
#define HEAP_INSERT_FROZEN
Definition: heapam.h:30
char * name
Definition: parsenodes.h:431
bool need_transcoding
Definition: copy.c:103
ResultRelInfo * partitions
Definition: copy.c:168
#define castNode(_type_, nodeptr)
Definition: nodes.h:577
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:100
FmgrInfo * in_functions
Definition: copy.c:158
AttrNumber num_defaults
Definition: copy.c:154
List * attnumlist
Definition: copy.c:109
#define OIDOID
Definition: pg_type.h:328
#define VARSIZE(PTR)
Definition: postgres.h:306
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:621
char * filename
Definition: copy.c:110
#define VARHDRSZ
Definition: c.h:441
bool file_has_oids
Definition: copy.c:155
#define DatumGetObjectId(X)
Definition: postgres.h:508
List * relationOids
Definition: plannodes.h:74
char * pstrdup(const char *in)
Definition: mcxt.c:1165
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:134
static void ReceiveCopyBegin(CopyState cstate)
Definition: copy.c:378
void initLongStringInfo(StringInfo str)
Definition: stringinfo.c:81
#define XLogIsNeeded()
Definition: xlog.h:145
Definition: copy.c:212
#define MAX_BUFFERED_TUPLES
bool rd_islocaltemp
Definition: rel.h:90
TupleTableSlot * ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2251
Expr * expression_planner(Expr *expr)
Definition: planner.c:5211
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:138
Form_pg_attribute * attrs
Definition: tupdesc.h:74
DestReceiver pub
Definition: copy.c:214
void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options)
Definition: copy.c:1012
#define RELKIND_MATVIEW
Definition: pg_class.h:167
StringInfoData line_buf
Definition: copy.c:194
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen)
Definition: copy.c:248
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:834
#define AccessShareLock
Definition: lockdefs.h:36
#define InvalidBuffer
Definition: buf.h:25
struct CopyStateData CopyStateData
Definition: nodes.h:508
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
Definition: execMain.c:285
#define strVal(v)
Definition: value.h:54
struct cursor * cur
Definition: ecpg.c:28
int raw_buf_index
Definition: copy.c:207
static void CopyAttributeOutText(CopyState cstate, char *string)
Definition: copy.c:4366
bool line_buf_valid
Definition: copy.c:196
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1603
CopyState cstate
Definition: copy.c:215
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_BINARY_W
Definition: c.h:1041
bool superuser(void)
Definition: superuser.c:47
int namestrcmp(Name name, const char *str)
Definition: name.c:248
#define MemSet(start, val, len)
Definition: c.h:853
bool fe_eof
Definition: copy.c:100
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:110
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:421
#define select(n, r, w, e, timeout)
Definition: win32.h:384
void heap_sync(Relation rel)
Definition: heapam.c:9122
Datum * tts_values
Definition: tuptable.h:125
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:28
static void ClosePipeToProgram(CopyState cstate)
Definition: copy.c:1708
#define HEAP_INSERT_SKIP_WAL
Definition: heapam.h:28
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
static int CopyReadAttributesCSV(CopyState cstate)
Definition: copy.c:4138
void PopActiveSnapshot(void)
Definition: snapmgr.c:807
AclMode requiredPerms
Definition: parsenodes.h:965
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, int instrument_options)
Definition: pquery.c:66
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:555
bool * force_quote_flags
Definition: copy.c:125
List * es_range_table
Definition: execnodes.h:372
Form_pg_class rd_rel
Definition: rel.h:113
unsigned int Oid
Definition: postgres_ext.h:31
char * delim
Definition: copy.c:120
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
Node * utilityStmt
Definition: parsenodes.h:111
bool is_program
Definition: parsenodes.h:1839
bool volatile_defexprs
Definition: copy.c:162
Datum oidout(PG_FUNCTION_ARGS)
Definition: oid.c:127
struct ErrorContextCallback * previous
Definition: elog.h:238
#define PG_BINARY_R
Definition: c.h:1040
static void copy_dest_destroy(DestReceiver *self)
Definition: copy.c:4719
bool * force_null_flags
Definition: copy.c:129
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
MemoryContext rowcontext
Definition: copy.c:149
int natts
Definition: tupdesc.h:73
bool line_buf_converted
Definition: copy.c:195
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:645
signed int int32
Definition: c.h:253
int ClosePipeStream(FILE *file)
Definition: fd.c:2419
int errdetail_internal(const char *fmt,...)
Definition: elog.c:900
bool * convert_select_flags
Definition: copy.c:132
static void CopySendChar(CopyState cstate, char c)
Definition: copy.c:455
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1926
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:149
int location
Definition: parsenodes.h:227
void * copyObject(const void *from)
Definition: copyfuncs.c:4475
CopyDest copy_dest
Definition: copy.c:96
int location
Definition: parsenodes.h:434
#define REFILL_LINEBUF
Definition: copy.c:265
#define HeapTupleSetOid(tuple, oid)
Definition: htup_details.h:698
ErrorContextCallback * error_context_stack
Definition: elog.c:88
#define list_make1(x1)
Definition: pg_list.h:133
char * null_print
Definition: copy.c:117
const char * cur_attname
Definition: copy.c:137
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:439
Definition: copy.c:61
List * pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, Oid *paramTypes, int numParams)
Definition: postgres.c:644
bool trig_insert_instead_row
Definition: reltrigger.h:57
void FreeExecutorState(EState *estate)
Definition: execUtils.c:168
Relation rel
Definition: copy.c:107
#define GetPerTupleExprContext(estate)
Definition: executor.h:338
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2306
MemoryContext copycontext
Definition: copy.c:143
uint32 AclMode
Definition: parsenodes.h:63
#define pq_startcopyout()
Definition: libpq.h:46
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:135
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execQual.c:4266
Bitmapset * selectedCols
Definition: parsenodes.h:967
unsigned short uint16
Definition: c.h:264
void pfree(void *pointer)
Definition: mcxt.c:992
void ExecSetupPartitionTupleRouting(Relation rel, PartitionDispatch **pd, ResultRelInfo **partitions, TupleConversionMap ***tup_conv_maps, TupleTableSlot **partition_tuple_slot, int *num_parted, int *num_partitions)
Definition: execMain.c:3099
#define IS_HIGHBIT_SET(ch)
Definition: c.h:969
#define linitial(l)
Definition: pg_list.h:110
static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query, Oid queryRelId, const char *filename, bool is_program, List *attnamelist, List *options)
Definition: copy.c:1754
static void CopySendInt16(CopyState cstate, int16 val)
Definition: copy.c:691
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1127
#define ExecEvalExpr(expr, econtext, isNull)
Definition: executor.h:73
TupleTableSlot * partition_tuple_slot
Definition: copy.c:170
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
void pq_startmsgread(void)
Definition: pqcomm.c:1157
#define DatumGetCString(X)
Definition: postgres.h:574
static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
Definition: copy.c:551
#define S_IWGRP
Definition: win32.h:478
#define lfirst_int(lc)
Definition: pg_list.h:107
static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr)
Definition: copy.c:4519
CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, List *attnamelist, List *options)
Definition: copy.c:2876
Datum ReceiveFunctionCall(FmgrInfo *flinfo, StringInfo buf, Oid typioparam, int32 typmod)
Definition: fmgr.c:1940
static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, BulkInsertState bistate, int nBufferedTuples, HeapTuple *bufferedTuples, int firstBufferedLineNo)
Definition: copy.c:2793
#define FATAL
Definition: elog.h:52
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:159
char * defGetString(DefElem *def)
Definition: define.c:49
RangeVar * relation
Definition: parsenodes.h:1833
ItemPointerData t_self
Definition: htup.h:65
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:764
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:831
TriggerDesc * trigdesc
Definition: rel.h:119
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
static char * limit_printout_length(const char *str)
Definition: copy.c:2260
QueryDesc * queryDesc
Definition: copy.c:108
EolType
Definition: copy.c:69
bool list_member_int(const List *list, int datum)
Definition: list.c:485
static void copy_dest_shutdown(DestReceiver *self)
Definition: copy.c:4710
int num_dispatch
Definition: copy.c:166
bool encoding_embeds_ascii
Definition: copy.c:104
uint32 t_len
Definition: htup.h:64
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
char * c
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2111
char * raw_buf
Definition: copy.c:206
static bool CopyLoadRawBuf(CopyState cstate)
Definition: copy.c:728
Node * stmt
Definition: parsenodes.h:1337
#define NoLock
Definition: lockdefs.h:34
int pq_getbytes(char *s, size_t len)
Definition: pqcomm.c:1041
char * quote
Definition: copy.c:121
static char * buf
Definition: pg_test_fsync.c:65
#define memmove(d, s, c)
Definition: c.h:1058
bool * tts_isnull
Definition: tuptable.h:126
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
int pg_database_encoding_max_length(void)
Definition: wchar.c:1833
List * targetList
Definition: parsenodes.h:1425
ResultRelInfo * es_result_relations
Definition: execnodes.h:382
static uint64 DoCopyTo(CopyState cstate)
Definition: copy.c:1883
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: copy.c:4682
int location
Definition: parsenodes.h:678
#define RowExclusiveLock
Definition: lockdefs.h:38
ExprState ** defexprs
Definition: copy.c:161
const char * cur_relname
Definition: copy.c:135
int errcode_for_file_access(void)
Definition: elog.c:598
int pg_encoding_mblen(int encoding, const char *mbstr)
Definition: wchar.c:1785
#define is_absolute_path(filename)
Definition: port.h:77
#define CStringGetDatum(X)
Definition: postgres.h:586
char string[11]
Definition: preproc-type.c:46
Definition: copy.c:73
List * options
Definition: parsenodes.h:1841
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2043
FmgrInfo oid_in_function
Definition: copy.c:156
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
#define RelationGetRelationName(relation)
Definition: rel.h:433
static const char BinarySignature[11]
Definition: copy.c:285
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:343
unsigned int uint32
Definition: c.h:265
bytea * SendFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1987
int raw_buf_len
Definition: copy.c:208
Oid t_tableOid
Definition: htup.h:66
#define RELKIND_FOREIGN_TABLE
Definition: pg_class.h:166
bool trig_insert_after_row
Definition: reltrigger.h:56
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
int max_fields
Definition: copy.c:184
char * escape
Definition: copy.c:122
const char * p_sourcetext
Definition: parse_node.h:167
List * returningList
Definition: parsenodes.h:135
TupleTableSlot * es_trig_tuple_slot
Definition: execnodes.h:388
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2133
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2633
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2567
#define ereport(elevel, rest)
Definition: elog.h:122
Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2383
int null_print_len
Definition: copy.c:118
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1239
List * force_null
Definition: copy.c:128
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:339
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:379
EState * CreateExecutorState(void)
Definition: execUtils.c:72
List * lappend_int(List *list, int datum)
Definition: list.c:146
Node * arg
Definition: parsenodes.h:676
Definition: copy.c:74
List * lappend(List *list, void *datum)
Definition: list.c:128
#define S_IWOTH
Definition: win32.h:482
int file_encoding
Definition: copy.c:102
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:4610
TupleDesc tupDesc
Definition: execdesc.h:46
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1219
#define InvalidSnapshot
Definition: snapshot.h:25
SubTransactionId rd_createSubid
Definition: rel.h:109
Oid * typioparams
Definition: copy.c:159
bool is_program
Definition: copy.c:111
#define RELKIND_PARTITIONED_TABLE
Definition: pg_class.h:168
Node * build_column_default(Relation rel, int attrno)
bool trig_insert_before_row
Definition: reltrigger.h:55
void getTypeBinaryOutputInfo(Oid type, Oid *typSend, bool *typIsVarlena)
Definition: lsyscache.c:2666
List * es_tupleTable
Definition: execnodes.h:399
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:156
void * palloc0(Size size)
Definition: mcxt.c:920
bool header_line
Definition: copy.c:116
uintptr_t Datum
Definition: postgres.h:374
int GetDatabaseEncoding(void)
Definition: mbutils.c:1015
int pg_get_client_encoding(void)
Definition: mbutils.c:317
#define ACL_SELECT
Definition: parsenodes.h:66
TupleTableSlot * tupslot
Definition: partition.h:66
void ExecSetSlotDescriptor(TupleTableSlot *slot, TupleDesc tupdesc)
Definition: execTuples.c:247
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2163
int ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:3188
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1781
int stmt_len
Definition: parsenodes.h:1339
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen)
Definition: copy.c:236
int stmt_location
Definition: parsenodes.h:1338
int es_num_result_relations
Definition: execnodes.h:383
List * ri_PartitionCheck
Definition: execnodes.h:353
static uint64 CopyFrom(CopyState cstate)
Definition: copy.c:2289
#define RAW_BUF_SIZE
Definition: copy.c:205
TupleDesc rd_att
Definition: rel.h:114
bool freeze
Definition: copy.c:114
Datum InputFunctionCall(FmgrInfo *flinfo, char *str, Oid typioparam, int32 typmod)
Definition: fmgr.c:1882
int pq_getbyte(void)
Definition: pqcomm.c:947
void pq_endmsgread(void)
Definition: pqcomm.c:1181
Relation heap_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: heapam.c:1315
#define InvalidOid
Definition: postgres_ext.h:36
static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: copy.c:4691
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *orig_slot, EState *estate)
Definition: execMain.c:1789
bool XactReadOnly
Definition: xact.c:77
bool * force_notnull_flags
Definition: copy.c:127
CmdType commandType
Definition: parsenodes.h:103
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4150
List * force_notnull
Definition: copy.c:126
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:53
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void EndCopyFrom(CopyState cstate)
Definition: copy.c:3418
#define PG_CATCH()
Definition: elog.h:293
#define makeNode(_type_)
Definition: nodes.h:556
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
EolType eol_type
Definition: copy.c:101
#define NULL
Definition: c.h:226
QuerySource querySource
Definition: parsenodes.h:105
void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed)
Definition: copy.c:773
void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, CommandId cid, int options, BulkInsertState bistate)
Definition: heapam.c:2645
#define Assert(condition)
Definition: c.h:671
#define lfirst(lc)
Definition: pg_list.h:106
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2320
static void CopySendData(CopyState cstate, const void *databuf, int datasize)
Definition: copy.c:443
List * convert_select
Definition: copy.c:131
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:648
StringInfo fe_msgbuf
Definition: copy.c:98
CopyDest
Definition: copy.c:59
void pq_copymsgbytes(StringInfo msg, char *buf, int datalen)
Definition: pqformat.c:570
static bool CopyReadLine(CopyState cstate)
Definition: copy.c:3434
#define ACL_INSERT
Definition: parsenodes.h:65
TupleConstr * constr
Definition: tupdesc.h:76
#define pq_endcopyout(errorAbort)
Definition: libpq.h:47
size_t Size
Definition: c.h:353
static void CopySendString(CopyState cstate, const char *str)
Definition: copy.c:449
static int list_length(const List *l)
Definition: pg_list.h:89
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:109
static uint64 CopyTo(CopyState cstate)
Definition: copy.c:1937
#define PG_RE_THROW()
Definition: elog.h:314
#define InvalidSubTransactionId
Definition: c.h:400
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:668
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:725
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:2332
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1021
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, HeapTuple trigtuple, List *recheckIndexes)
Definition: trigger.c:2239
const char * name
Definition: encode.c:521
HeapTuple do_convert_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:341
void PreventCommandIfReadOnly(const char *cmdname)
Definition: utility.c:235
#define InvalidAttrNumber
Definition: attnum.h:23
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:343
#define DUMPSOFAR()
Definition: copy.c:4359
#define HEAP_INSERT_SKIP_FSM
Definition: heapam.h:29
static void CopySendInt32(CopyState cstate, int32 val)
Definition: copy.c:660
bool force_quote_all
Definition: copy.c:124
Node * query
Definition: parsenodes.h:1834
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
RTEKind rtekind
Definition: parsenodes.h:882
int FreeFile(FILE *file)
Definition: fd.c:2226
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define PG_ENCODING_IS_CLIENT_ONLY(_enc)
Definition: pg_wchar.h:296
DestReceiver * dest
Definition: execdesc.h:41
static void SendCopyBegin(CopyState cstate)
Definition: copy.c:345
static char * filename
Definition: pg_dumpall.c:80
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4170
void(* callback)(void *arg)
Definition: elog.h:239