PostgreSQL Source Code  git master
copy.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * copy.c
4  * Implements the COPY utility command
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/commands/copy.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 
21 #include "access/heapam.h"
22 #include "access/htup_details.h"
23 #include "access/sysattr.h"
24 #include "access/tableam.h"
25 #include "access/xact.h"
26 #include "access/xlog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/pg_authid.h"
29 #include "catalog/pg_type.h"
30 #include "commands/copy.h"
31 #include "commands/defrem.h"
32 #include "commands/trigger.h"
33 #include "executor/execPartition.h"
34 #include "executor/executor.h"
36 #include "executor/tuptable.h"
37 #include "foreign/fdwapi.h"
38 #include "libpq/libpq.h"
39 #include "libpq/pqformat.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "optimizer/optimizer.h"
43 #include "nodes/makefuncs.h"
44 #include "parser/parse_coerce.h"
45 #include "parser/parse_collate.h"
46 #include "parser/parse_expr.h"
47 #include "parser/parse_relation.h"
48 #include "port/pg_bswap.h"
49 #include "rewrite/rewriteHandler.h"
50 #include "storage/fd.h"
51 #include "tcop/tcopprot.h"
52 #include "utils/builtins.h"
53 #include "utils/lsyscache.h"
54 #include "utils/memutils.h"
55 #include "utils/partcache.h"
56 #include "utils/portal.h"
57 #include "utils/rel.h"
58 #include "utils/rls.h"
59 #include "utils/snapmgr.h"
60 
61 
62 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
63 #define OCTVALUE(c) ((c) - '0')
64 
65 /*
66  * Represents the different source/dest cases we need to worry about at
67  * the bottom level
68  */
69 typedef enum CopyDest
70 {
71  COPY_FILE, /* to/from file (or a piped program) */
72  COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
73  COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
74  COPY_CALLBACK /* to/from callback function */
75 } CopyDest;
76 
77 /*
78  * Represents the end-of-line terminator type of the input
79  */
80 typedef enum EolType
81 {
86 } EolType;
87 
88 /*
89  * Represents the heap insert method to be used during COPY FROM.
90  */
91 typedef enum CopyInsertMethod
92 {
93  CIM_SINGLE, /* use table_tuple_insert or fdw routine */
94  CIM_MULTI, /* always use table_multi_insert */
95  CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
97 
98 /*
99  * This struct contains all the state variables used throughout a COPY
100  * operation. For simplicity, we use the same struct for all variants of COPY,
101  * even though some fields are used in only some cases.
102  *
103  * Multi-byte encodings: all supported client-side encodings encode multi-byte
104  * characters by having the first byte's high bit set. Subsequent bytes of the
105  * character can have the high bit not set. When scanning data in such an
106  * encoding to look for a match to a single-byte (ie ASCII) character, we must
107  * use the full pg_encoding_mblen() machinery to skip over multibyte
108  * characters, else we might find a false match to a trailing byte. In
109  * supported server encodings, there is no possibility of a false match, and
110  * it's faster to make useless comparisons to trailing bytes than it is to
111  * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
112  * when we have to do it the hard way.
113  */
114 typedef struct CopyStateData
115 {
116  /* low-level state data */
117  CopyDest copy_dest; /* type of copy source/destination */
118  FILE *copy_file; /* used if copy_dest == COPY_FILE */
119  StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
120  * dest == COPY_NEW_FE in COPY FROM */
121  bool is_copy_from; /* COPY TO, or COPY FROM? */
122  bool reached_eof; /* true if we read to end of copy data (not
123  * all copy_dest types maintain this) */
124  EolType eol_type; /* EOL type of input */
125  int file_encoding; /* file or remote side's character encoding */
126  bool need_transcoding; /* file encoding diff from server? */
127  bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
128 
129  /* parameters from the COPY command */
130  Relation rel; /* relation to copy to or from */
131  QueryDesc *queryDesc; /* executable query to copy from */
132  List *attnumlist; /* integer list of attnums to copy */
133  char *filename; /* filename, or NULL for STDIN/STDOUT */
134  bool is_program; /* is 'filename' a program to popen? */
135  copy_data_source_cb data_source_cb; /* function for reading data */
136  bool binary; /* binary format? */
137  bool freeze; /* freeze rows on loading? */
138  bool csv_mode; /* Comma Separated Value format? */
139  bool header_line; /* CSV header line? */
140  char *null_print; /* NULL marker string (server encoding!) */
141  int null_print_len; /* length of same */
142  char *null_print_client; /* same converted to file encoding */
143  char *delim; /* column delimiter (must be 1 byte) */
144  char *quote; /* CSV quote char (must be 1 byte) */
145  char *escape; /* CSV escape char (must be 1 byte) */
146  List *force_quote; /* list of column names */
147  bool force_quote_all; /* FORCE_QUOTE *? */
148  bool *force_quote_flags; /* per-column CSV FQ flags */
149  List *force_notnull; /* list of column names */
150  bool *force_notnull_flags; /* per-column CSV FNN flags */
151  List *force_null; /* list of column names */
152  bool *force_null_flags; /* per-column CSV FN flags */
153  bool convert_selectively; /* do selective binary conversion? */
154  List *convert_select; /* list of column names (can be NIL) */
155  bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
156  Node *whereClause; /* WHERE condition (or NULL) */
157 
158  /* these are just for error messages, see CopyFromErrorCallback */
159  const char *cur_relname; /* table name for error messages */
160  uint64 cur_lineno; /* line number for error messages */
161  const char *cur_attname; /* current att for error messages */
162  const char *cur_attval; /* current att value for error messages */
163 
164  /*
165  * Working state for COPY TO/FROM
166  */
167  MemoryContext copycontext; /* per-copy execution context */
168 
169  /*
170  * Working state for COPY TO
171  */
172  FmgrInfo *out_functions; /* lookup info for output functions */
173  MemoryContext rowcontext; /* per-row evaluation context */
174 
175  /*
176  * Working state for COPY FROM
177  */
179  FmgrInfo *in_functions; /* array of input functions for each attrs */
180  Oid *typioparams; /* array of element types for in_functions */
181  int *defmap; /* array of default att numbers */
182  ExprState **defexprs; /* array of default att expressions */
183  bool volatile_defexprs; /* is any of defexprs volatile? */
186 
188 
189  /*
190  * These variables are used to reduce overhead in textual COPY FROM.
191  *
192  * attribute_buf holds the separated, de-escaped text for each field of
193  * the current line. The CopyReadAttributes functions return arrays of
194  * pointers into this buffer. We avoid palloc/pfree overhead by re-using
195  * the buffer on each cycle.
196  */
198 
199  /* field raw data pointers found by COPY FROM */
200 
202  char **raw_fields;
203 
204  /*
205  * Similarly, line_buf holds the whole input line being processed. The
206  * input cycle is first to read the whole line into line_buf, convert it
207  * to server encoding there, and then extract the individual attribute
208  * fields into attribute_buf. line_buf is preserved unmodified so that we
209  * can display it in error messages if appropriate.
210  */
212  bool line_buf_converted; /* converted to server encoding? */
213  bool line_buf_valid; /* contains the row being processed? */
214 
215  /*
216  * Finally, raw_buf holds raw data read from the data source (file or
217  * client connection). CopyReadLine parses this data sufficiently to
218  * locate line boundaries, then transfers the data to line_buf and
219  * converts it. Note: we guarantee that there is a \0 at
220  * raw_buf[raw_buf_len].
221  */
222 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
223  char *raw_buf;
224  int raw_buf_index; /* next byte to process */
225  int raw_buf_len; /* total # of bytes stored */
226 } CopyStateData;
227 
228 /* DestReceiver for COPY (query) TO */
229 typedef struct
230 {
231  DestReceiver pub; /* publicly-known function pointers */
232  CopyState cstate; /* CopyStateData for the command */
233  uint64 processed; /* # of tuples processed */
234 } DR_copy;
235 
236 
237 /*
238  * No more than this many tuples per CopyMultiInsertBuffer
239  *
240  * Caution: Don't make this too big, as we could end up with this many
241  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
242  * multiInsertBuffers list. Increasing this can cause quadratic growth in
243  * memory requirements during copies into partitioned tables with a large
244  * number of partitions.
245  */
246 #define MAX_BUFFERED_TUPLES 1000
247 
248 /*
249  * Flush buffers if there are >= this many bytes, as counted by the input
250  * size, of tuples stored.
251  */
252 #define MAX_BUFFERED_BYTES 65535
253 
254 /* Trim the list of buffers back down to this number after flushing */
255 #define MAX_PARTITION_BUFFERS 32
256 
257 /* Stores multi-insert data related to a single relation in CopyFrom. */
258 typedef struct CopyMultiInsertBuffer
259 {
260  TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
261  ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
262  BulkInsertState bistate; /* BulkInsertState for this rel */
263  int nused; /* number of 'slots' containing tuples */
264  uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
265  * stream */
267 
268 /*
269  * Stores one or many CopyMultiInsertBuffers and details about the size and
270  * number of tuples which are stored in them. This allows multiple buffers to
271  * exist at once when COPYing into a partitioned table.
272  */
273 typedef struct CopyMultiInsertInfo
274 {
275  List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
276  int bufferedTuples; /* number of tuples buffered over all buffers */
277  int bufferedBytes; /* number of bytes from all buffered tuples */
278  CopyState cstate; /* Copy state for this CopyMultiInsertInfo */
279  EState *estate; /* Executor state used for COPY */
280  CommandId mycid; /* Command Id used for COPY */
281  int ti_options; /* table insert options */
283 
284 
285 /*
286  * These macros centralize code used to process line_buf and raw_buf buffers.
287  * They are macros because they often do continue/break control and to avoid
288  * function call overhead in tight COPY loops.
289  *
290  * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
291  * prevent the continue/break processing from working. We end the "if (1)"
292  * with "else ((void) 0)" to ensure the "if" does not unintentionally match
293  * any "else" in the calling code, and to avoid any compiler warnings about
294  * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
295  */
296 
297 /*
298  * This keeps the character read at the top of the loop in the buffer
299  * even if there is more than one read-ahead.
300  */
301 #define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
302 if (1) \
303 { \
304  if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
305  { \
306  raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
307  need_data = true; \
308  continue; \
309  } \
310 } else ((void) 0)
311 
312 /* This consumes the remainder of the buffer and breaks */
313 #define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
314 if (1) \
315 { \
316  if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
317  { \
318  if (extralen) \
319  raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
320  /* backslash just before EOF, treat as data char */ \
321  result = true; \
322  break; \
323  } \
324 } else ((void) 0)
325 
326 /*
327  * Transfer any approved data to line_buf; must do this to be sure
328  * there is some room in raw_buf.
329  */
330 #define REFILL_LINEBUF \
331 if (1) \
332 { \
333  if (raw_buf_ptr > cstate->raw_buf_index) \
334  { \
335  appendBinaryStringInfo(&cstate->line_buf, \
336  cstate->raw_buf + cstate->raw_buf_index, \
337  raw_buf_ptr - cstate->raw_buf_index); \
338  cstate->raw_buf_index = raw_buf_ptr; \
339  } \
340 } else ((void) 0)
341 
342 /* Undo any read-ahead and jump out of the block. */
343 #define NO_END_OF_COPY_GOTO \
344 if (1) \
345 { \
346  raw_buf_ptr = prev_raw_ptr + 1; \
347  goto not_end_of_copy; \
348 } else ((void) 0)
349 
350 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
351 
352 
353 /* non-export function prototypes */
354 static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
355  RawStmt *raw_query, Oid queryRelId, List *attnamelist,
356  List *options);
357 static void EndCopy(CopyState cstate);
358 static void ClosePipeToProgram(CopyState cstate);
359 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
360  Oid queryRelId, const char *filename, bool is_program,
361  List *attnamelist, List *options);
362 static void EndCopyTo(CopyState cstate);
363 static uint64 DoCopyTo(CopyState cstate);
364 static uint64 CopyTo(CopyState cstate);
365 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
366 static bool CopyReadLine(CopyState cstate);
367 static bool CopyReadLineText(CopyState cstate);
368 static int CopyReadAttributesText(CopyState cstate);
369 static int CopyReadAttributesCSV(CopyState cstate);
371  int column_no, FmgrInfo *flinfo,
372  Oid typioparam, int32 typmod,
373  bool *isnull);
374 static void CopyAttributeOutText(CopyState cstate, char *string);
375 static void CopyAttributeOutCSV(CopyState cstate, char *string,
376  bool use_quote, bool single_attr);
377 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
378  List *attnamelist);
379 static char *limit_printout_length(const char *str);
380 
381 /* Low-level communications functions */
382 static void SendCopyBegin(CopyState cstate);
383 static void ReceiveCopyBegin(CopyState cstate);
384 static void SendCopyEnd(CopyState cstate);
385 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
386 static void CopySendString(CopyState cstate, const char *str);
387 static void CopySendChar(CopyState cstate, char c);
388 static void CopySendEndOfRow(CopyState cstate);
389 static int CopyGetData(CopyState cstate, void *databuf,
390  int minread, int maxread);
391 static void CopySendInt32(CopyState cstate, int32 val);
392 static bool CopyGetInt32(CopyState cstate, int32 *val);
393 static void CopySendInt16(CopyState cstate, int16 val);
394 static bool CopyGetInt16(CopyState cstate, int16 *val);
395 
396 
397 /*
398  * Send copy start/stop messages for frontend copies. These have changed
399  * in past protocol redesigns.
400  */
401 static void
403 {
405  {
406  /* new way */
408  int natts = list_length(cstate->attnumlist);
409  int16 format = (cstate->binary ? 1 : 0);
410  int i;
411 
412  pq_beginmessage(&buf, 'H');
413  pq_sendbyte(&buf, format); /* overall format */
414  pq_sendint16(&buf, natts);
415  for (i = 0; i < natts; i++)
416  pq_sendint16(&buf, format); /* per-column formats */
417  pq_endmessage(&buf);
418  cstate->copy_dest = COPY_NEW_FE;
419  }
420  else
421  {
422  /* old way */
423  if (cstate->binary)
424  ereport(ERROR,
425  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
426  errmsg("COPY BINARY is not supported to stdout or from stdin")));
427  pq_putemptymessage('H');
428  /* grottiness needed for old COPY OUT protocol */
429  pq_startcopyout();
430  cstate->copy_dest = COPY_OLD_FE;
431  }
432 }
433 
434 static void
436 {
438  {
439  /* new way */
441  int natts = list_length(cstate->attnumlist);
442  int16 format = (cstate->binary ? 1 : 0);
443  int i;
444 
445  pq_beginmessage(&buf, 'G');
446  pq_sendbyte(&buf, format); /* overall format */
447  pq_sendint16(&buf, natts);
448  for (i = 0; i < natts; i++)
449  pq_sendint16(&buf, format); /* per-column formats */
450  pq_endmessage(&buf);
451  cstate->copy_dest = COPY_NEW_FE;
452  cstate->fe_msgbuf = makeStringInfo();
453  }
454  else
455  {
456  /* old way */
457  if (cstate->binary)
458  ereport(ERROR,
459  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
460  errmsg("COPY BINARY is not supported to stdout or from stdin")));
461  pq_putemptymessage('G');
462  /* any error in old protocol will make us lose sync */
463  pq_startmsgread();
464  cstate->copy_dest = COPY_OLD_FE;
465  }
466  /* We *must* flush here to ensure FE knows it can send. */
467  pq_flush();
468 }
469 
470 static void
472 {
473  if (cstate->copy_dest == COPY_NEW_FE)
474  {
475  /* Shouldn't have any unsent data */
476  Assert(cstate->fe_msgbuf->len == 0);
477  /* Send Copy Done message */
478  pq_putemptymessage('c');
479  }
480  else
481  {
482  CopySendData(cstate, "\\.", 2);
483  /* Need to flush out the trailer (this also appends a newline) */
484  CopySendEndOfRow(cstate);
485  pq_endcopyout(false);
486  }
487 }
488 
489 /*----------
490  * CopySendData sends output data to the destination (file or frontend)
491  * CopySendString does the same for null-terminated strings
492  * CopySendChar does the same for single characters
493  * CopySendEndOfRow does the appropriate thing at end of each data row
494  * (data is not actually flushed except by CopySendEndOfRow)
495  *
496  * NB: no data conversion is applied by these functions
497  *----------
498  */
499 static void
500 CopySendData(CopyState cstate, const void *databuf, int datasize)
501 {
502  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
503 }
504 
505 static void
506 CopySendString(CopyState cstate, const char *str)
507 {
508  appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
509 }
510 
511 static void
512 CopySendChar(CopyState cstate, char c)
513 {
515 }
516 
517 static void
519 {
520  StringInfo fe_msgbuf = cstate->fe_msgbuf;
521 
522  switch (cstate->copy_dest)
523  {
524  case COPY_FILE:
525  if (!cstate->binary)
526  {
527  /* Default line termination depends on platform */
528 #ifndef WIN32
529  CopySendChar(cstate, '\n');
530 #else
531  CopySendString(cstate, "\r\n");
532 #endif
533  }
534 
535  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
536  cstate->copy_file) != 1 ||
537  ferror(cstate->copy_file))
538  {
539  if (cstate->is_program)
540  {
541  if (errno == EPIPE)
542  {
543  /*
544  * The pipe will be closed automatically on error at
545  * the end of transaction, but we might get a better
546  * error message from the subprocess' exit code than
547  * just "Broken Pipe"
548  */
549  ClosePipeToProgram(cstate);
550 
551  /*
552  * If ClosePipeToProgram() didn't throw an error, the
553  * program terminated normally, but closed the pipe
554  * first. Restore errno, and throw an error.
555  */
556  errno = EPIPE;
557  }
558  ereport(ERROR,
560  errmsg("could not write to COPY program: %m")));
561  }
562  else
563  ereport(ERROR,
565  errmsg("could not write to COPY file: %m")));
566  }
567  break;
568  case COPY_OLD_FE:
569  /* The FE/BE protocol uses \n as newline for all platforms */
570  if (!cstate->binary)
571  CopySendChar(cstate, '\n');
572 
573  if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
574  {
575  /* no hope of recovering connection sync, so FATAL */
576  ereport(FATAL,
577  (errcode(ERRCODE_CONNECTION_FAILURE),
578  errmsg("connection lost during COPY to stdout")));
579  }
580  break;
581  case COPY_NEW_FE:
582  /* The FE/BE protocol uses \n as newline for all platforms */
583  if (!cstate->binary)
584  CopySendChar(cstate, '\n');
585 
586  /* Dump the accumulated row as one CopyData message */
587  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
588  break;
589  case COPY_CALLBACK:
590  Assert(false); /* Not yet supported. */
591  break;
592  }
593 
594  resetStringInfo(fe_msgbuf);
595 }
596 
597 /*
598  * CopyGetData reads data from the source (file or frontend)
599  *
600  * We attempt to read at least minread, and at most maxread, bytes from
601  * the source. The actual number of bytes read is returned; if this is
602  * less than minread, EOF was detected.
603  *
604  * Note: when copying from the frontend, we expect a proper EOF mark per
605  * protocol; if the frontend simply drops the connection, we raise error.
606  * It seems unwise to allow the COPY IN to complete normally in that case.
607  *
608  * NB: no data conversion is applied here.
609  */
610 static int
611 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
612 {
613  int bytesread = 0;
614 
615  switch (cstate->copy_dest)
616  {
617  case COPY_FILE:
618  bytesread = fread(databuf, 1, maxread, cstate->copy_file);
619  if (ferror(cstate->copy_file))
620  ereport(ERROR,
622  errmsg("could not read from COPY file: %m")));
623  if (bytesread == 0)
624  cstate->reached_eof = true;
625  break;
626  case COPY_OLD_FE:
627 
628  /*
629  * We cannot read more than minread bytes (which in practice is 1)
630  * because old protocol doesn't have any clear way of separating
631  * the COPY stream from following data. This is slow, but not any
632  * slower than the code path was originally, and we don't care
633  * much anymore about the performance of old protocol.
634  */
635  if (pq_getbytes((char *) databuf, minread))
636  {
637  /* Only a \. terminator is legal EOF in old protocol */
638  ereport(ERROR,
639  (errcode(ERRCODE_CONNECTION_FAILURE),
640  errmsg("unexpected EOF on client connection with an open transaction")));
641  }
642  bytesread = minread;
643  break;
644  case COPY_NEW_FE:
645  while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
646  {
647  int avail;
648 
649  while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
650  {
651  /* Try to receive another message */
652  int mtype;
653 
654  readmessage:
656  pq_startmsgread();
657  mtype = pq_getbyte();
658  if (mtype == EOF)
659  ereport(ERROR,
660  (errcode(ERRCODE_CONNECTION_FAILURE),
661  errmsg("unexpected EOF on client connection with an open transaction")));
662  if (pq_getmessage(cstate->fe_msgbuf, 0))
663  ereport(ERROR,
664  (errcode(ERRCODE_CONNECTION_FAILURE),
665  errmsg("unexpected EOF on client connection with an open transaction")));
667  switch (mtype)
668  {
669  case 'd': /* CopyData */
670  break;
671  case 'c': /* CopyDone */
672  /* COPY IN correctly terminated by frontend */
673  cstate->reached_eof = true;
674  return bytesread;
675  case 'f': /* CopyFail */
676  ereport(ERROR,
677  (errcode(ERRCODE_QUERY_CANCELED),
678  errmsg("COPY from stdin failed: %s",
679  pq_getmsgstring(cstate->fe_msgbuf))));
680  break;
681  case 'H': /* Flush */
682  case 'S': /* Sync */
683 
684  /*
685  * Ignore Flush/Sync for the convenience of client
686  * libraries (such as libpq) that may send those
687  * without noticing that the command they just
688  * sent was COPY.
689  */
690  goto readmessage;
691  default:
692  ereport(ERROR,
693  (errcode(ERRCODE_PROTOCOL_VIOLATION),
694  errmsg("unexpected message type 0x%02X during COPY from stdin",
695  mtype)));
696  break;
697  }
698  }
699  avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
700  if (avail > maxread)
701  avail = maxread;
702  pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
703  databuf = (void *) ((char *) databuf + avail);
704  maxread -= avail;
705  bytesread += avail;
706  }
707  break;
708  case COPY_CALLBACK:
709  bytesread = cstate->data_source_cb(databuf, minread, maxread);
710  break;
711  }
712 
713  return bytesread;
714 }
715 
716 
717 /*
718  * These functions do apply some data conversion
719  */
720 
721 /*
722  * CopySendInt32 sends an int32 in network byte order
723  */
724 static void
726 {
727  uint32 buf;
728 
729  buf = pg_hton32((uint32) val);
730  CopySendData(cstate, &buf, sizeof(buf));
731 }
732 
733 /*
734  * CopyGetInt32 reads an int32 that appears in network byte order
735  *
736  * Returns true if OK, false if EOF
737  */
738 static bool
740 {
741  uint32 buf;
742 
743  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
744  {
745  *val = 0; /* suppress compiler warning */
746  return false;
747  }
748  *val = (int32) pg_ntoh32(buf);
749  return true;
750 }
751 
752 /*
753  * CopySendInt16 sends an int16 in network byte order
754  */
755 static void
757 {
758  uint16 buf;
759 
760  buf = pg_hton16((uint16) val);
761  CopySendData(cstate, &buf, sizeof(buf));
762 }
763 
764 /*
765  * CopyGetInt16 reads an int16 that appears in network byte order
766  */
767 static bool
769 {
770  uint16 buf;
771 
772  if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
773  {
774  *val = 0; /* suppress compiler warning */
775  return false;
776  }
777  *val = (int16) pg_ntoh16(buf);
778  return true;
779 }
780 
781 
782 /*
783  * CopyLoadRawBuf loads some more data into raw_buf
784  *
785  * Returns true if able to obtain at least one more byte, else false.
786  *
787  * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
788  * down to the start of the buffer and then we load more data after that.
789  * This case is used only when a frontend multibyte character crosses a
790  * bufferload boundary.
791  */
792 static bool
794 {
795  int nbytes;
796  int inbytes;
797 
798  if (cstate->raw_buf_index < cstate->raw_buf_len)
799  {
800  /* Copy down the unprocessed data */
801  nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
802  memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
803  nbytes);
804  }
805  else
806  nbytes = 0; /* no data need be saved */
807 
808  inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
809  1, RAW_BUF_SIZE - nbytes);
810  nbytes += inbytes;
811  cstate->raw_buf[nbytes] = '\0';
812  cstate->raw_buf_index = 0;
813  cstate->raw_buf_len = nbytes;
814  return (inbytes > 0);
815 }
816 
817 
818 /*
819  * DoCopy executes the SQL COPY statement
820  *
821  * Either unload or reload contents of table <relation>, depending on <from>.
822  * (<from> = true means we are inserting into the table.) In the "TO" case
823  * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
824  * or DELETE query.
825  *
826  * If <pipe> is false, transfer is between the table and the file named
827  * <filename>. Otherwise, transfer is between the table and our regular
828  * input/output stream. The latter could be either stdin/stdout or a
829  * socket, depending on whether we're running under Postmaster control.
830  *
831  * Do not allow a Postgres user without the 'pg_read_server_files' or
832  * 'pg_write_server_files' role to read from or write to a file.
833  *
834  * Do not allow the copy if user doesn't have proper permission to access
835  * the table or the specifically requested columns.
836  */
837 void
838 DoCopy(ParseState *pstate, const CopyStmt *stmt,
839  int stmt_location, int stmt_len,
840  uint64 *processed)
841 {
842  CopyState cstate;
843  bool is_from = stmt->is_from;
844  bool pipe = (stmt->filename == NULL);
845  Relation rel;
846  Oid relid;
847  RawStmt *query = NULL;
848  Node *whereClause = NULL;
849 
850  /*
851  * Disallow COPY to/from file or program except to users with the
852  * appropriate role.
853  */
854  if (!pipe)
855  {
856  if (stmt->is_program)
857  {
858  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_EXECUTE_SERVER_PROGRAM))
859  ereport(ERROR,
860  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
861  errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
862  errhint("Anyone can COPY to stdout or from stdin. "
863  "psql's \\copy command also works for anyone.")));
864  }
865  else
866  {
867  if (is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_SERVER_FILES))
868  ereport(ERROR,
869  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
870  errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
871  errhint("Anyone can COPY to stdout or from stdin. "
872  "psql's \\copy command also works for anyone.")));
873 
874  if (!is_from && !is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
875  ereport(ERROR,
876  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
877  errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
878  errhint("Anyone can COPY to stdout or from stdin. "
879  "psql's \\copy command also works for anyone.")));
880  }
881  }
882 
883  if (stmt->relation)
884  {
885  LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
886  RangeTblEntry *rte;
887  TupleDesc tupDesc;
888  List *attnums;
889  ListCell *cur;
890 
891  Assert(!stmt->query);
892 
893  /* Open and lock the relation, using the appropriate lock type. */
894  rel = table_openrv(stmt->relation, lockmode);
895 
896  relid = RelationGetRelid(rel);
897 
898  rte = addRangeTableEntryForRelation(pstate, rel, lockmode,
899  NULL, false, false);
900  rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
901 
902  if (stmt->whereClause)
903  {
904  /* add rte to column namespace */
905  addRTEtoQuery(pstate, rte, false, true, true);
906 
907  /* Transform the raw expression tree */
908  whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
909 
910  /* Make sure it yields a boolean result. */
911  whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
912 
913  /* we have to fix its collations too */
914  assign_expr_collations(pstate, whereClause);
915 
916  whereClause = eval_const_expressions(NULL, whereClause);
917 
918  whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
919  whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
920  }
921 
922  tupDesc = RelationGetDescr(rel);
923  attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
924  foreach(cur, attnums)
925  {
926  int attno = lfirst_int(cur) -
928 
929  if (is_from)
930  rte->insertedCols = bms_add_member(rte->insertedCols, attno);
931  else
932  rte->selectedCols = bms_add_member(rte->selectedCols, attno);
933  }
934  ExecCheckRTPerms(pstate->p_rtable, true);
935 
936  /*
937  * Permission check for row security policies.
938  *
939  * check_enable_rls will ereport(ERROR) if the user has requested
940  * something invalid and will otherwise indicate if we should enable
941  * RLS (returns RLS_ENABLED) or not for this COPY statement.
942  *
943  * If the relation has a row security policy and we are to apply it
944  * then perform a "query" copy and allow the normal query processing
945  * to handle the policies.
946  *
947  * If RLS is not enabled for this, then just fall through to the
948  * normal non-filtering relation handling.
949  */
950  if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
951  {
953  ColumnRef *cr;
954  ResTarget *target;
955  RangeVar *from;
956  List *targetList = NIL;
957 
958  if (is_from)
959  ereport(ERROR,
960  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
961  errmsg("COPY FROM not supported with row-level security"),
962  errhint("Use INSERT statements instead.")));
963 
964  /*
965  * Build target list
966  *
967  * If no columns are specified in the attribute list of the COPY
968  * command, then the target list is 'all' columns. Therefore, '*'
969  * should be used as the target list for the resulting SELECT
970  * statement.
971  *
972  * In the case that columns are specified in the attribute list,
973  * create a ColumnRef and ResTarget for each column and add them
974  * to the target list for the resulting SELECT statement.
975  */
976  if (!stmt->attlist)
977  {
978  cr = makeNode(ColumnRef);
980  cr->location = -1;
981 
982  target = makeNode(ResTarget);
983  target->name = NULL;
984  target->indirection = NIL;
985  target->val = (Node *) cr;
986  target->location = -1;
987 
988  targetList = list_make1(target);
989  }
990  else
991  {
992  ListCell *lc;
993 
994  foreach(lc, stmt->attlist)
995  {
996  /*
997  * Build the ColumnRef for each column. The ColumnRef
998  * 'fields' property is a String 'Value' node (see
999  * nodes/value.h) that corresponds to the column name
1000  * respectively.
1001  */
1002  cr = makeNode(ColumnRef);
1003  cr->fields = list_make1(lfirst(lc));
1004  cr->location = -1;
1005 
1006  /* Build the ResTarget and add the ColumnRef to it. */
1007  target = makeNode(ResTarget);
1008  target->name = NULL;
1009  target->indirection = NIL;
1010  target->val = (Node *) cr;
1011  target->location = -1;
1012 
1013  /* Add each column to the SELECT statement's target list */
1014  targetList = lappend(targetList, target);
1015  }
1016  }
1017 
1018  /*
1019  * Build RangeVar for from clause, fully qualified based on the
1020  * relation which we have opened and locked.
1021  */
1024  -1);
1025 
1026  /* Build query */
1027  select = makeNode(SelectStmt);
1028  select->targetList = targetList;
1029  select->fromClause = list_make1(from);
1030 
1031  query = makeNode(RawStmt);
1032  query->stmt = (Node *) select;
1033  query->stmt_location = stmt_location;
1034  query->stmt_len = stmt_len;
1035 
1036  /*
1037  * Close the relation for now, but keep the lock on it to prevent
1038  * changes between now and when we start the query-based COPY.
1039  *
1040  * We'll reopen it later as part of the query-based COPY.
1041  */
1042  table_close(rel, NoLock);
1043  rel = NULL;
1044  }
1045  }
1046  else
1047  {
1048  Assert(stmt->query);
1049 
1050  query = makeNode(RawStmt);
1051  query->stmt = stmt->query;
1052  query->stmt_location = stmt_location;
1053  query->stmt_len = stmt_len;
1054 
1055  relid = InvalidOid;
1056  rel = NULL;
1057  }
1058 
1059  if (is_from)
1060  {
1061  Assert(rel);
1062 
1063  /* check read-only transaction and parallel mode */
1064  if (XactReadOnly && !rel->rd_islocaltemp)
1065  PreventCommandIfReadOnly("COPY FROM");
1066  PreventCommandIfParallelMode("COPY FROM");
1067 
1068  cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
1069  NULL, stmt->attlist, stmt->options);
1070  cstate->whereClause = whereClause;
1071  *processed = CopyFrom(cstate); /* copy from file to database */
1072  EndCopyFrom(cstate);
1073  }
1074  else
1075  {
1076  cstate = BeginCopyTo(pstate, rel, query, relid,
1077  stmt->filename, stmt->is_program,
1078  stmt->attlist, stmt->options);
1079  *processed = DoCopyTo(cstate); /* copy from database to file */
1080  EndCopyTo(cstate);
1081  }
1082 
1083  /*
1084  * Close the relation. If reading, we can release the AccessShareLock we
1085  * got; if writing, we should hold the lock until end of transaction to
1086  * ensure that updates will be committed before lock is released.
1087  */
1088  if (rel != NULL)
1089  table_close(rel, (is_from ? NoLock : AccessShareLock));
1090 }
1091 
1092 /*
1093  * Process the statement option list for COPY.
1094  *
1095  * Scan the options list (a list of DefElem) and transpose the information
1096  * into cstate, applying appropriate error checking.
1097  *
1098  * cstate is assumed to be filled with zeroes initially.
1099  *
1100  * This is exported so that external users of the COPY API can sanity-check
1101  * a list of options. In that usage, cstate should be passed as NULL
1102  * (since external users don't know sizeof(CopyStateData)) and the collected
1103  * data is just leaked until CurrentMemoryContext is reset.
1104  *
1105  * Note that additional checking, such as whether column names listed in FORCE
1106  * QUOTE actually exist, has to be applied later. This just checks for
1107  * self-consistency of the options list.
1108  */
1109 void
1111  CopyState cstate,
1112  bool is_from,
1113  List *options)
1114 {
1115  bool format_specified = false;
1116  ListCell *option;
1117 
1118  /* Support external use for option sanity checking */
1119  if (cstate == NULL)
1120  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1121 
1122  cstate->is_copy_from = is_from;
1123 
1124  cstate->file_encoding = -1;
1125 
1126  /* Extract options from the statement node tree */
1127  foreach(option, options)
1128  {
1129  DefElem *defel = lfirst_node(DefElem, option);
1130 
1131  if (strcmp(defel->defname, "format") == 0)
1132  {
1133  char *fmt = defGetString(defel);
1134 
1135  if (format_specified)
1136  ereport(ERROR,
1137  (errcode(ERRCODE_SYNTAX_ERROR),
1138  errmsg("conflicting or redundant options"),
1139  parser_errposition(pstate, defel->location)));
1140  format_specified = true;
1141  if (strcmp(fmt, "text") == 0)
1142  /* default format */ ;
1143  else if (strcmp(fmt, "csv") == 0)
1144  cstate->csv_mode = true;
1145  else if (strcmp(fmt, "binary") == 0)
1146  cstate->binary = true;
1147  else
1148  ereport(ERROR,
1149  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1150  errmsg("COPY format \"%s\" not recognized", fmt),
1151  parser_errposition(pstate, defel->location)));
1152  }
1153  else if (strcmp(defel->defname, "freeze") == 0)
1154  {
1155  if (cstate->freeze)
1156  ereport(ERROR,
1157  (errcode(ERRCODE_SYNTAX_ERROR),
1158  errmsg("conflicting or redundant options"),
1159  parser_errposition(pstate, defel->location)));
1160  cstate->freeze = defGetBoolean(defel);
1161  }
1162  else if (strcmp(defel->defname, "delimiter") == 0)
1163  {
1164  if (cstate->delim)
1165  ereport(ERROR,
1166  (errcode(ERRCODE_SYNTAX_ERROR),
1167  errmsg("conflicting or redundant options"),
1168  parser_errposition(pstate, defel->location)));
1169  cstate->delim = defGetString(defel);
1170  }
1171  else if (strcmp(defel->defname, "null") == 0)
1172  {
1173  if (cstate->null_print)
1174  ereport(ERROR,
1175  (errcode(ERRCODE_SYNTAX_ERROR),
1176  errmsg("conflicting or redundant options"),
1177  parser_errposition(pstate, defel->location)));
1178  cstate->null_print = defGetString(defel);
1179  }
1180  else if (strcmp(defel->defname, "header") == 0)
1181  {
1182  if (cstate->header_line)
1183  ereport(ERROR,
1184  (errcode(ERRCODE_SYNTAX_ERROR),
1185  errmsg("conflicting or redundant options"),
1186  parser_errposition(pstate, defel->location)));
1187  cstate->header_line = defGetBoolean(defel);
1188  }
1189  else if (strcmp(defel->defname, "quote") == 0)
1190  {
1191  if (cstate->quote)
1192  ereport(ERROR,
1193  (errcode(ERRCODE_SYNTAX_ERROR),
1194  errmsg("conflicting or redundant options"),
1195  parser_errposition(pstate, defel->location)));
1196  cstate->quote = defGetString(defel);
1197  }
1198  else if (strcmp(defel->defname, "escape") == 0)
1199  {
1200  if (cstate->escape)
1201  ereport(ERROR,
1202  (errcode(ERRCODE_SYNTAX_ERROR),
1203  errmsg("conflicting or redundant options"),
1204  parser_errposition(pstate, defel->location)));
1205  cstate->escape = defGetString(defel);
1206  }
1207  else if (strcmp(defel->defname, "force_quote") == 0)
1208  {
1209  if (cstate->force_quote || cstate->force_quote_all)
1210  ereport(ERROR,
1211  (errcode(ERRCODE_SYNTAX_ERROR),
1212  errmsg("conflicting or redundant options"),
1213  parser_errposition(pstate, defel->location)));
1214  if (defel->arg && IsA(defel->arg, A_Star))
1215  cstate->force_quote_all = true;
1216  else if (defel->arg && IsA(defel->arg, List))
1217  cstate->force_quote = castNode(List, defel->arg);
1218  else
1219  ereport(ERROR,
1220  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1221  errmsg("argument to option \"%s\" must be a list of column names",
1222  defel->defname),
1223  parser_errposition(pstate, defel->location)));
1224  }
1225  else if (strcmp(defel->defname, "force_not_null") == 0)
1226  {
1227  if (cstate->force_notnull)
1228  ereport(ERROR,
1229  (errcode(ERRCODE_SYNTAX_ERROR),
1230  errmsg("conflicting or redundant options"),
1231  parser_errposition(pstate, defel->location)));
1232  if (defel->arg && IsA(defel->arg, List))
1233  cstate->force_notnull = castNode(List, defel->arg);
1234  else
1235  ereport(ERROR,
1236  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1237  errmsg("argument to option \"%s\" must be a list of column names",
1238  defel->defname),
1239  parser_errposition(pstate, defel->location)));
1240  }
1241  else if (strcmp(defel->defname, "force_null") == 0)
1242  {
1243  if (cstate->force_null)
1244  ereport(ERROR,
1245  (errcode(ERRCODE_SYNTAX_ERROR),
1246  errmsg("conflicting or redundant options")));
1247  if (defel->arg && IsA(defel->arg, List))
1248  cstate->force_null = castNode(List, defel->arg);
1249  else
1250  ereport(ERROR,
1251  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1252  errmsg("argument to option \"%s\" must be a list of column names",
1253  defel->defname),
1254  parser_errposition(pstate, defel->location)));
1255  }
1256  else if (strcmp(defel->defname, "convert_selectively") == 0)
1257  {
1258  /*
1259  * Undocumented, not-accessible-from-SQL option: convert only the
1260  * named columns to binary form, storing the rest as NULLs. It's
1261  * allowed for the column list to be NIL.
1262  */
1263  if (cstate->convert_selectively)
1264  ereport(ERROR,
1265  (errcode(ERRCODE_SYNTAX_ERROR),
1266  errmsg("conflicting or redundant options"),
1267  parser_errposition(pstate, defel->location)));
1268  cstate->convert_selectively = true;
1269  if (defel->arg == NULL || IsA(defel->arg, List))
1270  cstate->convert_select = castNode(List, defel->arg);
1271  else
1272  ereport(ERROR,
1273  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1274  errmsg("argument to option \"%s\" must be a list of column names",
1275  defel->defname),
1276  parser_errposition(pstate, defel->location)));
1277  }
1278  else if (strcmp(defel->defname, "encoding") == 0)
1279  {
1280  if (cstate->file_encoding >= 0)
1281  ereport(ERROR,
1282  (errcode(ERRCODE_SYNTAX_ERROR),
1283  errmsg("conflicting or redundant options"),
1284  parser_errposition(pstate, defel->location)));
1286  if (cstate->file_encoding < 0)
1287  ereport(ERROR,
1288  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1289  errmsg("argument to option \"%s\" must be a valid encoding name",
1290  defel->defname),
1291  parser_errposition(pstate, defel->location)));
1292  }
1293  else
1294  ereport(ERROR,
1295  (errcode(ERRCODE_SYNTAX_ERROR),
1296  errmsg("option \"%s\" not recognized",
1297  defel->defname),
1298  parser_errposition(pstate, defel->location)));
1299  }
1300 
1301  /*
1302  * Check for incompatible options (must do these two before inserting
1303  * defaults)
1304  */
1305  if (cstate->binary && cstate->delim)
1306  ereport(ERROR,
1307  (errcode(ERRCODE_SYNTAX_ERROR),
1308  errmsg("cannot specify DELIMITER in BINARY mode")));
1309 
1310  if (cstate->binary && cstate->null_print)
1311  ereport(ERROR,
1312  (errcode(ERRCODE_SYNTAX_ERROR),
1313  errmsg("cannot specify NULL in BINARY mode")));
1314 
1315  /* Set defaults for omitted options */
1316  if (!cstate->delim)
1317  cstate->delim = cstate->csv_mode ? "," : "\t";
1318 
1319  if (!cstate->null_print)
1320  cstate->null_print = cstate->csv_mode ? "" : "\\N";
1321  cstate->null_print_len = strlen(cstate->null_print);
1322 
1323  if (cstate->csv_mode)
1324  {
1325  if (!cstate->quote)
1326  cstate->quote = "\"";
1327  if (!cstate->escape)
1328  cstate->escape = cstate->quote;
1329  }
1330 
1331  /* Only single-byte delimiter strings are supported. */
1332  if (strlen(cstate->delim) != 1)
1333  ereport(ERROR,
1334  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1335  errmsg("COPY delimiter must be a single one-byte character")));
1336 
1337  /* Disallow end-of-line characters */
1338  if (strchr(cstate->delim, '\r') != NULL ||
1339  strchr(cstate->delim, '\n') != NULL)
1340  ereport(ERROR,
1341  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1342  errmsg("COPY delimiter cannot be newline or carriage return")));
1343 
1344  if (strchr(cstate->null_print, '\r') != NULL ||
1345  strchr(cstate->null_print, '\n') != NULL)
1346  ereport(ERROR,
1347  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1348  errmsg("COPY null representation cannot use newline or carriage return")));
1349 
1350  /*
1351  * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1352  * backslash because it would be ambiguous. We can't allow the other
1353  * cases because data characters matching the delimiter must be
1354  * backslashed, and certain backslash combinations are interpreted
1355  * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1356  * more than strictly necessary, but seems best for consistency and
1357  * future-proofing. Likewise we disallow all digits though only octal
1358  * digits are actually dangerous.
1359  */
1360  if (!cstate->csv_mode &&
1361  strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1362  cstate->delim[0]) != NULL)
1363  ereport(ERROR,
1364  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1365  errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1366 
1367  /* Check header */
1368  if (!cstate->csv_mode && cstate->header_line)
1369  ereport(ERROR,
1370  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1371  errmsg("COPY HEADER available only in CSV mode")));
1372 
1373  /* Check quote */
1374  if (!cstate->csv_mode && cstate->quote != NULL)
1375  ereport(ERROR,
1376  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1377  errmsg("COPY quote available only in CSV mode")));
1378 
1379  if (cstate->csv_mode && strlen(cstate->quote) != 1)
1380  ereport(ERROR,
1381  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1382  errmsg("COPY quote must be a single one-byte character")));
1383 
1384  if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1385  ereport(ERROR,
1386  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1387  errmsg("COPY delimiter and quote must be different")));
1388 
1389  /* Check escape */
1390  if (!cstate->csv_mode && cstate->escape != NULL)
1391  ereport(ERROR,
1392  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1393  errmsg("COPY escape available only in CSV mode")));
1394 
1395  if (cstate->csv_mode && strlen(cstate->escape) != 1)
1396  ereport(ERROR,
1397  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1398  errmsg("COPY escape must be a single one-byte character")));
1399 
1400  /* Check force_quote */
1401  if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1402  ereport(ERROR,
1403  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1404  errmsg("COPY force quote available only in CSV mode")));
1405  if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1406  ereport(ERROR,
1407  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1408  errmsg("COPY force quote only available using COPY TO")));
1409 
1410  /* Check force_notnull */
1411  if (!cstate->csv_mode && cstate->force_notnull != NIL)
1412  ereport(ERROR,
1413  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1414  errmsg("COPY force not null available only in CSV mode")));
1415  if (cstate->force_notnull != NIL && !is_from)
1416  ereport(ERROR,
1417  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1418  errmsg("COPY force not null only available using COPY FROM")));
1419 
1420  /* Check force_null */
1421  if (!cstate->csv_mode && cstate->force_null != NIL)
1422  ereport(ERROR,
1423  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1424  errmsg("COPY force null available only in CSV mode")));
1425 
1426  if (cstate->force_null != NIL && !is_from)
1427  ereport(ERROR,
1428  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1429  errmsg("COPY force null only available using COPY FROM")));
1430 
1431  /* Don't allow the delimiter to appear in the null string. */
1432  if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1433  ereport(ERROR,
1434  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1435  errmsg("COPY delimiter must not appear in the NULL specification")));
1436 
1437  /* Don't allow the CSV quote char to appear in the null string. */
1438  if (cstate->csv_mode &&
1439  strchr(cstate->null_print, cstate->quote[0]) != NULL)
1440  ereport(ERROR,
1441  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1442  errmsg("CSV quote character must not appear in the NULL specification")));
1443 }
1444 
1445 /*
1446  * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1447  *
1448  * Iff <binary>, unload or reload in the binary format, as opposed to the
1449  * more wasteful but more robust and portable text format.
1450  *
1451  * Iff <oids>, unload or reload the format that includes OID information.
1452  * On input, we accept OIDs whether or not the table has an OID column,
1453  * but silently drop them if it does not. On output, we report an error
1454  * if the user asks for OIDs in a table that has none (not providing an
1455  * OID column might seem friendlier, but could seriously confuse programs).
1456  *
1457  * If in the text format, delimit columns with delimiter <delim> and print
1458  * NULL values as <null_print>.
1459  */
1460 static CopyState
1462  bool is_from,
1463  Relation rel,
1464  RawStmt *raw_query,
1465  Oid queryRelId,
1466  List *attnamelist,
1467  List *options)
1468 {
1469  CopyState cstate;
1470  TupleDesc tupDesc;
1471  int num_phys_attrs;
1472  MemoryContext oldcontext;
1473 
1474  /* Allocate workspace and zero all fields */
1475  cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1476 
1477  /*
1478  * We allocate everything used by a cstate in a new memory context. This
1479  * avoids memory leaks during repeated use of COPY in a query.
1480  */
1482  "COPY",
1484 
1485  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1486 
1487  /* Extract options from the statement node tree */
1488  ProcessCopyOptions(pstate, cstate, is_from, options);
1489 
1490  /* Process the source/target relation or query */
1491  if (rel)
1492  {
1493  Assert(!raw_query);
1494 
1495  cstate->rel = rel;
1496 
1497  tupDesc = RelationGetDescr(cstate->rel);
1498  }
1499  else
1500  {
1501  List *rewritten;
1502  Query *query;
1503  PlannedStmt *plan;
1504  DestReceiver *dest;
1505 
1506  Assert(!is_from);
1507  cstate->rel = NULL;
1508 
1509  /*
1510  * Run parse analysis and rewrite. Note this also acquires sufficient
1511  * locks on the source table(s).
1512  *
1513  * Because the parser and planner tend to scribble on their input, we
1514  * make a preliminary copy of the source querytree. This prevents
1515  * problems in the case that the COPY is in a portal or plpgsql
1516  * function and is executed repeatedly. (See also the same hack in
1517  * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1518  */
1519  rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1520  pstate->p_sourcetext, NULL, 0,
1521  NULL);
1522 
1523  /* check that we got back something we can work with */
1524  if (rewritten == NIL)
1525  {
1526  ereport(ERROR,
1527  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1528  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1529  }
1530  else if (list_length(rewritten) > 1)
1531  {
1532  ListCell *lc;
1533 
1534  /* examine queries to determine which error message to issue */
1535  foreach(lc, rewritten)
1536  {
1537  Query *q = lfirst_node(Query, lc);
1538 
1540  ereport(ERROR,
1541  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1542  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1544  ereport(ERROR,
1545  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1546  errmsg("DO ALSO rules are not supported for the COPY")));
1547  }
1548 
1549  ereport(ERROR,
1550  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1551  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1552  }
1553 
1554  query = linitial_node(Query, rewritten);
1555 
1556  /* The grammar allows SELECT INTO, but we don't support that */
1557  if (query->utilityStmt != NULL &&
1559  ereport(ERROR,
1560  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1561  errmsg("COPY (SELECT INTO) is not supported")));
1562 
1563  Assert(query->utilityStmt == NULL);
1564 
1565  /*
1566  * Similarly the grammar doesn't enforce the presence of a RETURNING
1567  * clause, but this is required here.
1568  */
1569  if (query->commandType != CMD_SELECT &&
1570  query->returningList == NIL)
1571  {
1572  Assert(query->commandType == CMD_INSERT ||
1573  query->commandType == CMD_UPDATE ||
1574  query->commandType == CMD_DELETE);
1575 
1576  ereport(ERROR,
1577  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1578  errmsg("COPY query must have a RETURNING clause")));
1579  }
1580 
1581  /* plan the query */
1582  plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1583 
1584  /*
1585  * With row level security and a user using "COPY relation TO", we
1586  * have to convert the "COPY relation TO" to a query-based COPY (eg:
1587  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1588  * in any RLS clauses.
1589  *
1590  * When this happens, we are passed in the relid of the originally
1591  * found relation (which we have locked). As the planner will look up
1592  * the relation again, we double-check here to make sure it found the
1593  * same one that we have locked.
1594  */
1595  if (queryRelId != InvalidOid)
1596  {
1597  /*
1598  * Note that with RLS involved there may be multiple relations,
1599  * and while the one we need is almost certainly first, we don't
1600  * make any guarantees of that in the planner, so check the whole
1601  * list and make sure we find the original relation.
1602  */
1603  if (!list_member_oid(plan->relationOids, queryRelId))
1604  ereport(ERROR,
1605  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1606  errmsg("relation referenced by COPY statement has changed")));
1607  }
1608 
1609  /*
1610  * Use a snapshot with an updated command ID to ensure this query sees
1611  * results of any previously executed queries.
1612  */
1615 
1616  /* Create dest receiver for COPY OUT */
1618  ((DR_copy *) dest)->cstate = cstate;
1619 
1620  /* Create a QueryDesc requesting no output */
1621  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1624  dest, NULL, NULL, 0);
1625 
1626  /*
1627  * Call ExecutorStart to prepare the plan for execution.
1628  *
1629  * ExecutorStart computes a result tupdesc for us
1630  */
1631  ExecutorStart(cstate->queryDesc, 0);
1632 
1633  tupDesc = cstate->queryDesc->tupDesc;
1634  }
1635 
1636  /* Generate or convert list of attributes to process */
1637  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1638 
1639  num_phys_attrs = tupDesc->natts;
1640 
1641  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1642  cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1643  if (cstate->force_quote_all)
1644  {
1645  int i;
1646 
1647  for (i = 0; i < num_phys_attrs; i++)
1648  cstate->force_quote_flags[i] = true;
1649  }
1650  else if (cstate->force_quote)
1651  {
1652  List *attnums;
1653  ListCell *cur;
1654 
1655  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1656 
1657  foreach(cur, attnums)
1658  {
1659  int attnum = lfirst_int(cur);
1660  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1661 
1662  if (!list_member_int(cstate->attnumlist, attnum))
1663  ereport(ERROR,
1664  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1665  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1666  NameStr(attr->attname))));
1667  cstate->force_quote_flags[attnum - 1] = true;
1668  }
1669  }
1670 
1671  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1672  cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1673  if (cstate->force_notnull)
1674  {
1675  List *attnums;
1676  ListCell *cur;
1677 
1678  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1679 
1680  foreach(cur, attnums)
1681  {
1682  int attnum = lfirst_int(cur);
1683  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1684 
1685  if (!list_member_int(cstate->attnumlist, attnum))
1686  ereport(ERROR,
1687  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1688  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1689  NameStr(attr->attname))));
1690  cstate->force_notnull_flags[attnum - 1] = true;
1691  }
1692  }
1693 
1694  /* Convert FORCE_NULL name list to per-column flags, check validity */
1695  cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1696  if (cstate->force_null)
1697  {
1698  List *attnums;
1699  ListCell *cur;
1700 
1701  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1702 
1703  foreach(cur, attnums)
1704  {
1705  int attnum = lfirst_int(cur);
1706  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1707 
1708  if (!list_member_int(cstate->attnumlist, attnum))
1709  ereport(ERROR,
1710  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1711  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1712  NameStr(attr->attname))));
1713  cstate->force_null_flags[attnum - 1] = true;
1714  }
1715  }
1716 
1717  /* Convert convert_selectively name list to per-column flags */
1718  if (cstate->convert_selectively)
1719  {
1720  List *attnums;
1721  ListCell *cur;
1722 
1723  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1724 
1725  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1726 
1727  foreach(cur, attnums)
1728  {
1729  int attnum = lfirst_int(cur);
1730  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1731 
1732  if (!list_member_int(cstate->attnumlist, attnum))
1733  ereport(ERROR,
1734  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1735  errmsg_internal("selected column \"%s\" not referenced by COPY",
1736  NameStr(attr->attname))));
1737  cstate->convert_select_flags[attnum - 1] = true;
1738  }
1739  }
1740 
1741  /* Use client encoding when ENCODING option is not specified. */
1742  if (cstate->file_encoding < 0)
1744 
1745  /*
1746  * Set up encoding conversion info. Even if the file and server encodings
1747  * are the same, we must apply pg_any_to_server() to validate data in
1748  * multibyte encodings.
1749  */
1750  cstate->need_transcoding =
1751  (cstate->file_encoding != GetDatabaseEncoding() ||
1753  /* See Multibyte encoding comment above */
1755 
1756  cstate->copy_dest = COPY_FILE; /* default */
1757 
1758  MemoryContextSwitchTo(oldcontext);
1759 
1760  return cstate;
1761 }
1762 
1763 /*
1764  * Closes the pipe to an external program, checking the pclose() return code.
1765  */
1766 static void
1768 {
1769  int pclose_rc;
1770 
1771  Assert(cstate->is_program);
1772 
1773  pclose_rc = ClosePipeStream(cstate->copy_file);
1774  if (pclose_rc == -1)
1775  ereport(ERROR,
1777  errmsg("could not close pipe to external command: %m")));
1778  else if (pclose_rc != 0)
1779  {
1780  /*
1781  * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1782  * expectable for the called program to fail with SIGPIPE, and we
1783  * should not report that as an error. Otherwise, SIGPIPE indicates a
1784  * problem.
1785  */
1786  if (cstate->is_copy_from && !cstate->reached_eof &&
1787  wait_result_is_signal(pclose_rc, SIGPIPE))
1788  return;
1789 
1790  ereport(ERROR,
1791  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1792  errmsg("program \"%s\" failed",
1793  cstate->filename),
1794  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1795  }
1796 }
1797 
1798 /*
1799  * Release resources allocated in a cstate for COPY TO/FROM.
1800  */
1801 static void
1803 {
1804  if (cstate->is_program)
1805  {
1806  ClosePipeToProgram(cstate);
1807  }
1808  else
1809  {
1810  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1811  ereport(ERROR,
1813  errmsg("could not close file \"%s\": %m",
1814  cstate->filename)));
1815  }
1816 
1818  pfree(cstate);
1819 }
1820 
1821 /*
1822  * Setup CopyState to read tuples from a table or a query for COPY TO.
1823  */
1824 static CopyState
1826  Relation rel,
1827  RawStmt *query,
1828  Oid queryRelId,
1829  const char *filename,
1830  bool is_program,
1831  List *attnamelist,
1832  List *options)
1833 {
1834  CopyState cstate;
1835  bool pipe = (filename == NULL);
1836  MemoryContext oldcontext;
1837 
1838  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1839  {
1840  if (rel->rd_rel->relkind == RELKIND_VIEW)
1841  ereport(ERROR,
1842  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1843  errmsg("cannot copy from view \"%s\"",
1845  errhint("Try the COPY (SELECT ...) TO variant.")));
1846  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1847  ereport(ERROR,
1848  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1849  errmsg("cannot copy from materialized view \"%s\"",
1851  errhint("Try the COPY (SELECT ...) TO variant.")));
1852  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1853  ereport(ERROR,
1854  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1855  errmsg("cannot copy from foreign table \"%s\"",
1857  errhint("Try the COPY (SELECT ...) TO variant.")));
1858  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1859  ereport(ERROR,
1860  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1861  errmsg("cannot copy from sequence \"%s\"",
1862  RelationGetRelationName(rel))));
1863  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1864  ereport(ERROR,
1865  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1866  errmsg("cannot copy from partitioned table \"%s\"",
1868  errhint("Try the COPY (SELECT ...) TO variant.")));
1869  else
1870  ereport(ERROR,
1871  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1872  errmsg("cannot copy from non-table relation \"%s\"",
1873  RelationGetRelationName(rel))));
1874  }
1875 
1876  cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1877  options);
1878  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1879 
1880  if (pipe)
1881  {
1882  Assert(!is_program); /* the grammar does not allow this */
1884  cstate->copy_file = stdout;
1885  }
1886  else
1887  {
1888  cstate->filename = pstrdup(filename);
1889  cstate->is_program = is_program;
1890 
1891  if (is_program)
1892  {
1893  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1894  if (cstate->copy_file == NULL)
1895  ereport(ERROR,
1897  errmsg("could not execute command \"%s\": %m",
1898  cstate->filename)));
1899  }
1900  else
1901  {
1902  mode_t oumask; /* Pre-existing umask value */
1903  struct stat st;
1904 
1905  /*
1906  * Prevent write to relative path ... too easy to shoot oneself in
1907  * the foot by overwriting a database file ...
1908  */
1909  if (!is_absolute_path(filename))
1910  ereport(ERROR,
1911  (errcode(ERRCODE_INVALID_NAME),
1912  errmsg("relative path not allowed for COPY to file")));
1913 
1914  oumask = umask(S_IWGRP | S_IWOTH);
1915  PG_TRY();
1916  {
1917  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1918  }
1919  PG_CATCH();
1920  {
1921  umask(oumask);
1922  PG_RE_THROW();
1923  }
1924  PG_END_TRY();
1925  umask(oumask);
1926  if (cstate->copy_file == NULL)
1927  {
1928  /* copy errno because ereport subfunctions might change it */
1929  int save_errno = errno;
1930 
1931  ereport(ERROR,
1933  errmsg("could not open file \"%s\" for writing: %m",
1934  cstate->filename),
1935  (save_errno == ENOENT || save_errno == EACCES) ?
1936  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1937  "You may want a client-side facility such as psql's \\copy.") : 0));
1938  }
1939 
1940  if (fstat(fileno(cstate->copy_file), &st))
1941  ereport(ERROR,
1943  errmsg("could not stat file \"%s\": %m",
1944  cstate->filename)));
1945 
1946  if (S_ISDIR(st.st_mode))
1947  ereport(ERROR,
1948  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1949  errmsg("\"%s\" is a directory", cstate->filename)));
1950  }
1951  }
1952 
1953  MemoryContextSwitchTo(oldcontext);
1954 
1955  return cstate;
1956 }
1957 
1958 /*
1959  * This intermediate routine exists mainly to localize the effects of setjmp
1960  * so we don't need to plaster a lot of variables with "volatile".
1961  */
1962 static uint64
1964 {
1965  bool pipe = (cstate->filename == NULL);
1966  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1967  uint64 processed;
1968 
1969  PG_TRY();
1970  {
1971  if (fe_copy)
1972  SendCopyBegin(cstate);
1973 
1974  processed = CopyTo(cstate);
1975 
1976  if (fe_copy)
1977  SendCopyEnd(cstate);
1978  }
1979  PG_CATCH();
1980  {
1981  /*
1982  * Make sure we turn off old-style COPY OUT mode upon error. It is
1983  * okay to do this in all cases, since it does nothing if the mode is
1984  * not on.
1985  */
1986  pq_endcopyout(true);
1987  PG_RE_THROW();
1988  }
1989  PG_END_TRY();
1990 
1991  return processed;
1992 }
1993 
1994 /*
1995  * Clean up storage and release resources for COPY TO.
1996  */
1997 static void
1999 {
2000  if (cstate->queryDesc != NULL)
2001  {
2002  /* Close down the query and free resources. */
2003  ExecutorFinish(cstate->queryDesc);
2004  ExecutorEnd(cstate->queryDesc);
2005  FreeQueryDesc(cstate->queryDesc);
2007  }
2008 
2009  /* Clean up storage */
2010  EndCopy(cstate);
2011 }
2012 
2013 /*
2014  * Copy from relation or query TO file.
2015  */
2016 static uint64
2018 {
2019  TupleDesc tupDesc;
2020  int num_phys_attrs;
2021  ListCell *cur;
2022  uint64 processed;
2023 
2024  if (cstate->rel)
2025  tupDesc = RelationGetDescr(cstate->rel);
2026  else
2027  tupDesc = cstate->queryDesc->tupDesc;
2028  num_phys_attrs = tupDesc->natts;
2029  cstate->null_print_client = cstate->null_print; /* default */
2030 
2031  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
2032  cstate->fe_msgbuf = makeStringInfo();
2033 
2034  /* Get info about the columns we need to process. */
2035  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
2036  foreach(cur, cstate->attnumlist)
2037  {
2038  int attnum = lfirst_int(cur);
2039  Oid out_func_oid;
2040  bool isvarlena;
2041  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
2042 
2043  if (cstate->binary)
2044  getTypeBinaryOutputInfo(attr->atttypid,
2045  &out_func_oid,
2046  &isvarlena);
2047  else
2048  getTypeOutputInfo(attr->atttypid,
2049  &out_func_oid,
2050  &isvarlena);
2051  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
2052  }
2053 
2054  /*
2055  * Create a temporary memory context that we can reset once per row to
2056  * recover palloc'd memory. This avoids any problems with leaks inside
2057  * datatype output routines, and should be faster than retail pfree's
2058  * anyway. (We don't need a whole econtext as CopyFrom does.)
2059  */
2061  "COPY TO",
2063 
2064  if (cstate->binary)
2065  {
2066  /* Generate header for a binary copy */
2067  int32 tmp;
2068 
2069  /* Signature */
2070  CopySendData(cstate, BinarySignature, 11);
2071  /* Flags field */
2072  tmp = 0;
2073  CopySendInt32(cstate, tmp);
2074  /* No header extension */
2075  tmp = 0;
2076  CopySendInt32(cstate, tmp);
2077  }
2078  else
2079  {
2080  /*
2081  * For non-binary copy, we need to convert null_print to file
2082  * encoding, because it will be sent directly with CopySendString.
2083  */
2084  if (cstate->need_transcoding)
2085  cstate->null_print_client = pg_server_to_any(cstate->null_print,
2086  cstate->null_print_len,
2087  cstate->file_encoding);
2088 
2089  /* if a header has been requested send the line */
2090  if (cstate->header_line)
2091  {
2092  bool hdr_delim = false;
2093 
2094  foreach(cur, cstate->attnumlist)
2095  {
2096  int attnum = lfirst_int(cur);
2097  char *colname;
2098 
2099  if (hdr_delim)
2100  CopySendChar(cstate, cstate->delim[0]);
2101  hdr_delim = true;
2102 
2103  colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2104 
2105  CopyAttributeOutCSV(cstate, colname, false,
2106  list_length(cstate->attnumlist) == 1);
2107  }
2108 
2109  CopySendEndOfRow(cstate);
2110  }
2111  }
2112 
2113  if (cstate->rel)
2114  {
2115  TupleTableSlot *slot;
2116  TableScanDesc scandesc;
2117 
2118  scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2119  slot = table_slot_create(cstate->rel, NULL);
2120 
2121  processed = 0;
2122  while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
2123  {
2125 
2126  /* Deconstruct the tuple ... */
2127  slot_getallattrs(slot);
2128 
2129  /* Format and send the data */
2130  CopyOneRowTo(cstate, slot);
2131  processed++;
2132  }
2133 
2135  table_endscan(scandesc);
2136  }
2137  else
2138  {
2139  /* run the plan --- the dest receiver will send tuples */
2140  ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2141  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2142  }
2143 
2144  if (cstate->binary)
2145  {
2146  /* Generate trailer for a binary copy */
2147  CopySendInt16(cstate, -1);
2148  /* Need to flush out the trailer */
2149  CopySendEndOfRow(cstate);
2150  }
2151 
2153 
2154  return processed;
2155 }
2156 
2157 /*
2158  * Emit one row during CopyTo().
2159  */
2160 static void
2162 {
2163  bool need_delim = false;
2165  MemoryContext oldcontext;
2166  ListCell *cur;
2167  char *string;
2168 
2169  MemoryContextReset(cstate->rowcontext);
2170  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2171 
2172  if (cstate->binary)
2173  {
2174  /* Binary per-tuple header */
2175  CopySendInt16(cstate, list_length(cstate->attnumlist));
2176  }
2177 
2178  /* Make sure the tuple is fully deconstructed */
2179  slot_getallattrs(slot);
2180 
2181  foreach(cur, cstate->attnumlist)
2182  {
2183  int attnum = lfirst_int(cur);
2184  Datum value = slot->tts_values[attnum - 1];
2185  bool isnull = slot->tts_isnull[attnum - 1];
2186 
2187  if (!cstate->binary)
2188  {
2189  if (need_delim)
2190  CopySendChar(cstate, cstate->delim[0]);
2191  need_delim = true;
2192  }
2193 
2194  if (isnull)
2195  {
2196  if (!cstate->binary)
2197  CopySendString(cstate, cstate->null_print_client);
2198  else
2199  CopySendInt32(cstate, -1);
2200  }
2201  else
2202  {
2203  if (!cstate->binary)
2204  {
2205  string = OutputFunctionCall(&out_functions[attnum - 1],
2206  value);
2207  if (cstate->csv_mode)
2208  CopyAttributeOutCSV(cstate, string,
2209  cstate->force_quote_flags[attnum - 1],
2210  list_length(cstate->attnumlist) == 1);
2211  else
2212  CopyAttributeOutText(cstate, string);
2213  }
2214  else
2215  {
2216  bytea *outputbytes;
2217 
2218  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2219  value);
2220  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2221  CopySendData(cstate, VARDATA(outputbytes),
2222  VARSIZE(outputbytes) - VARHDRSZ);
2223  }
2224  }
2225  }
2226 
2227  CopySendEndOfRow(cstate);
2228 
2229  MemoryContextSwitchTo(oldcontext);
2230 }
2231 
2232 
2233 /*
2234  * error context callback for COPY FROM
2235  *
2236  * The argument for the error context must be CopyState.
2237  */
2238 void
2240 {
2241  CopyState cstate = (CopyState) arg;
2242  char curlineno_str[32];
2243 
2244  snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
2245  cstate->cur_lineno);
2246 
2247  if (cstate->binary)
2248  {
2249  /* can't usefully display the data */
2250  if (cstate->cur_attname)
2251  errcontext("COPY %s, line %s, column %s",
2252  cstate->cur_relname, curlineno_str,
2253  cstate->cur_attname);
2254  else
2255  errcontext("COPY %s, line %s",
2256  cstate->cur_relname, curlineno_str);
2257  }
2258  else
2259  {
2260  if (cstate->cur_attname && cstate->cur_attval)
2261  {
2262  /* error is relevant to a particular column */
2263  char *attval;
2264 
2265  attval = limit_printout_length(cstate->cur_attval);
2266  errcontext("COPY %s, line %s, column %s: \"%s\"",
2267  cstate->cur_relname, curlineno_str,
2268  cstate->cur_attname, attval);
2269  pfree(attval);
2270  }
2271  else if (cstate->cur_attname)
2272  {
2273  /* error is relevant to a particular column, value is NULL */
2274  errcontext("COPY %s, line %s, column %s: null input",
2275  cstate->cur_relname, curlineno_str,
2276  cstate->cur_attname);
2277  }
2278  else
2279  {
2280  /*
2281  * Error is relevant to a particular line.
2282  *
2283  * If line_buf still contains the correct line, and it's already
2284  * transcoded, print it. If it's still in a foreign encoding, it's
2285  * quite likely that the error is precisely a failure to do
2286  * encoding conversion (ie, bad data). We dare not try to convert
2287  * it, and at present there's no way to regurgitate it without
2288  * conversion. So we have to punt and just report the line number.
2289  */
2290  if (cstate->line_buf_valid &&
2291  (cstate->line_buf_converted || !cstate->need_transcoding))
2292  {
2293  char *lineval;
2294 
2295  lineval = limit_printout_length(cstate->line_buf.data);
2296  errcontext("COPY %s, line %s: \"%s\"",
2297  cstate->cur_relname, curlineno_str, lineval);
2298  pfree(lineval);
2299  }
2300  else
2301  {
2302  errcontext("COPY %s, line %s",
2303  cstate->cur_relname, curlineno_str);
2304  }
2305  }
2306  }
2307 }
2308 
2309 /*
2310  * Make sure we don't print an unreasonable amount of COPY data in a message.
2311  *
2312  * It would seem a lot easier to just use the sprintf "precision" limit to
2313  * truncate the string. However, some versions of glibc have a bug/misfeature
2314  * that vsnprintf will always fail (return -1) if it is asked to truncate
2315  * a string that contains invalid byte sequences for the current encoding.
2316  * So, do our own truncation. We return a pstrdup'd copy of the input.
2317  */
2318 static char *
2320 {
2321 #define MAX_COPY_DATA_DISPLAY 100
2322 
2323  int slen = strlen(str);
2324  int len;
2325  char *res;
2326 
2327  /* Fast path if definitely okay */
2328  if (slen <= MAX_COPY_DATA_DISPLAY)
2329  return pstrdup(str);
2330 
2331  /* Apply encoding-dependent truncation */
2332  len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
2333 
2334  /*
2335  * Truncate, and add "..." to show we truncated the input.
2336  */
2337  res = (char *) palloc(len + 4);
2338  memcpy(res, str, len);
2339  strcpy(res + len, "...");
2340 
2341  return res;
2342 }
2343 
2344 /*
2345  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
2346  * ResultRelInfo.
2347  */
2348 static CopyMultiInsertBuffer *
2350 {
2351  CopyMultiInsertBuffer *buffer;
2352 
2353  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
2354  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
2355  buffer->resultRelInfo = rri;
2356  buffer->bistate = GetBulkInsertState();
2357  buffer->nused = 0;
2358 
2359  return buffer;
2360 }
2361 
2362 /*
2363  * Make a new buffer for this ResultRelInfo.
2364  */
2365 static inline void
2367  ResultRelInfo *rri)
2368 {
2369  CopyMultiInsertBuffer *buffer;
2370 
2371  buffer = CopyMultiInsertBufferInit(rri);
2372 
2373  /* Setup back-link so we can easily find this buffer again */
2374  rri->ri_CopyMultiInsertBuffer = buffer;
2375  /* Record that we're tracking this buffer */
2376  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2377 }
2378 
2379 /*
2380  * Initialize an already allocated CopyMultiInsertInfo.
2381  *
2382  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
2383  * for that table.
2384  */
2385 static void
2387  CopyState cstate, EState *estate, CommandId mycid,
2388  int ti_options)
2389 {
2390  miinfo->multiInsertBuffers = NIL;
2391  miinfo->bufferedTuples = 0;
2392  miinfo->bufferedBytes = 0;
2393  miinfo->cstate = cstate;
2394  miinfo->estate = estate;
2395  miinfo->mycid = mycid;
2396  miinfo->ti_options = ti_options;
2397 
2398  /*
2399  * Only setup the buffer when not dealing with a partitioned table.
2400  * Buffers for partitioned tables will just be setup when we need to send
2401  * tuples their way for the first time.
2402  */
2403  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
2404  CopyMultiInsertInfoSetupBuffer(miinfo, rri);
2405 }
2406 
2407 /*
2408  * Returns true if the buffers are full
2409  */
2410 static inline bool
2412 {
2413  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
2414  miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
2415  return true;
2416  return false;
2417 }
2418 
2419 /*
2420  * Returns true if we have no buffered tuples
2421  */
2422 static inline bool
2424 {
2425  return miinfo->bufferedTuples == 0;
2426 }
2427 
2428 /*
2429  * Write the tuples stored in 'buffer' out to the table.
2430  */
2431 static inline void
2433  CopyMultiInsertBuffer *buffer)
2434 {
2435  MemoryContext oldcontext;
2436  int i;
2437  uint64 save_cur_lineno;
2438  CopyState cstate = miinfo->cstate;
2439  EState *estate = miinfo->estate;
2440  CommandId mycid = miinfo->mycid;
2441  int ti_options = miinfo->ti_options;
2442  bool line_buf_valid = cstate->line_buf_valid;
2443  int nused = buffer->nused;
2444  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
2445  TupleTableSlot **slots = buffer->slots;
2446 
2447  /* Set es_result_relation_info to the ResultRelInfo we're flushing. */
2448  estate->es_result_relation_info = resultRelInfo;
2449 
2450  /*
2451  * Print error context information correctly, if one of the operations
2452  * below fail.
2453  */
2454  cstate->line_buf_valid = false;
2455  save_cur_lineno = cstate->cur_lineno;
2456 
2457  /*
2458  * table_multi_insert may leak memory, so switch to short-lived memory
2459  * context before calling it.
2460  */
2461  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2462  table_multi_insert(resultRelInfo->ri_RelationDesc,
2463  slots,
2464  nused,
2465  mycid,
2466  ti_options,
2467  buffer->bistate);
2468  MemoryContextSwitchTo(oldcontext);
2469 
2470  for (i = 0; i < nused; i++)
2471  {
2472  /*
2473  * If there are any indexes, update them for all the inserted tuples,
2474  * and run AFTER ROW INSERT triggers.
2475  */
2476  if (resultRelInfo->ri_NumIndices > 0)
2477  {
2478  List *recheckIndexes;
2479 
2480  cstate->cur_lineno = buffer->linenos[i];
2481  recheckIndexes =
2482  ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
2483  NIL);
2484  ExecARInsertTriggers(estate, resultRelInfo,
2485  slots[i], recheckIndexes,
2486  cstate->transition_capture);
2487  list_free(recheckIndexes);
2488  }
2489 
2490  /*
2491  * There's no indexes, but see if we need to run AFTER ROW INSERT
2492  * triggers anyway.
2493  */
2494  else if (resultRelInfo->ri_TrigDesc != NULL &&
2495  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2496  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2497  {
2498  cstate->cur_lineno = buffer->linenos[i];
2499  ExecARInsertTriggers(estate, resultRelInfo,
2500  slots[i], NIL, cstate->transition_capture);
2501  }
2502 
2503  ExecClearTuple(slots[i]);
2504  }
2505 
2506  /* Mark that all slots are free */
2507  buffer->nused = 0;
2508 
2509  /* reset cur_lineno and line_buf_valid to what they were */
2510  cstate->line_buf_valid = line_buf_valid;
2511  cstate->cur_lineno = save_cur_lineno;
2512 }
2513 
2514 /*
2515  * Drop used slots and free member for this buffer.
2516  *
2517  * The buffer must be flushed before cleanup.
2518  */
2519 static inline void
2521  CopyMultiInsertBuffer *buffer)
2522 {
2523  int i;
2524 
2525  /* Ensure buffer was flushed */
2526  Assert(buffer->nused == 0);
2527 
2528  /* Remove back-link to ourself */
2529  buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
2530 
2531  FreeBulkInsertState(buffer->bistate);
2532 
2533  /* Since we only create slots on demand, just drop the non-null ones. */
2534  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
2535  ExecDropSingleTupleTableSlot(buffer->slots[i]);
2536 
2538  miinfo->ti_options);
2539 
2540  pfree(buffer);
2541 }
2542 
2543 /*
2544  * Write out all stored tuples in all buffers out to the tables.
2545  *
2546  * Once flushed we also trim the tracked buffers list down to size by removing
2547  * the buffers created earliest first.
2548  *
2549  * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
2550  * used. When cleaning up old buffers we'll never remove the one for
2551  * 'curr_rri'.
2552  */
2553 static inline void
2555 {
2556  ListCell *lc;
2557 
2558  foreach(lc, miinfo->multiInsertBuffers)
2559  {
2561 
2562  CopyMultiInsertBufferFlush(miinfo, buffer);
2563  }
2564 
2565  miinfo->bufferedTuples = 0;
2566  miinfo->bufferedBytes = 0;
2567 
2568  /*
2569  * Trim the list of tracked buffers down if it exceeds the limit. Here we
2570  * remove buffers starting with the ones we created first. It seems more
2571  * likely that these older ones are less likely to be needed than ones
2572  * that were just created.
2573  */
2575  {
2576  CopyMultiInsertBuffer *buffer;
2577 
2578  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2579 
2580  /*
2581  * We never want to remove the buffer that's currently being used, so
2582  * if we happen to find that then move it to the end of the list.
2583  */
2584  if (buffer->resultRelInfo == curr_rri)
2585  {
2587  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
2588  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
2589  }
2590 
2591  CopyMultiInsertBufferCleanup(miinfo, buffer);
2593  }
2594 }
2595 
2596 /*
2597  * Cleanup allocated buffers and free memory
2598  */
2599 static inline void
2601 {
2602  ListCell *lc;
2603 
2604  foreach(lc, miinfo->multiInsertBuffers)
2605  CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
2606 
2607  list_free(miinfo->multiInsertBuffers);
2608 }
2609 
2610 /*
2611  * Get the next TupleTableSlot that the next tuple should be stored in.
2612  *
2613  * Callers must ensure that the buffer is not full.
2614  */
2615 static inline TupleTableSlot *
2617  ResultRelInfo *rri)
2618 {
2620  int nused = buffer->nused;
2621 
2622  Assert(buffer != NULL);
2623  Assert(nused < MAX_BUFFERED_TUPLES);
2624 
2625  if (buffer->slots[nused] == NULL)
2626  buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
2627  return buffer->slots[nused];
2628 }
2629 
2630 /*
2631  * Record the previously reserved TupleTableSlot that was reserved by
2632  * CopyMultiInsertInfoNextFreeSlot as being consumed.
2633  */
2634 static inline void
2636  TupleTableSlot *slot, int tuplen, uint64 lineno)
2637 {
2639 
2640  Assert(buffer != NULL);
2641  Assert(slot == buffer->slots[buffer->nused]);
2642 
2643  /* Store the line number so we can properly report any errors later */
2644  buffer->linenos[buffer->nused] = lineno;
2645 
2646  /* Record this slot as being used */
2647  buffer->nused++;
2648 
2649  /* Update how many tuples are stored and their size */
2650  miinfo->bufferedTuples++;
2651  miinfo->bufferedBytes += tuplen;
2652 }
2653 
2654 /*
2655  * Copy FROM file to relation.
2656  */
2657 uint64
2659 {
2660  ResultRelInfo *resultRelInfo;
2661  ResultRelInfo *target_resultRelInfo;
2662  ResultRelInfo *prevResultRelInfo = NULL;
2663  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2664  ModifyTableState *mtstate;
2665  ExprContext *econtext;
2666  TupleTableSlot *singleslot = NULL;
2667  MemoryContext oldcontext = CurrentMemoryContext;
2668 
2669  PartitionTupleRouting *proute = NULL;
2670  ErrorContextCallback errcallback;
2671  CommandId mycid = GetCurrentCommandId(true);
2672  int ti_options = 0; /* start with default options for insert */
2673  BulkInsertState bistate = NULL;
2674  CopyInsertMethod insertMethod;
2675  CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
2676  uint64 processed = 0;
2677  bool has_before_insert_row_trig;
2678  bool has_instead_insert_row_trig;
2679  bool leafpart_use_multi_insert = false;
2680 
2681  Assert(cstate->rel);
2682 
2683  /*
2684  * The target must be a plain, foreign, or partitioned relation, or have
2685  * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
2686  * allowed on views, so we only hint about them in the view case.)
2687  */
2688  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2689  cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
2690  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2691  !(cstate->rel->trigdesc &&
2693  {
2694  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2695  ereport(ERROR,
2696  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2697  errmsg("cannot copy to view \"%s\"",
2698  RelationGetRelationName(cstate->rel)),
2699  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2700  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2701  ereport(ERROR,
2702  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2703  errmsg("cannot copy to materialized view \"%s\"",
2704  RelationGetRelationName(cstate->rel))));
2705  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2706  ereport(ERROR,
2707  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2708  errmsg("cannot copy to sequence \"%s\"",
2709  RelationGetRelationName(cstate->rel))));
2710  else
2711  ereport(ERROR,
2712  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2713  errmsg("cannot copy to non-table relation \"%s\"",
2714  RelationGetRelationName(cstate->rel))));
2715  }
2716 
2717  /*----------
2718  * Check to see if we can avoid writing WAL
2719  *
2720  * If archive logging/streaming is not enabled *and* either
2721  * - table was created in same transaction as this COPY
2722  * - data is being written to relfilenode created in this transaction
2723  * then we can skip writing WAL. It's safe because if the transaction
2724  * doesn't commit, we'll discard the table (or the new relfilenode file).
2725  * If it does commit, we'll have done the table_finish_bulk_insert() at
2726  * the bottom of this routine first.
2727  *
2728  * As mentioned in comments in utils/rel.h, the in-same-transaction test
2729  * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2730  * can be cleared before the end of the transaction. The exact case is
2731  * when a relation sets a new relfilenode twice in same transaction, yet
2732  * the second one fails in an aborted subtransaction, e.g.
2733  *
2734  * BEGIN;
2735  * TRUNCATE t;
2736  * SAVEPOINT save;
2737  * TRUNCATE t;
2738  * ROLLBACK TO save;
2739  * COPY ...
2740  *
2741  * Also, if the target file is new-in-transaction, we assume that checking
2742  * FSM for free space is a waste of time, even if we must use WAL because
2743  * of archiving. This could possibly be wrong, but it's unlikely.
2744  *
2745  * The comments for table_tuple_insert and RelationGetBufferForTuple
2746  * specify that skipping WAL logging is only safe if we ensure that our
2747  * tuples do not go into pages containing tuples from any other
2748  * transactions --- but this must be the case if we have a new table or
2749  * new relfilenode, so we need no additional work to enforce that.
2750  *
2751  * We currently don't support this optimization if the COPY target is a
2752  * partitioned table as we currently only lazily initialize partition
2753  * information when routing the first tuple to the partition. We cannot
2754  * know at this stage if we can perform this optimization. It should be
2755  * possible to improve on this, but it does mean maintaining heap insert
2756  * option flags per partition and setting them when we first open the
2757  * partition.
2758  *
2759  * This optimization is not supported for relation types which do not
2760  * have any physical storage, with foreign tables and views using
2761  * INSTEAD OF triggers entering in this category. Partitioned tables
2762  * are not supported as per the description above.
2763  *----------
2764  */
2765  /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2766  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
2769  {
2770  ti_options |= TABLE_INSERT_SKIP_FSM;
2771  if (!XLogIsNeeded())
2772  ti_options |= TABLE_INSERT_SKIP_WAL;
2773  }
2774 
2775  /*
2776  * Optimize if new relfilenode was created in this subxact or one of its
2777  * committed children and we won't see those rows later as part of an
2778  * earlier scan or command. The subxact test ensures that if this subxact
2779  * aborts then the frozen rows won't be visible after xact cleanup. Note
2780  * that the stronger test of exactly which subtransaction created it is
2781  * crucial for correctness of this optimization. The test for an earlier
2782  * scan or command tolerates false negatives. FREEZE causes other sessions
2783  * to see rows they would not see under MVCC, and a false negative merely
2784  * spreads that anomaly to the current session.
2785  */
2786  if (cstate->freeze)
2787  {
2788  /*
2789  * We currently disallow COPY FREEZE on partitioned tables. The
2790  * reason for this is that we've simply not yet opened the partitions
2791  * to determine if the optimization can be applied to them. We could
2792  * go and open them all here, but doing so may be quite a costly
2793  * overhead for small copies. In any case, we may just end up routing
2794  * tuples to a small number of partitions. It seems better just to
2795  * raise an ERROR for partitioned tables.
2796  */
2797  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2798  {
2799  ereport(ERROR,
2800  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2801  errmsg("cannot perform COPY FREEZE on a partitioned table")));
2802  }
2803 
2804  /*
2805  * Tolerate one registration for the benefit of FirstXactSnapshot.
2806  * Scan-bearing queries generally create at least two registrations,
2807  * though relying on that is fragile, as is ignoring ActiveSnapshot.
2808  * Clear CatalogSnapshot to avoid counting its registration. We'll
2809  * still detect ongoing catalog scans, each of which separately
2810  * registers the snapshot it uses.
2811  */
2814  ereport(ERROR,
2815  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2816  errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
2817 
2818  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2820  ereport(ERROR,
2821  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2822  errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
2823 
2824  ti_options |= TABLE_INSERT_FROZEN;
2825  }
2826 
2827  /*
2828  * We need a ResultRelInfo so we can use the regular executor's
2829  * index-entry-making machinery. (There used to be a huge amount of code
2830  * here that basically duplicated execUtils.c ...)
2831  */
2832  resultRelInfo = makeNode(ResultRelInfo);
2833  InitResultRelInfo(resultRelInfo,
2834  cstate->rel,
2835  1, /* must match rel's position in range_table */
2836  NULL,
2837  0);
2838  target_resultRelInfo = resultRelInfo;
2839 
2840  /* Verify the named relation is a valid target for INSERT */
2841  CheckValidResultRel(resultRelInfo, CMD_INSERT);
2842 
2843  ExecOpenIndices(resultRelInfo, false);
2844 
2845  estate->es_result_relations = resultRelInfo;
2846  estate->es_num_result_relations = 1;
2847  estate->es_result_relation_info = resultRelInfo;
2848 
2849  ExecInitRangeTable(estate, cstate->range_table);
2850 
2851  /*
2852  * Set up a ModifyTableState so we can let FDW(s) init themselves for
2853  * foreign-table result relation(s).
2854  */
2855  mtstate = makeNode(ModifyTableState);
2856  mtstate->ps.plan = NULL;
2857  mtstate->ps.state = estate;
2858  mtstate->operation = CMD_INSERT;
2859  mtstate->resultRelInfo = estate->es_result_relations;
2860 
2861  if (resultRelInfo->ri_FdwRoutine != NULL &&
2862  resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
2863  resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
2864  resultRelInfo);
2865 
2866  /* Prepare to catch AFTER triggers. */
2868 
2869  /*
2870  * If there are any triggers with transition tables on the named relation,
2871  * we need to be prepared to capture transition tuples.
2872  *
2873  * Because partition tuple routing would like to know about whether
2874  * transition capture is active, we also set it in mtstate, which is
2875  * passed to ExecFindPartition() below.
2876  */
2877  cstate->transition_capture = mtstate->mt_transition_capture =
2879  RelationGetRelid(cstate->rel),
2880  CMD_INSERT);
2881 
2882  /*
2883  * If the named relation is a partitioned table, initialize state for
2884  * CopyFrom tuple routing.
2885  */
2886  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2887  proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel);
2888 
2889  if (cstate->whereClause)
2890  cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
2891  &mtstate->ps);
2892 
2893  /*
2894  * It's generally more efficient to prepare a bunch of tuples for
2895  * insertion, and insert them in one table_multi_insert() call, than call
2896  * table_tuple_insert() separately for every tuple. However, there are a
2897  * number of reasons why we might not be able to do this. These are
2898  * explained below.
2899  */
2900  if (resultRelInfo->ri_TrigDesc != NULL &&
2901  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2902  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2903  {
2904  /*
2905  * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
2906  * triggers on the table. Such triggers might query the table we're
2907  * inserting into and act differently if the tuples that have already
2908  * been processed and prepared for insertion are not there.
2909  */
2910  insertMethod = CIM_SINGLE;
2911  }
2912  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
2913  resultRelInfo->ri_TrigDesc->trig_insert_new_table)
2914  {
2915  /*
2916  * For partitioned tables we can't support multi-inserts when there
2917  * are any statement level insert triggers. It might be possible to
2918  * allow partitioned tables with such triggers in the future, but for
2919  * now, CopyMultiInsertInfoFlush expects that any before row insert
2920  * and statement level insert triggers are on the same relation.
2921  */
2922  insertMethod = CIM_SINGLE;
2923  }
2924  else if (resultRelInfo->ri_FdwRoutine != NULL ||
2925  cstate->volatile_defexprs)
2926  {
2927  /*
2928  * Can't support multi-inserts to foreign tables or if there are any
2929  * volatile default expressions in the table. Similarly to the
2930  * trigger case above, such expressions may query the table we're
2931  * inserting into.
2932  *
2933  * Note: It does not matter if any partitions have any volatile
2934  * default expressions as we use the defaults from the target of the
2935  * COPY command.
2936  */
2937  insertMethod = CIM_SINGLE;
2938  }
2939  else if (contain_volatile_functions(cstate->whereClause))
2940  {
2941  /*
2942  * Can't support multi-inserts if there are any volatile function
2943  * expressions in WHERE clause. Similarly to the trigger case above,
2944  * such expressions may query the table we're inserting into.
2945  */
2946  insertMethod = CIM_SINGLE;
2947  }
2948  else
2949  {
2950  /*
2951  * For partitioned tables, we may still be able to perform bulk
2952  * inserts. However, the possibility of this depends on which types
2953  * of triggers exist on the partition. We must disable bulk inserts
2954  * if the partition is a foreign table or it has any before row insert
2955  * or insert instead triggers (same as we checked above for the parent
2956  * table). Since the partition's resultRelInfos are initialized only
2957  * when we actually need to insert the first tuple into them, we must
2958  * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
2959  * flag that we must later determine if we can use bulk-inserts for
2960  * the partition being inserted into.
2961  */
2962  if (proute)
2963  insertMethod = CIM_MULTI_CONDITIONAL;
2964  else
2965  insertMethod = CIM_MULTI;
2966 
2967  CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
2968  estate, mycid, ti_options);
2969  }
2970 
2971  /*
2972  * If not using batch mode (which allocates slots as needed) set up a
2973  * tuple slot too. When inserting into a partitioned table, we also need
2974  * one, even if we might batch insert, to read the tuple in the root
2975  * partition's form.
2976  */
2977  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
2978  {
2979  singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
2980  &estate->es_tupleTable);
2981  bistate = GetBulkInsertState();
2982  }
2983 
2984  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2985  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
2986 
2987  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
2988  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
2989 
2990  /*
2991  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2992  * should do this for COPY, since it's not really an "INSERT" statement as
2993  * such. However, executing these triggers maintains consistency with the
2994  * EACH ROW triggers that we already fire on COPY.
2995  */
2996  ExecBSInsertTriggers(estate, resultRelInfo);
2997 
2998  econtext = GetPerTupleExprContext(estate);
2999 
3000  /* Set up callback to identify error line number */
3001  errcallback.callback = CopyFromErrorCallback;
3002  errcallback.arg = (void *) cstate;
3003  errcallback.previous = error_context_stack;
3004  error_context_stack = &errcallback;
3005 
3006  for (;;)
3007  {
3008  TupleTableSlot *myslot;
3009  bool skip_tuple;
3010 
3012 
3013  /*
3014  * Reset the per-tuple exprcontext. We do this after every tuple, to
3015  * clean-up after expression evaluations etc.
3016  */
3017  ResetPerTupleExprContext(estate);
3018 
3019  /* select slot to (initially) load row into */
3020  if (insertMethod == CIM_SINGLE || proute)
3021  {
3022  myslot = singleslot;
3023  Assert(myslot != NULL);
3024  }
3025  else
3026  {
3027  Assert(resultRelInfo == target_resultRelInfo);
3028  Assert(insertMethod == CIM_MULTI);
3029 
3030  myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
3031  resultRelInfo);
3032  }
3033 
3034  /*
3035  * Switch to per-tuple context before calling NextCopyFrom, which does
3036  * evaluate default expressions etc. and requires per-tuple context.
3037  */
3039 
3040  ExecClearTuple(myslot);
3041 
3042  /* Directly store the values/nulls array in the slot */
3043  if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
3044  break;
3045 
3046  ExecStoreVirtualTuple(myslot);
3047 
3048  /*
3049  * Constraints and where clause might reference the tableoid column,
3050  * so (re-)initialize tts_tableOid before evaluating them.
3051  */
3052  myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
3053 
3054  /* Triggers and stuff need to be invoked in query context. */
3055  MemoryContextSwitchTo(oldcontext);
3056 
3057  if (cstate->whereClause)
3058  {
3059  econtext->ecxt_scantuple = myslot;
3060  /* Skip items that don't match COPY's WHERE clause */
3061  if (!ExecQual(cstate->qualexpr, econtext))
3062  continue;
3063  }
3064 
3065  /* Determine the partition to insert the tuple into */
3066  if (proute)
3067  {
3068  TupleConversionMap *map;
3069 
3070  /*
3071  * Attempt to find a partition suitable for this tuple.
3072  * ExecFindPartition() will raise an error if none can be found or
3073  * if the found partition is not suitable for INSERTs.
3074  */
3075  resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
3076  proute, myslot, estate);
3077 
3078  if (prevResultRelInfo != resultRelInfo)
3079  {
3080  /* Determine which triggers exist on this partition */
3081  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3082  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
3083 
3084  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
3085  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
3086 
3087  /*
3088  * Disable multi-inserts when the partition has BEFORE/INSTEAD
3089  * OF triggers, or if the partition is a foreign partition.
3090  */
3091  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
3092  !has_before_insert_row_trig &&
3093  !has_instead_insert_row_trig &&
3094  resultRelInfo->ri_FdwRoutine == NULL;
3095 
3096  /* Set the multi-insert buffer to use for this partition. */
3097  if (leafpart_use_multi_insert)
3098  {
3099  if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
3100  CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
3101  resultRelInfo);
3102  }
3103  else if (insertMethod == CIM_MULTI_CONDITIONAL &&
3104  !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3105  {
3106  /*
3107  * Flush pending inserts if this partition can't use
3108  * batching, so rows are visible to triggers etc.
3109  */
3110  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3111  }
3112 
3113  if (bistate != NULL)
3114  ReleaseBulkInsertStatePin(bistate);
3115  prevResultRelInfo = resultRelInfo;
3116  }
3117 
3118  /*
3119  * For ExecInsertIndexTuples() to work on the partition's indexes
3120  */
3121  estate->es_result_relation_info = resultRelInfo;
3122 
3123  /*
3124  * If we're capturing transition tuples, we might need to convert
3125  * from the partition rowtype to root rowtype.
3126  */
3127  if (cstate->transition_capture != NULL)
3128  {
3129  if (has_before_insert_row_trig)
3130  {
3131  /*
3132  * If there are any BEFORE triggers on the partition,
3133  * we'll have to be ready to convert their result back to
3134  * tuplestore format.
3135  */
3137  cstate->transition_capture->tcs_map =
3138  resultRelInfo->ri_PartitionInfo->pi_PartitionToRootMap;
3139  }
3140  else
3141  {
3142  /*
3143  * Otherwise, just remember the original unconverted
3144  * tuple, to avoid a needless round trip conversion.
3145  */
3147  cstate->transition_capture->tcs_map = NULL;
3148  }
3149  }
3150 
3151  /*
3152  * We might need to convert from the root rowtype to the partition
3153  * rowtype.
3154  */
3155  map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
3156  if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
3157  {
3158  /* non batch insert */
3159  if (map != NULL)
3160  {
3161  TupleTableSlot *new_slot;
3162 
3163  new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
3164  myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
3165  }
3166  }
3167  else
3168  {
3169  /*
3170  * Prepare to queue up tuple for later batch insert into
3171  * current partition.
3172  */
3173  TupleTableSlot *batchslot;
3174 
3175  /* no other path available for partitioned table */
3176  Assert(insertMethod == CIM_MULTI_CONDITIONAL);
3177 
3178  batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
3179  resultRelInfo);
3180 
3181  if (map != NULL)
3182  myslot = execute_attr_map_slot(map->attrMap, myslot,
3183  batchslot);
3184  else
3185  {
3186  /*
3187  * This looks more expensive than it is (Believe me, I
3188  * optimized it away. Twice.). The input is in virtual
3189  * form, and we'll materialize the slot below - for most
3190  * slot types the copy performs the work materialization
3191  * would later require anyway.
3192  */
3193  ExecCopySlot(batchslot, myslot);
3194  myslot = batchslot;
3195  }
3196  }
3197 
3198  /* ensure that triggers etc see the right relation */
3199  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3200  }
3201 
3202  skip_tuple = false;
3203 
3204  /* BEFORE ROW INSERT Triggers */
3205  if (has_before_insert_row_trig)
3206  {
3207  if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
3208  skip_tuple = true; /* "do nothing" */
3209  }
3210 
3211  if (!skip_tuple)
3212  {
3213  /*
3214  * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
3215  * tuple. Otherwise, proceed with inserting the tuple into the
3216  * table or foreign table.
3217  */
3218  if (has_instead_insert_row_trig)
3219  {
3220  ExecIRInsertTriggers(estate, resultRelInfo, myslot);
3221  }
3222  else
3223  {
3224  /* Compute stored generated columns */
3225  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
3227  ExecComputeStoredGenerated(estate, myslot);
3228 
3229  /*
3230  * If the target is a plain table, check the constraints of
3231  * the tuple.
3232  */
3233  if (resultRelInfo->ri_FdwRoutine == NULL &&
3234  resultRelInfo->ri_RelationDesc->rd_att->constr)
3235  ExecConstraints(resultRelInfo, myslot, estate);
3236 
3237  /*
3238  * Also check the tuple against the partition constraint, if
3239  * there is one; except that if we got here via tuple-routing,
3240  * we don't need to if there's no BR trigger defined on the
3241  * partition.
3242  */
3243  if (resultRelInfo->ri_PartitionCheck &&
3244  (proute == NULL || has_before_insert_row_trig))
3245  ExecPartitionCheck(resultRelInfo, myslot, estate, true);
3246 
3247  /* Store the slot in the multi-insert buffer, when enabled. */
3248  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
3249  {
3250  /*
3251  * The slot previously might point into the per-tuple
3252  * context. For batching it needs to be longer lived.
3253  */
3254  ExecMaterializeSlot(myslot);
3255 
3256  /* Add this tuple to the tuple buffer */
3257  CopyMultiInsertInfoStore(&multiInsertInfo,
3258  resultRelInfo, myslot,
3259  cstate->line_buf.len,
3260  cstate->cur_lineno);
3261 
3262  /*
3263  * If enough inserts have queued up, then flush all
3264  * buffers out to their tables.
3265  */
3266  if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
3267  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
3268  }
3269  else
3270  {
3271  List *recheckIndexes = NIL;
3272 
3273  /* OK, store the tuple */
3274  if (resultRelInfo->ri_FdwRoutine != NULL)
3275  {
3276  myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
3277  resultRelInfo,
3278  myslot,
3279  NULL);
3280 
3281  if (myslot == NULL) /* "do nothing" */
3282  continue; /* next tuple please */
3283 
3284  /*
3285  * AFTER ROW Triggers might reference the tableoid
3286  * column, so (re-)initialize tts_tableOid before
3287  * evaluating them.
3288  */
3289  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
3290  }
3291  else
3292  {
3293  /* OK, store the tuple and create index entries for it */
3294  table_tuple_insert(resultRelInfo->ri_RelationDesc,
3295  myslot, mycid, ti_options, bistate);
3296 
3297  if (resultRelInfo->ri_NumIndices > 0)
3298  recheckIndexes = ExecInsertIndexTuples(myslot,
3299  estate,
3300  false,
3301  NULL,
3302  NIL);
3303  }
3304 
3305  /* AFTER ROW INSERT Triggers */
3306  ExecARInsertTriggers(estate, resultRelInfo, myslot,
3307  recheckIndexes, cstate->transition_capture);
3308 
3309  list_free(recheckIndexes);
3310  }
3311  }
3312 
3313  /*
3314  * We count only tuples not suppressed by a BEFORE INSERT trigger
3315  * or FDW; this is the same definition used by nodeModifyTable.c
3316  * for counting tuples inserted by an INSERT command.
3317  */
3318  processed++;
3319  }
3320  }
3321 
3322  /* Flush any remaining buffered tuples */
3323  if (insertMethod != CIM_SINGLE)
3324  {
3325  if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
3326  CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
3327  }
3328 
3329  /* Done, clean up */
3330  error_context_stack = errcallback.previous;
3331 
3332  if (bistate != NULL)
3333  FreeBulkInsertState(bistate);
3334 
3335  MemoryContextSwitchTo(oldcontext);
3336 
3337  /*
3338  * In the old protocol, tell pqcomm that we can process normal protocol
3339  * messages again.
3340  */
3341  if (cstate->copy_dest == COPY_OLD_FE)
3342  pq_endmsgread();
3343 
3344  /* Execute AFTER STATEMENT insertion triggers */
3345  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
3346 
3347  /* Handle queued AFTER triggers */
3348  AfterTriggerEndQuery(estate);
3349 
3350  ExecResetTupleTable(estate->es_tupleTable, false);
3351 
3352  /* Allow the FDW to shut down */
3353  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
3354  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
3355  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
3356  target_resultRelInfo);
3357 
3358  /* Tear down the multi-insert buffer data */
3359  if (insertMethod != CIM_SINGLE)
3360  CopyMultiInsertInfoCleanup(&multiInsertInfo);
3361 
3362  ExecCloseIndices(target_resultRelInfo);
3363 
3364  /* Close all the partitioned tables, leaf partitions, and their indices */
3365  if (proute)
3366  ExecCleanupTupleRouting(mtstate, proute);
3367 
3368  /* Close any trigger target relations */
3369  ExecCleanUpTriggerState(estate);
3370 
3371  FreeExecutorState(estate);
3372 
3373  return processed;
3374 }
3375 
3376 /*
3377  * Setup to read tuples from a file for COPY FROM.
3378  *
3379  * 'rel': Used as a template for the tuples
3380  * 'filename': Name of server-local file to read
3381  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
3382  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
3383  *
3384  * Returns a CopyState, to be passed to NextCopyFrom and related functions.
3385  */
3386 CopyState
3388  Relation rel,
3389  const char *filename,
3390  bool is_program,
3392  List *attnamelist,
3393  List *options)
3394 {
3395  CopyState cstate;
3396  bool pipe = (filename == NULL);
3397  TupleDesc tupDesc;
3398  AttrNumber num_phys_attrs,
3399  num_defaults;
3401  Oid *typioparams;
3402  int attnum;
3403  Oid in_func_oid;
3404  int *defmap;
3405  ExprState **defexprs;
3406  MemoryContext oldcontext;
3407  bool volatile_defexprs;
3408 
3409  cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
3410  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
3411 
3412  /* Initialize state variables */
3413  cstate->reached_eof = false;
3414  cstate->eol_type = EOL_UNKNOWN;
3415  cstate->cur_relname = RelationGetRelationName(cstate->rel);
3416  cstate->cur_lineno = 0;
3417  cstate->cur_attname = NULL;
3418  cstate->cur_attval = NULL;
3419 
3420  /* Set up variables to avoid per-attribute overhead. */
3421  initStringInfo(&cstate->attribute_buf);
3422  initStringInfo(&cstate->line_buf);
3423  cstate->line_buf_converted = false;
3424  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3425  cstate->raw_buf_index = cstate->raw_buf_len = 0;
3426 
3427  /* Assign range table, we'll need it in CopyFrom. */
3428  if (pstate)
3429  cstate->range_table = pstate->p_rtable;
3430 
3431  tupDesc = RelationGetDescr(cstate->rel);
3432  num_phys_attrs = tupDesc->natts;
3433  num_defaults = 0;
3434  volatile_defexprs = false;
3435 
3436  /*
3437  * Pick up the required catalog information for each attribute in the
3438  * relation, including the input function, the element type (to pass to
3439  * the input function), and info about defaults and constraints. (Which
3440  * input function we use depends on text/binary format choice.)
3441  */
3442  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3443  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3444  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3445  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3446 
3447  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3448  {
3449  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3450 
3451  /* We don't need info for dropped attributes */
3452  if (att->attisdropped)
3453  continue;
3454 
3455  /* Fetch the input function and typioparam info */
3456  if (cstate->binary)
3457  getTypeBinaryInputInfo(att->atttypid,
3458  &in_func_oid, &typioparams[attnum - 1]);
3459  else
3460  getTypeInputInfo(att->atttypid,
3461  &in_func_oid, &typioparams[attnum - 1]);
3462  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3463 
3464  /* Get default info if needed */
3465  if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
3466  {
3467  /* attribute is NOT to be copied from input */
3468  /* use default value if one exists */
3469  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3470  attnum);
3471 
3472  if (defexpr != NULL)
3473  {
3474  /* Run the expression through planner */
3475  defexpr = expression_planner(defexpr);
3476 
3477  /* Initialize executable expression in copycontext */
3478  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3479  defmap[num_defaults] = attnum - 1;
3480  num_defaults++;
3481 
3482  /*
3483  * If a default expression looks at the table being loaded,
3484  * then it could give the wrong answer when using
3485  * multi-insert. Since database access can be dynamic this is
3486  * hard to test for exactly, so we use the much wider test of
3487  * whether the default expression is volatile. We allow for
3488  * the special case of when the default expression is the
3489  * nextval() of a sequence which in this specific case is
3490  * known to be safe for use with the multi-insert
3491  * optimization. Hence we use this special case function
3492  * checker rather than the standard check for
3493  * contain_volatile_functions().
3494  */
3495  if (!volatile_defexprs)
3496  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3497  }
3498  }
3499  }
3500 
3501  /* We keep those variables in cstate. */
3502  cstate->in_functions = in_functions;
3503  cstate->typioparams = typioparams;
3504  cstate->defmap = defmap;
3505  cstate->defexprs = defexprs;
3507  cstate->num_defaults = num_defaults;
3508  cstate->is_program = is_program;
3509 
3510  if (data_source_cb)
3511  {
3512  cstate->copy_dest = COPY_CALLBACK;
3513  cstate->data_source_cb = data_source_cb;
3514  }
3515  else if (pipe)
3516  {
3517  Assert(!is_program); /* the grammar does not allow this */
3519  ReceiveCopyBegin(cstate);
3520  else
3521  cstate->copy_file = stdin;
3522  }
3523  else
3524  {
3525  cstate->filename = pstrdup(filename);
3526 
3527  if (cstate->is_program)
3528  {
3529  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3530  if (cstate->copy_file == NULL)
3531  ereport(ERROR,
3533  errmsg("could not execute command \"%s\": %m",
3534  cstate->filename)));
3535  }
3536  else
3537  {
3538  struct stat st;
3539 
3540  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3541  if (cstate->copy_file == NULL)
3542  {
3543  /* copy errno because ereport subfunctions might change it */
3544  int save_errno = errno;
3545 
3546  ereport(ERROR,
3548  errmsg("could not open file \"%s\" for reading: %m",
3549  cstate->filename),
3550  (save_errno == ENOENT || save_errno == EACCES) ?
3551  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3552  "You may want a client-side facility such as psql's \\copy.") : 0));
3553  }
3554 
3555  if (fstat(fileno(cstate->copy_file), &st))
3556  ereport(ERROR,
3558  errmsg("could not stat file \"%s\": %m",
3559  cstate->filename)));
3560 
3561  if (S_ISDIR(st.st_mode))
3562  ereport(ERROR,
3563  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3564  errmsg("\"%s\" is a directory", cstate->filename)));
3565  }
3566  }
3567 
3568  if (cstate->binary)
3569  {
3570  /* Read and verify binary header */
3571  char readSig[11];
3572  int32 tmp;
3573 
3574  /* Signature */
3575  if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3576  memcmp(readSig, BinarySignature, 11) != 0)
3577  ereport(ERROR,
3578  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3579  errmsg("COPY file signature not recognized")));
3580  /* Flags field */
3581  if (!CopyGetInt32(cstate, &tmp))
3582  ereport(ERROR,
3583  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3584  errmsg("invalid COPY file header (missing flags)")));
3585  if ((tmp & (1 << 16)) != 0)
3586  ereport(ERROR,
3587  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3588  errmsg("invalid COPY file header (WITH OIDS)")));
3589  tmp &= ~(1 << 16);
3590  if ((tmp >> 16) != 0)
3591  ereport(ERROR,
3592  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3593  errmsg("unrecognized critical flags in COPY file header")));
3594  /* Header extension length */
3595  if (!CopyGetInt32(cstate, &tmp) ||
3596  tmp < 0)
3597  ereport(ERROR,
3598  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3599  errmsg("invalid COPY file header (missing length)")));
3600  /* Skip extension header, if present */
3601  while (tmp-- > 0)
3602  {
3603  if (CopyGetData(cstate, readSig, 1, 1) != 1)
3604  ereport(ERROR,
3605  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3606  errmsg("invalid COPY file header (wrong length)")));
3607  }
3608  }
3609 
3610  /* create workspace for CopyReadAttributes results */
3611  if (!cstate->binary)
3612  {
3613  AttrNumber attr_count = list_length(cstate->attnumlist);
3614 
3615  cstate->max_fields = attr_count;
3616  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
3617  }
3618 
3619  MemoryContextSwitchTo(oldcontext);
3620 
3621  return cstate;
3622 }
3623 
3624 /*
3625  * Read raw fields in the next line for COPY FROM in text or csv mode.
3626  * Return false if no more lines.
3627  *
3628  * An internal temporary buffer is returned via 'fields'. It is valid until
3629  * the next call of the function. Since the function returns all raw fields
3630  * in the input file, 'nfields' could be different from the number of columns
3631  * in the relation.
3632  *
3633  * NOTE: force_not_null option are not applied to the returned fields.
3634  */
3635 bool
3636 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3637 {
3638  int fldct;
3639  bool done;
3640 
3641  /* only available for text or csv input */
3642  Assert(!cstate->binary);
3643 
3644  /* on input just throw the header line away */
3645  if (cstate->cur_lineno == 0 && cstate->header_line)
3646  {
3647  cstate->cur_lineno++;
3648  if (CopyReadLine(cstate))
3649  return false; /* done */
3650  }
3651 
3652  cstate->cur_lineno++;
3653 
3654  /* Actually read the line into memory here */
3655  done = CopyReadLine(cstate);
3656 
3657  /*
3658  * EOF at start of line means we're done. If we see EOF after some
3659  * characters, we act as though it was newline followed by EOF, ie,
3660  * process the line and then exit loop on next iteration.
3661  */
3662  if (done && cstate->line_buf.len == 0)
3663  return false;
3664 
3665  /* Parse the line into de-escaped field values */
3666  if (cstate->csv_mode)
3667  fldct = CopyReadAttributesCSV(cstate);
3668  else
3669  fldct = CopyReadAttributesText(cstate);
3670 
3671  *fields = cstate->raw_fields;
3672  *nfields = fldct;
3673  return true;
3674 }
3675 
3676 /*
3677  * Read next tuple from file for COPY FROM. Return false if no more tuples.
3678  *
3679  * 'econtext' is used to evaluate default expression for each columns not
3680  * read from the file. It can be NULL when no default values are used, i.e.
3681  * when all columns are read from the file.
3682  *
3683  * 'values' and 'nulls' arrays must be the same length as columns of the
3684  * relation passed to BeginCopyFrom. This function fills the arrays.
3685  * Oid of the tuple is returned with 'tupleOid' separately.
3686  */
3687 bool
3689  Datum *values, bool *nulls)
3690 {
3691  TupleDesc tupDesc;
3692  AttrNumber num_phys_attrs,
3693  attr_count,
3694  num_defaults = cstate->num_defaults;
3695  FmgrInfo *in_functions = cstate->in_functions;
3696  Oid *typioparams = cstate->typioparams;
3697  int i;
3698  int *defmap = cstate->defmap;
3699  ExprState **defexprs = cstate->defexprs;
3700 
3701  tupDesc = RelationGetDescr(cstate->rel);
3702  num_phys_attrs = tupDesc->natts;
3703  attr_count = list_length(cstate->attnumlist);
3704 
3705  /* Initialize all values for row to NULL */
3706  MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3707  MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3708 
3709  if (!cstate->binary)
3710  {
3711  char **field_strings;
3712  ListCell *cur;
3713  int fldct;
3714  int fieldno;
3715  char *string;
3716 
3717  /* read raw fields in the next line */
3718  if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3719  return false;
3720 
3721  /* check for overflowing fields */
3722  if (attr_count > 0 && fldct > attr_count)
3723  ereport(ERROR,
3724  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3725  errmsg("extra data after last expected column")));
3726 
3727  fieldno = 0;
3728 
3729  /* Loop to read the user attributes on the line. */
3730  foreach(cur, cstate->attnumlist)
3731  {
3732  int attnum = lfirst_int(cur);
3733  int m = attnum - 1;
3734  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3735 
3736  if (fieldno >= fldct)
3737  ereport(ERROR,
3738  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3739  errmsg("missing data for column \"%s\"",
3740  NameStr(att->attname))));
3741  string = field_strings[fieldno++];
3742 
3743  if (cstate->convert_select_flags &&
3744  !cstate->convert_select_flags[m])
3745  {
3746  /* ignore input field, leaving column as NULL */
3747  continue;
3748  }
3749 
3750  if (cstate->csv_mode)
3751  {
3752  if (string == NULL &&
3753  cstate->force_notnull_flags[m])
3754  {
3755  /*
3756  * FORCE_NOT_NULL option is set and column is NULL -
3757  * convert it to the NULL string.
3758  */
3759  string = cstate->null_print;
3760  }
3761  else if (string != NULL && cstate->force_null_flags[m]
3762  && strcmp(string, cstate->null_print) == 0)
3763  {
3764  /*
3765  * FORCE_NULL option is set and column matches the NULL
3766  * string. It must have been quoted, or otherwise the
3767  * string would already have been set to NULL. Convert it
3768  * to NULL as specified.
3769  */
3770  string = NULL;
3771  }
3772  }
3773 
3774  cstate->cur_attname = NameStr(att->attname);
3775  cstate->cur_attval = string;
3776  values[m] = InputFunctionCall(&in_functions[m],
3777  string,
3778  typioparams[m],
3779  att->atttypmod);
3780  if (string != NULL)
3781  nulls[m] = false;
3782  cstate->cur_attname = NULL;
3783  cstate->cur_attval = NULL;
3784  }
3785 
3786  Assert(fieldno == attr_count);
3787  }
3788  else
3789  {
3790  /* binary */
3791  int16 fld_count;
3792  ListCell *cur;
3793 
3794  cstate->cur_lineno++;
3795 
3796  if (!CopyGetInt16(cstate, &fld_count))
3797  {
3798  /* EOF detected (end of file, or protocol-level EOF) */
3799  return false;
3800  }
3801 
3802  if (fld_count == -1)
3803  {
3804  /*
3805  * Received EOF marker. In a V3-protocol copy, wait for the
3806  * protocol-level EOF, and complain if it doesn't come
3807  * immediately. This ensures that we correctly handle CopyFail,
3808  * if client chooses to send that now.
3809  *
3810  * Note that we MUST NOT try to read more data in an old-protocol
3811  * copy, since there is no protocol-level EOF marker then. We
3812  * could go either way for copy from file, but choose to throw
3813  * error if there's data after the EOF marker, for consistency
3814  * with the new-protocol case.
3815  */
3816  char dummy;
3817 
3818  if (cstate->copy_dest != COPY_OLD_FE &&
3819  CopyGetData(cstate, &dummy, 1, 1) > 0)
3820  ereport(ERROR,
3821  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3822  errmsg("received copy data after EOF marker")));
3823  return false;
3824  }
3825 
3826  if (fld_count != attr_count)
3827  ereport(ERROR,
3828  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3829  errmsg("row field count is %d, expected %d",
3830  (int) fld_count, attr_count)));
3831 
3832  i = 0;
3833  foreach(cur, cstate->attnumlist)
3834  {
3835  int attnum = lfirst_int(cur);
3836  int m = attnum - 1;
3837  Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3838 
3839  cstate->cur_attname = NameStr(att->attname);
3840  i++;
3841  values[m] = CopyReadBinaryAttribute(cstate,
3842  i,
3843  &in_functions[m],
3844  typioparams[m],
3845  att->atttypmod,
3846  &nulls[m]);
3847  cstate->cur_attname = NULL;
3848  }
3849  }
3850 
3851  /*
3852  * Now compute and insert any defaults available for the columns not
3853  * provided by the input data. Anything not processed here or above will
3854  * remain NULL.
3855  */
3856  for (i = 0; i < num_defaults; i++)
3857  {
3858  /*
3859  * The caller must supply econtext and have switched into the
3860  * per-tuple memory context in it.
3861  */
3862  Assert(econtext != NULL);
3864 
3865  values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3866  &nulls[defmap[i]]);
3867  }
3868 
3869  return true;
3870 }
3871 
3872 /*
3873  * Clean up storage and release resources for COPY FROM.
3874  */
3875 void
3877 {
3878  /* No COPY FROM related resources except memory. */
3879 
3880  EndCopy(cstate);
3881 }
3882 
3883 /*
3884  * Read the next input line and stash it in line_buf, with conversion to
3885  * server encoding.
3886  *
3887  * Result is true if read was terminated by EOF, false if terminated
3888  * by newline. The terminating newline or EOF marker is not included
3889  * in the final value of line_buf.
3890  */
3891 static bool
3893 {
3894  bool result;
3895 
3896  resetStringInfo(&cstate->line_buf);
3897  cstate->line_buf_valid = true;
3898 
3899  /* Mark that encoding conversion hasn't occurred yet */
3900  cstate->line_buf_converted = false;
3901 
3902  /* Parse data and transfer into line_buf */
3903  result = CopyReadLineText(cstate);
3904 
3905  if (result)
3906  {
3907  /*
3908  * Reached EOF. In protocol version 3, we should ignore anything
3909  * after \. up to the protocol end of copy data. (XXX maybe better
3910  * not to treat \. as special?)
3911  */
3912  if (cstate->copy_dest == COPY_NEW_FE)
3913  {
3914  do
3915  {
3916  cstate->raw_buf_index = cstate->raw_buf_len;
3917  } while (CopyLoadRawBuf(cstate));
3918  }
3919  }
3920  else
3921  {
3922  /*
3923  * If we didn't hit EOF, then we must have transferred the EOL marker
3924  * to line_buf along with the data. Get rid of it.
3925  */
3926  switch (cstate->eol_type)
3927  {
3928  case EOL_NL:
3929  Assert(cstate->line_buf.len >= 1);
3930  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3931  cstate->line_buf.len--;
3932  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3933  break;
3934  case EOL_CR:
3935  Assert(cstate->line_buf.len >= 1);
3936  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3937  cstate->line_buf.len--;
3938  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3939  break;
3940  case EOL_CRNL:
3941  Assert(cstate->line_buf.len >= 2);
3942  Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3943  Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3944  cstate->line_buf.len -= 2;
3945  cstate->line_buf.data[cstate->line_buf.len] = '\0';
3946  break;
3947  case EOL_UNKNOWN:
3948  /* shouldn't get here */
3949  Assert(false);
3950  break;
3951  }
3952  }
3953 
3954  /* Done reading the line. Convert it to server encoding. */
3955  if (cstate->need_transcoding)
3956  {
3957  char *cvt;
3958 
3959  cvt = pg_any_to_server(cstate->line_buf.data,
3960  cstate->line_buf.len,
3961  cstate->file_encoding);
3962  if (cvt != cstate->line_buf.data)
3963  {
3964  /* transfer converted data back to line_buf */
3965  resetStringInfo(&cstate->line_buf);
3966  appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3967  pfree(cvt);
3968  }
3969  }
3970 
3971  /* Now it's safe to use the buffer in error messages */
3972  cstate->line_buf_converted = true;
3973 
3974  return result;
3975 }
3976 
3977 /*
3978  * CopyReadLineText - inner loop of CopyReadLine for text mode
3979  */
3980 static bool
3982 {
3983  char *copy_raw_buf;
3984  int raw_buf_ptr;
3985  int copy_buf_len;
3986  bool need_data = false;
3987  bool hit_eof = false;
3988  bool result = false;
3989  char mblen_str[2];
3990 
3991  /* CSV variables */
3992  bool first_char_in_line = true;
3993  bool in_quote = false,
3994  last_was_esc = false;
3995  char quotec = '\0';
3996  char escapec = '\0';
3997 
3998  if (cstate->csv_mode)
3999  {
4000  quotec = cstate->quote[0];
4001  escapec = cstate->escape[0];
4002  /* ignore special escape processing if it's the same as quotec */
4003  if (quotec == escapec)
4004  escapec = '\0';
4005  }
4006 
4007  mblen_str[1] = '\0';
4008 
4009  /*
4010  * The objective of this loop is to transfer the entire next input line
4011  * into line_buf. Hence, we only care for detecting newlines (\r and/or
4012  * \n) and the end-of-copy marker (\.).
4013  *
4014  * In CSV mode, \r and \n inside a quoted field are just part of the data
4015  * value and are put in line_buf. We keep just enough state to know if we
4016  * are currently in a quoted field or not.
4017  *
4018  * These four characters, and the CSV escape and quote characters, are
4019  * assumed the same in frontend and backend encodings.
4020  *
4021  * For speed, we try to move data from raw_buf to line_buf in chunks
4022  * rather than one character at a time. raw_buf_ptr points to the next
4023  * character to examine; any characters from raw_buf_index to raw_buf_ptr
4024  * have been determined to be part of the line, but not yet transferred to
4025  * line_buf.
4026  *
4027  * For a little extra speed within the loop, we copy raw_buf and
4028  * raw_buf_len into local variables.
4029  */
4030  copy_raw_buf = cstate->raw_buf;
4031  raw_buf_ptr = cstate->raw_buf_index;
4032  copy_buf_len = cstate->raw_buf_len;
4033 
4034  for (;;)
4035  {
4036  int prev_raw_ptr;
4037  char c;
4038 
4039  /*
4040  * Load more data if needed. Ideally we would just force four bytes
4041  * of read-ahead and avoid the many calls to
4042  * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
4043  * does not allow us to read too far ahead or we might read into the
4044  * next data, so we read-ahead only as far we know we can. One
4045  * optimization would be to read-ahead four byte here if
4046  * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
4047  * considering the size of the buffer.
4048  */
4049  if (raw_buf_ptr >= copy_buf_len || need_data)
4050  {
4052 
4053  /*
4054  * Try to read some more data. This will certainly reset
4055  * raw_buf_index to zero, and raw_buf_ptr must go with it.
4056  */
4057  if (!CopyLoadRawBuf(cstate))
4058  hit_eof = true;
4059  raw_buf_ptr = 0;
4060  copy_buf_len = cstate->raw_buf_len;
4061 
4062  /*
4063  * If we are completely out of data, break out of the loop,
4064  * reporting EOF.
4065  */
4066  if (copy_buf_len <= 0)
4067  {
4068  result = true;
4069  break;
4070  }
4071  need_data = false;
4072  }
4073 
4074  /* OK to fetch a character */
4075  prev_raw_ptr = raw_buf_ptr;
4076  c = copy_raw_buf[raw_buf_ptr++];
4077 
4078  if (cstate->csv_mode)
4079  {
4080  /*
4081  * If character is '\\' or '\r', we may need to look ahead below.
4082  * Force fetch of the next character if we don't already have it.
4083  * We need to do this before changing CSV state, in case one of
4084  * these characters is also the quote or escape character.
4085  *
4086  * Note: old-protocol does not like forced prefetch, but it's OK
4087  * here since we cannot validly be at EOF.
4088  */
4089  if (c == '\\' || c == '\r')
4090  {
4092  }
4093 
4094  /*
4095  * Dealing with quotes and escapes here is mildly tricky. If the
4096  * quote char is also the escape char, there's no problem - we
4097  * just use the char as a toggle. If they are different, we need
4098  * to ensure that we only take account of an escape inside a
4099  * quoted field and immediately preceding a quote char, and not
4100  * the second in an escape-escape sequence.
4101  */
4102  if (in_quote && c == escapec)
4103  last_was_esc = !last_was_esc;
4104  if (c == quotec && !last_was_esc)
4105  in_quote = !in_quote;
4106  if (c != escapec)
4107  last_was_esc = false;
4108 
4109  /*
4110  * Updating the line count for embedded CR and/or LF chars is
4111  * necessarily a little fragile - this test is probably about the
4112  * best we can do. (XXX it's arguable whether we should do this
4113  * at all --- is cur_lineno a physical or logical count?)
4114  */
4115  if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
4116  cstate->cur_lineno++;
4117  }
4118 
4119  /* Process \r */
4120  if (c == '\r' && (!cstate->csv_mode || !in_quote))
4121  {
4122  /* Check for \r\n on first line, _and_ handle \r\n. */
4123  if (cstate->eol_type == EOL_UNKNOWN ||
4124  cstate->eol_type == EOL_CRNL)
4125  {
4126  /*
4127  * If need more data, go back to loop top to load it.
4128  *
4129  * Note that if we are at EOF, c will wind up as '\0' because
4130  * of the guaranteed pad of raw_buf.
4131  */
4133 
4134  /* get next char */
4135  c = copy_raw_buf[raw_buf_ptr];
4136 
4137  if (c == '\n')
4138  {
4139  raw_buf_ptr++; /* eat newline */
4140  cstate->eol_type = EOL_CRNL; /* in case not set yet */
4141  }
4142  else
4143  {
4144  /* found \r, but no \n */
4145  if (cstate->eol_type == EOL_CRNL)
4146  ereport(ERROR,
4147  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4148  !cstate->csv_mode ?
4149  errmsg("literal carriage return found in data") :
4150  errmsg("unquoted carriage return found in data"),
4151  !cstate->csv_mode ?
4152  errhint("Use \"\\r\" to represent carriage return.") :
4153  errhint("Use quoted CSV field to represent carriage return.")));
4154 
4155  /*
4156  * if we got here, it is the first line and we didn't find
4157  * \n, so don't consume the peeked character
4158  */
4159  cstate->eol_type = EOL_CR;
4160  }
4161  }
4162  else if (cstate->eol_type == EOL_NL)
4163  ereport(ERROR,
4164  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4165  !cstate->csv_mode ?
4166  errmsg("literal carriage return found in data") :
4167  errmsg("unquoted carriage return found in data"),
4168  !cstate->csv_mode ?
4169  errhint("Use \"\\r\" to represent carriage return.") :
4170  errhint("Use quoted CSV field to represent carriage return.")));
4171  /* If reach here, we have found the line terminator */
4172  break;
4173  }
4174 
4175  /* Process \n */
4176  if (c == '\n' && (!cstate->csv_mode || !in_quote))
4177  {
4178  if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
4179  ereport(ERROR,
4180  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4181  !cstate->csv_mode ?
4182  errmsg("literal newline found in data") :
4183  errmsg("unquoted newline found in data"),
4184  !cstate->csv_mode ?
4185  errhint("Use \"\\n\" to represent newline.") :
4186  errhint("Use quoted CSV field to represent newline.")));
4187  cstate->eol_type = EOL_NL; /* in case not set yet */
4188  /* If reach here, we have found the line terminator */
4189  break;
4190  }
4191 
4192  /*
4193  * In CSV mode, we only recognize \. alone on a line. This is because
4194  * \. is a valid CSV data value.
4195  */
4196  if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
4197  {
4198  char c2;
4199 
4202 
4203  /* -----
4204  * get next character
4205  * Note: we do not change c so if it isn't \., we can fall
4206  * through and continue processing for file encoding.
4207  * -----
4208  */
4209  c2 = copy_raw_buf[raw_buf_ptr];
4210 
4211  if (c2 == '.')
4212  {
4213  raw_buf_ptr++; /* consume the '.' */
4214 
4215  /*
4216  * Note: if we loop back for more data here, it does not
4217  * matter that the CSV state change checks are re-executed; we
4218  * will come back here with no important state changed.
4219  */
4220  if (cstate->eol_type == EOL_CRNL)
4221  {
4222  /* Get the next character */
4224  /* if hit_eof, c2 will become '\0' */
4225  c2 = copy_raw_buf[raw_buf_ptr++];
4226 
4227  if (c2 == '\n')
4228  {
4229  if (!cstate->csv_mode)
4230  ereport(ERROR,
4231  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4232  errmsg("end-of-copy marker does not match previous newline style")));
4233  else
4235  }
4236  else if (c2 != '\r')
4237  {
4238  if (!cstate->csv_mode)
4239  ereport(ERROR,
4240  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4241  errmsg("end-of-copy marker corrupt")));
4242  else
4244  }
4245  }
4246 
4247  /* Get the next character */
4249  /* if hit_eof, c2 will become '\0' */
4250  c2 = copy_raw_buf[raw_buf_ptr++];
4251 
4252  if (c2 != '\r' && c2 != '\n')
4253  {
4254  if (!cstate->csv_mode)
4255  ereport(ERROR,
4256  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4257  errmsg("end-of-copy marker corrupt")));
4258  else
4260  }
4261 
4262  if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
4263  (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
4264  (cstate->eol_type == EOL_CR && c2 != '\r'))
4265  {
4266  ereport(ERROR,
4267  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4268  errmsg("end-of-copy marker does not match previous newline style")));
4269  }
4270 
4271  /*
4272  * Transfer only the data before the \. into line_buf, then
4273  * discard the data and the \. sequence.
4274  */
4275  if (prev_raw_ptr > cstate->raw_buf_index)
4277  cstate->raw_buf + cstate->raw_buf_index,
4278  prev_raw_ptr - cstate->raw_buf_index);
4279  cstate->raw_buf_index = raw_buf_ptr;
4280  result = true; /* report EOF */
4281  break;
4282  }
4283  else if (!cstate->csv_mode)
4284 
4285  /*
4286  * If we are here, it means we found a backslash followed by
4287  * something other than a period. In non-CSV mode, anything
4288  * after a backslash is special, so we skip over that second
4289  * character too. If we didn't do that \\. would be
4290  * considered an eof-of copy, while in non-CSV mode it is a
4291  * literal backslash followed by a period. In CSV mode,
4292  * backslashes are not special, so we want to process the
4293  * character after the backslash just like a normal character,
4294  * so we don't increment in those cases.
4295  */
4296  raw_buf_ptr++;
4297  }
4298 
4299  /*
4300  * This label is for CSV cases where \. appears at the start of a
4301  * line, but there is more text after it, meaning it was a data value.
4302  * We are more strict for \. in CSV mode because \. could be a data
4303  * value, while in non-CSV mode, \. cannot be a data value.
4304  */
4305 not_end_of_copy:
4306 
4307  /*
4308  * Process all bytes of a multi-byte character as a group.
4309  *
4310  * We only support multi-byte sequences where the first byte has the
4311  * high-bit set, so as an optimization we can avoid this block
4312  * entirely if it is not set.
4313  */
4314  if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
4315  {
4316  int mblen;
4317 
4318  /*
4319  * It is enough to look at the first byte in all our encodings, to
4320  * get the length. (GB18030 is a bit special, but still works for
4321  * our purposes; see comment in pg_gb18030_mblen())
4322  */
4323  mblen_str[0] = c;
4324  mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
4325 
4327  IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
4328  raw_buf_ptr += mblen - 1;
4329  }
4330  first_char_in_line = false;
4331  } /* end of outer loop */
4332 
4333  /*
4334  * Transfer any still-uncopied data to line_buf.
4335  */
4337 
4338  return result;
4339 }
4340 
4341 /*
4342  * Return decimal value for a hexadecimal digit
4343  */
4344 static int
4346 {
4347  if (isdigit((unsigned char) hex))
4348  return hex - '0';
4349  else
4350  return tolower((unsigned char) hex) - 'a' + 10;
4351 }
4352 
4353 /*
4354  * Parse the current line into separate attributes (fields),
4355  * performing de-escaping as needed.
4356  *
4357  * The input is in line_buf. We use attribute_buf to hold the result
4358  * strings. cstate->raw_fields[k] is set to point to the k'th attribute
4359  * string, or NULL when the input matches the null marker string.
4360  * This array is expanded as necessary.
4361  *
4362  * (Note that the caller cannot check for nulls since the returned
4363  * string would be the post-de-escaping equivalent, which may look
4364  * the same as some valid data string.)
4365  *
4366  * delim is the column delimiter string (must be just one byte for now).
4367  * null_print is the null marker string. Note that this is compared to
4368  * the pre-de-escaped input string.
4369  *
4370  * The return value is the number of fields actually read.
4371  */
4372 static int
4374 {
4375  char delimc = cstate->delim[0];
4376  int fieldno;
4377  char *output_ptr;
4378  char *cur_ptr;
4379  char *line_end_ptr;
4380 
4381  /*
4382  * We need a special case for zero-column tables: check that the input
4383  * line is empty, and return.
4384  */
4385  if (cstate->max_fields <= 0)
4386  {
4387  if (cstate->line_buf.len != 0)
4388  ereport(ERROR,
4389  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4390  errmsg("extra data after last expected column")));
4391  return 0;
4392  }
4393 
4394  resetStringInfo(&cstate->attribute_buf);
4395 
4396  /*
4397  * The de-escaped attributes will certainly not be longer than the input
4398  * data line, so we can just force attribute_buf to be large enough and
4399  * then transfer data without any checks for enough space. We need to do
4400  * it this way because enlarging attribute_buf mid-stream would invalidate
4401  * pointers already stored into cstate->raw_fields[].
4402  */
4403  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4404  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4405  output_ptr = cstate->attribute_buf.data;
4406 
4407  /* set pointer variables for loop */
4408  cur_ptr = cstate->line_buf.data;
4409  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4410 
4411  /* Outer loop iterates over fields */
4412  fieldno = 0;
4413  for (;;)
4414  {
4415  bool found_delim = false;
4416  char *start_ptr;
4417  char *end_ptr;
4418  int input_len;
4419  bool saw_non_ascii = false;
4420 
4421  /* Make sure there is enough space for the next value */
4422  if (fieldno >= cstate->max_fields)
4423  {
4424  cstate->max_fields *= 2;
4425  cstate->raw_fields =
4426  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4427  }
4428 
4429  /* Remember start of field on both input and output sides */
4430  start_ptr = cur_ptr;
4431  cstate->raw_fields[fieldno] = output_ptr;
4432 
4433  /*
4434  * Scan data for field.
4435  *
4436  * Note that in this loop, we are scanning to locate the end of field
4437  * and also speculatively performing de-escaping. Once we find the
4438  * end-of-field, we can match the raw field contents against the null
4439  * marker string. Only after that comparison fails do we know that
4440  * de-escaping is actually the right thing to do; therefore we *must
4441  * not* throw any syntax errors before we've done the null-marker
4442  * check.
4443  */
4444  for (;;)
4445  {
4446  char c;
4447 
4448  end_ptr = cur_ptr;
4449  if (cur_ptr >= line_end_ptr)
4450  break;
4451  c = *cur_ptr++;
4452  if (c == delimc)
4453  {
4454  found_delim = true;
4455  break;
4456  }
4457  if (c == '\\')
4458  {
4459  if (cur_ptr >= line_end_ptr)
4460  break;
4461  c = *cur_ptr++;
4462  switch (c)
4463  {
4464  case '0':
4465  case '1':
4466  case '2':
4467  case '3':
4468  case '4':
4469  case '5':
4470  case '6':
4471  case '7':
4472  {
4473  /* handle \013 */
4474  int val;
4475 
4476  val = OCTVALUE(c);
4477  if (cur_ptr < line_end_ptr)
4478  {
4479  c = *cur_ptr;
4480  if (ISOCTAL(c))
4481  {
4482  cur_ptr++;
4483  val = (val << 3) + OCTVALUE(c);
4484  if (cur_ptr < line_end_ptr)
4485  {
4486  c = *cur_ptr;
4487  if (ISOCTAL(c))
4488  {
4489  cur_ptr++;
4490  val = (val << 3) + OCTVALUE(c);
4491  }
4492  }
4493  }
4494  }
4495  c = val & 0377;
4496  if (c == '\0' || IS_HIGHBIT_SET(c))
4497  saw_non_ascii = true;
4498  }
4499  break;
4500  case 'x':
4501  /* Handle \x3F */
4502  if (cur_ptr < line_end_ptr)
4503  {
4504  char hexchar = *cur_ptr;
4505 
4506  if (isxdigit((unsigned char) hexchar))
4507  {
4508  int val = GetDecimalFromHex(hexchar);
4509 
4510  cur_ptr++;
4511  if (cur_ptr < line_end_ptr)
4512  {
4513  hexchar = *cur_ptr;
4514  if (isxdigit((unsigned char) hexchar))
4515  {
4516  cur_ptr++;
4517  val = (val << 4) + GetDecimalFromHex(hexchar);
4518  }
4519  }
4520  c = val & 0xff;
4521  if (c == '\0' || IS_HIGHBIT_SET(c))
4522  saw_non_ascii = true;
4523  }
4524  }
4525  break;
4526  case 'b':
4527  c = '\b';
4528  break;
4529  case 'f':
4530  c = '\f';
4531  break;
4532  case 'n':
4533  c = '\n';
4534  break;
4535  case 'r':
4536  c = '\r';
4537  break;
4538  case 't':
4539  c = '\t';
4540  break;
4541  case 'v':
4542  c = '\v';
4543  break;
4544 
4545  /*
4546  * in all other cases, take the char after '\'
4547  * literally
4548  */
4549  }
4550  }
4551 
4552  /* Add c to output string */
4553  *output_ptr++ = c;
4554  }
4555 
4556  /* Check whether raw input matched null marker */
4557  input_len = end_ptr - start_ptr;
4558  if (input_len == cstate->null_print_len &&
4559  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4560  cstate->raw_fields[fieldno] = NULL;
4561  else
4562  {
4563  /*
4564  * At this point we know the field is supposed to contain data.
4565  *
4566  * If we de-escaped any non-7-bit-ASCII chars, make sure the
4567  * resulting string is valid data for the db encoding.
4568  */
4569  if (saw_non_ascii)
4570  {
4571  char *fld = cstate->raw_fields[fieldno];
4572 
4573  pg_verifymbstr(fld, output_ptr - fld, false);
4574  }
4575  }
4576 
4577  /* Terminate attribute value in output area */
4578  *output_ptr++ = '\0';
4579 
4580  fieldno++;
4581  /* Done if we hit EOL instead of a delim */
4582  if (!found_delim)
4583  break;
4584  }
4585 
4586  /* Clean up state of attribute_buf */
4587  output_ptr--;
4588  Assert(*output_ptr == '\0');
4589  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4590 
4591  return fieldno;
4592 }
4593 
4594 /*
4595  * Parse the current line into separate attributes (fields),
4596  * performing de-escaping as needed. This has exactly the same API as
4597  * CopyReadAttributesText, except we parse the fields according to
4598  * "standard" (i.e. common) CSV usage.
4599  */
4600 static int
4602 {
4603  char delimc = cstate->delim[0];
4604  char quotec = cstate->quote[0];
4605  char escapec = cstate->escape[0];
4606  int fieldno;
4607  char *output_ptr;
4608  char *cur_ptr;
4609  char *line_end_ptr;
4610 
4611  /*
4612  * We need a special case for zero-column tables: check that the input
4613  * line is empty, and return.
4614  */
4615  if (cstate->max_fields <= 0)
4616  {
4617  if (cstate->line_buf.len != 0)
4618  ereport(ERROR,
4619  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4620  errmsg("extra data after last expected column")));
4621  return 0;
4622  }
4623 
4624  resetStringInfo(&cstate->attribute_buf);
4625 
4626  /*
4627  * The de-escaped attributes will certainly not be longer than the input
4628  * data line, so we can just force attribute_buf to be large enough and
4629  * then transfer data without any checks for enough space. We need to do
4630  * it this way because enlarging attribute_buf mid-stream would invalidate
4631  * pointers already stored into cstate->raw_fields[].
4632  */
4633  if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4634  enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4635  output_ptr = cstate->attribute_buf.data;
4636 
4637  /* set pointer variables for loop */
4638  cur_ptr = cstate->line_buf.data;
4639  line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4640 
4641  /* Outer loop iterates over fields */
4642  fieldno = 0;
4643  for (;;)
4644  {
4645  bool found_delim = false;
4646  bool saw_quote = false;
4647  char *start_ptr;
4648  char *end_ptr;
4649  int input_len;
4650 
4651  /* Make sure there is enough space for the next value */
4652  if (fieldno >= cstate->max_fields)
4653  {
4654  cstate->max_fields *= 2;
4655  cstate->raw_fields =
4656  repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4657  }
4658 
4659  /* Remember start of field on both input and output sides */
4660  start_ptr = cur_ptr;
4661  cstate->raw_fields[fieldno] = output_ptr;
4662 
4663  /*
4664  * Scan data for field,
4665  *
4666  * The loop starts in "not quote" mode and then toggles between that
4667  * and "in quote" mode. The loop exits normally if it is in "not
4668  * quote" mode and a delimiter or line end is seen.
4669  */
4670  for (;;)
4671  {
4672  char c;
4673 
4674  /* Not in quote */
4675  for (;;)
4676  {
4677  end_ptr = cur_ptr;
4678  if (cur_ptr >= line_end_ptr)
4679  goto endfield;
4680  c = *cur_ptr++;
4681  /* unquoted field delimiter */
4682  if (c == delimc)
4683  {
4684  found_delim = true;
4685  goto endfield;
4686  }
4687  /* start of quoted field (or part of field) */
4688  if (c == quotec)
4689  {
4690  saw_quote = true;
4691  break;
4692  }
4693  /* Add c to output string */
4694  *output_ptr++ = c;
4695  }
4696 
4697  /* In quote */
4698  for (;;)
4699  {
4700  end_ptr = cur_ptr;
4701  if (cur_ptr >= line_end_ptr)
4702  ereport(ERROR,
4703  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4704  errmsg("unterminated CSV quoted field")));
4705 
4706  c = *cur_ptr++;
4707 
4708  /* escape within a quoted field */
4709  if (c == escapec)
4710  {
4711  /*
4712  * peek at the next char if available, and escape it if it
4713  * is an escape char or a quote char
4714  */
4715  if (cur_ptr < line_end_ptr)
4716  {
4717  char nextc = *cur_ptr;
4718 
4719  if (nextc == escapec || nextc == quotec)
4720  {
4721  *output_ptr++ = nextc;
4722  cur_ptr++;
4723  continue;
4724  }
4725  }
4726  }
4727 
4728  /*
4729  * end of quoted field. Must do this test after testing for
4730  * escape in case quote char and escape char are the same
4731  * (which is the common case).
4732  */
4733  if (c == quotec)
4734  break;
4735 
4736  /* Add c to output string */
4737  *output_ptr++ = c;
4738  }
4739  }
4740 endfield:
4741 
4742  /* Terminate attribute value in output area */
4743  *output_ptr++ = '\0';
4744 
4745  /* Check whether raw input matched null marker */
4746  input_len = end_ptr - start_ptr;
4747  if (!saw_quote && input_len == cstate->null_print_len &&
4748  strncmp(start_ptr, cstate->null_print, input_len) == 0)
4749  cstate->raw_fields[fieldno] = NULL;
4750 
4751  fieldno++;
4752  /* Done if we hit EOL instead of a delim */
4753  if (!found_delim)
4754  break;
4755  }
4756 
4757  /* Clean up state of attribute_buf */
4758  output_ptr--;
4759  Assert(*output_ptr == '\0');
4760  cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4761 
4762  return fieldno;
4763 }
4764 
4765 
4766 /*
4767  * Read a binary attribute
4768  */
4769 static Datum
4771  int column_no, FmgrInfo *flinfo,
4772  Oid typioparam, int32 typmod,
4773  bool *isnull)
4774 {
4775  int32 fld_size;
4776  Datum result;
4777 
4778  if (!CopyGetInt32(cstate, &fld_size))
4779  ereport(ERROR,
4780  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4781  errmsg("unexpected EOF in COPY data")));
4782  if (fld_size == -1)
4783  {
4784  *isnull = true;
4785  return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4786  }
4787  if (fld_size < 0)
4788  ereport(ERROR,
4789  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4790  errmsg("invalid field size")));
4791 
4792  /* reset attribute_buf to empty, and load raw data in it */
4793  resetStringInfo(&cstate->attribute_buf);
4794 
4795  enlargeStringInfo(&cstate->attribute_buf, fld_size);
4796  if (CopyGetData(cstate, cstate->attribute_buf.data,
4797  fld_size, fld_size) != fld_size)
4798  ereport(ERROR,
4799  (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4800  errmsg("unexpected EOF in COPY data")));
4801 
4802  cstate->attribute_buf.len = fld_size;
4803  cstate->attribute_buf.data[fld_size] = '\0';
4804 
4805  /* Call the column type's binary input converter */
4806  result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4807  typioparam, typmod);
4808 
4809  /* Trouble if it didn't eat the whole buffer */
4810  if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4811  ereport(ERROR,
4812  (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4813  errmsg("incorrect binary data format")));
4814 
4815  *isnull = false;
4816  return result;
4817 }
4818 
4819 /*
4820  * Send text representation of one attribute, with conversion and escaping
4821  */
4822 #define DUMPSOFAR() \
4823  do { \
4824  if (ptr > start) \
4825  CopySendData(cstate, start, ptr - start); \
4826  } while (0)
4827 
4828 static void
4829 CopyAttributeOutText(CopyState cstate, char *string)
4830 {
4831  char *ptr;
4832  char *start;
4833  char c;
4834  char delimc = cstate->delim[0];
4835 
4836  if (cstate->need_transcoding)
4837  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4838  else
4839  ptr = string;
4840 
4841  /*
4842  * We have to grovel through the string searching for control characters
4843  * and instances of the delimiter character. In most cases, though, these
4844  * are infrequent. To avoid overhead from calling CopySendData once per
4845  * character, we dump out all characters between escaped characters in a
4846  * single call. The loop invariant is that the data from "start" to "ptr"
4847  * can be sent literally, but hasn't yet been.
4848  *
4849  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4850  * in valid backend encodings, extra bytes of a multibyte character never
4851  * look like ASCII. This loop is sufficiently performance-critical that
4852  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4853  * of the normal safe-encoding path.
4854  */
4855  if (cstate->encoding_embeds_ascii)
4856  {
4857  start = ptr;
4858  while ((c = *ptr) != '\0')
4859  {
4860  if ((unsigned char) c < (unsigned char) 0x20)
4861  {
4862  /*
4863  * \r and \n must be escaped, the others are traditional. We
4864  * prefer to dump these using the C-like notation, rather than
4865  * a backslash and the literal character, because it makes the
4866  * dump file a bit more proof against Microsoftish data
4867  * mangling.
4868  */
4869  switch (c)
4870  {
4871  case '\b':
4872  c = 'b';
4873  break;
4874  case '\f':
4875  c = 'f';
4876  break;
4877  case '\n':
4878  c = 'n';
4879  break;
4880  case '\r':
4881  c = 'r';
4882  break;
4883  case '\t':
4884  c = 't';
4885  break;
4886  case '\v':
4887  c = 'v';
4888  break;
4889  default:
4890  /* If it's the delimiter, must backslash it */
4891  if (c == delimc)
4892  break;
4893  /* All ASCII control chars are length 1 */
4894  ptr++;
4895  continue; /* fall to end of loop */
4896  }
4897  /* if we get here, we need to convert the control char */
4898  DUMPSOFAR();
4899  CopySendChar(cstate, '\\');
4900  CopySendChar(cstate, c);
4901  start = ++ptr; /* do not include char in next run */
4902  }
4903  else if (c == '\\' || c == delimc)
4904  {
4905  DUMPSOFAR();
4906  CopySendChar(cstate, '\\');
4907  start = ptr++; /* we include char in next run */
4908  }
4909  else if (IS_HIGHBIT_SET(c))
4910  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4911  else
4912  ptr++;
4913  }
4914  }
4915  else
4916  {
4917  start = ptr;
4918  while ((c = *ptr) != '\0')
4919  {
4920  if ((unsigned char) c < (unsigned char) 0x20)
4921  {
4922  /*
4923  * \r and \n must be escaped, the others are traditional. We
4924  * prefer to dump these using the C-like notation, rather than
4925  * a backslash and the literal character, because it makes the
4926  * dump file a bit more proof against Microsoftish data
4927  * mangling.
4928  */
4929  switch (c)
4930  {
4931  case '\b':
4932  c = 'b';
4933  break;
4934  case '\f':
4935  c = 'f';
4936  break;
4937  case '\n':
4938  c = 'n';
4939  break;
4940  case '\r':
4941  c = 'r';
4942  break;
4943  case '\t':
4944  c = 't';
4945  break;
4946  case '\v':
4947  c = 'v';
4948  break;
4949  default:
4950  /* If it's the delimiter, must backslash it */
4951  if (c == delimc)
4952  break;
4953  /* All ASCII control chars are length 1 */
4954  ptr++;
4955  continue; /* fall to end of loop */
4956  }
4957  /* if we get here, we need to convert the control char */
4958  DUMPSOFAR();
4959  CopySendChar(cstate, '\\');
4960  CopySendChar(cstate, c);
4961  start = ++ptr; /* do not include char in next run */
4962  }
4963  else if (c == '\\' || c == delimc)
4964  {
4965  DUMPSOFAR();
4966  CopySendChar(cstate, '\\');
4967  start = ptr++; /* we include char in next run */
4968  }
4969  else
4970  ptr++;
4971  }
4972  }
4973 
4974  DUMPSOFAR();
4975 }
4976 
4977 /*
4978  * Send text representation of one attribute, with conversion and
4979  * CSV-style escaping
4980  */
4981 static void
4982 CopyAttributeOutCSV(CopyState cstate, char *string,
4983  bool use_quote, bool single_attr)
4984 {
4985  char *ptr;
4986  char *start;
4987  char c;
4988  char delimc = cstate->delim[0];
4989  char quotec = cstate->quote[0];
4990  char escapec = cstate->escape[0];
4991 
4992  /* force quoting if it matches null_print (before conversion!) */
4993  if (!use_quote && strcmp(string, cstate->null_print) == 0)
4994  use_quote = true;
4995 
4996  if (cstate->need_transcoding)
4997  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4998  else
4999  ptr = string;
5000 
5001  /*
5002  * Make a preliminary pass to discover if it needs quoting
5003  */
5004  if (!use_quote)
5005  {
5006  /*
5007  * Because '\.' can be a data value, quote it if it appears alone on a
5008  * line so it is not interpreted as the end-of-data marker.
5009  */
5010  if (single_attr && strcmp(ptr, "\\.") == 0)
5011  use_quote = true;
5012  else
5013  {
5014  char *tptr = ptr;
5015 
5016  while ((c = *tptr) != '\0')
5017  {
5018  if (c == delimc || c == quotec || c == '\n' || c == '\r')
5019  {
5020  use_quote = true;
5021  break;
5022  }
5023  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
5024  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
5025  else
5026  tptr++;
5027  }
5028  }
5029  }
5030 
5031  if (use_quote)
5032  {
5033  CopySendChar(cstate, quotec);
5034 
5035  /*
5036  * We adopt the same optimization strategy as in CopyAttributeOutText
5037  */
5038  start = ptr;
5039  while ((c = *ptr) != '\0')
5040  {
5041  if (c == quotec || c == escapec)
5042  {
5043  DUMPSOFAR();
5044  CopySendChar(cstate, escapec);
5045  start = ptr; /* we include char in next run */
5046  }
5047  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
5048  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
5049  else
5050  ptr++;
5051  }
5052  DUMPSOFAR();
5053 
5054  CopySendChar(cstate, quotec);
5055  }
5056  else
5057  {
5058  /* If it doesn't need quoting, we can just dump it as-is */
5059  CopySendString(cstate, ptr);
5060  }
5061 }
5062 
5063 /*
5064  * CopyGetAttnums - build an integer list of attnums to be copied
5065  *
5066  * The input attnamelist is either the user-specified column list,
5067  * or NIL if there was none (in which case we want all the non-dropped
5068  * columns).
5069  *
5070  * We don't include generated columns in the generated full list and we don't
5071  * allow them to be specified explicitly. They don't make sense for COPY
5072  * FROM, but we could possibly allow them for COPY TO. But this way it's at
5073  * least ensured that whatever we copy out can be copied back in.
5074  *
5075  * rel can be NULL ... it's only used for error reports.
5076  */
5077 static List *
5078 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
5079 {
5080  List *attnums = NIL;
5081 
5082  if (attnamelist == NIL)
5083  {
5084  /* Generate default column list */
5085  int attr_count = tupDesc->natts;
5086  int i;
5087 
5088  for (i = 0; i < attr_count; i++)
5089  {
5090  if (TupleDescAttr(tupDesc, i)->attisdropped)
5091  continue;
5092  if (TupleDescAttr(tupDesc, i)->attgenerated)
5093  continue;
5094  attnums = lappend_int(attnums, i + 1);
5095  }
5096  }
5097  else
5098  {
5099  /* Validate the user-supplied list and extract attnums */
5100  ListCell *l;
5101 
5102  foreach(l, attnamelist)
5103  {
5104  char *name = strVal(lfirst(l));
5105  int attnum;
5106  int i;
5107 
5108  /* Lookup column name */
5109  attnum = InvalidAttrNumber;
5110  for (i = 0; i < tupDesc->natts; i++)
5111  {
5112  Form_pg_attribute att = TupleDescAttr(tupDesc, i);
5113 
5114  if (att->attisdropped)
5115  continue;
5116  if (namestrcmp(&(att->attname), name) == 0)
5117  {
5118  if (att->attgenerated)
5119  ereport(ERROR,
5120  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
5121  errmsg("column \"%s\" is a generated column",
5122  name),
5123  errdetail("Generated columns cannot be used in COPY.")));
5124  attnum = att->attnum;
5125  break;
5126  }
5127  }
5128  if (attnum == InvalidAttrNumber)
5129  {
5130  if (rel != NULL)
5131  ereport(ERROR,
5132  (errcode(ERRCODE_UNDEFINED_COLUMN),
5133  errmsg("column \"%s\" of relation \"%s\" does not exist",
5134  name, RelationGetRelationName(rel))));
5135  else
5136  ereport(ERROR,
5137  (errcode(ERRCODE_UNDEFINED_COLUMN),
5138  errmsg("column \"%s\" does not exist",
5139  name)));
5140  }
5141  /* Check for duplicates */
5142  if (list_member_int(attnums, attnum))
5143  ereport(ERROR,
5144  (errcode(ERRCODE_DUPLICATE_COLUMN),
5145  errmsg("column \"%s\" specified more than once",
5146  name)));
5147  attnums = lappend_int(attnums, attnum);
5148  }
5149  }
5150 
5151  return attnums;
5152 }
5153 
5154 
5155 /*
5156  * copy_dest_startup --- executor startup
5157  */
5158 static void
5159 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
5160 {
5161  /* no-op */
5162 }
5163 
5164 /*
5165  * copy_dest_receive --- receive one tuple
5166  */
5167 static bool
5169 {
5170  DR_copy *myState = (DR_copy *) self;
5171  CopyState cstate = myState->cstate;
5172 
5173  /* Send the data */
5174  CopyOneRowTo(cstate, slot);
5175  myState->processed++;
5176 
5177  return true;
5178 }
5179 
5180 /*
5181  * copy_dest_shutdown --- executor end
5182  */
5183 static void
5185 {
5186  /* no-op */
5187 }
5188 
5189 /*
5190  * copy_dest_destroy --- release DestReceiver object
5191  */
5192 static void
5194 {
5195  pfree(self);
5196 }
5197 
5198 /*
5199  * CreateCopyDestReceiver -- create a suitable DestReceiver object
5200  */
5201 DestReceiver *
5203 {
5204  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
5205 
5206  self->pub.receiveSlot = copy_dest_receive;
5207  self->pub.rStartup = copy_dest_startup;
5208  self->pub.rShutdown = copy_dest_shutdown;
5209  self->pub.rDestroy = copy_dest_destroy;
5210  self->pub.mydest = DestCopyOut;
5211 
5212  self->cstate = NULL; /* will be set later */
5213  self->processed = 0;
5214 
5215  return (DestReceiver *) self;
5216 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
struct CopyMultiInsertBuffer CopyMultiInsertBuffer
AttrNumber * attrMap
Definition: tupconvert.h:26
signed short int16
Definition: c.h:345
List * indirection
Definition: parsenodes.h:441
bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
Definition: copy.c:3636
int ri_NumIndices
Definition: execnodes.h:414
Node * whereClause
Definition: copy.c:156
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:476
#define NIL
Definition: pg_list.h:65
Node * whereClause
Definition: parsenodes.h:1996
Oid tts_tableOid
Definition: tuptable.h:131
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
uint32 CommandId
Definition: c.h:521
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:724
static Datum CopyReadBinaryAttribute(CopyState cstate, int column_no, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
Definition: copy.c:4770
static int GetDecimalFromHex(char hex)
Definition: copy.c:4345
Definition: fmgr.h:56
#define MAX_COPY_DATA_DISPLAY
bool csv_mode
Definition: copy.c:138
static void SendCopyEnd(CopyState cstate)
Definition: copy.c:471
void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, Relation partition_root, int instrument_options)
Definition: execMain.c:1277
Relation ri_RelationDesc
Definition: execnodes.h:411
List * range_table
Definition: copy.c:184
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel, RawStmt *raw_query, Oid queryRelId, List *attnamelist, List *options)
Definition: copy.c:1461
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:783
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
static bool CopyReadLineText(CopyState cstate)
Definition: copy.c:3981
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:124
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copy.c:2635
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:774
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
Node * val
Definition: parsenodes.h:442
static bool CopyGetInt32(CopyState cstate, int32 *val)
Definition: copy.c:739
int errhint(const char *fmt,...)
Definition: elog.c:974
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2602
EState * estate
Definition: copy.c:279
int pg_char_to_encoding(const char *name)
Definition: encnames.c:551
char ** raw_fields
Definition: copy.c:202
Definition: copy.c:83
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2674
#define VARDATA(PTR)
Definition: postgres.h:302
static void EndCopy(CopyState cstate)
Definition: copy.c:1802
bool binary
Definition: copy.c:136
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:488
#define pq_flush()
Definition: libpq.h:39
void CopyFromErrorCallback(void *arg)
Definition: copy.c:2239
void PreventCommandIfParallelMode(const char *cmdname)
Definition: utility.c:257
List * attlist
Definition: parsenodes.h:1990
List * fromClause
Definition: parsenodes.h:1575
#define ISOCTAL(c)
Definition: copy.c:62
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
#define OCTVALUE(c)
Definition: copy.c:63
#define ResetPerTupleExprContext(estate)
Definition: executor.h:510
#define RelationGetDescr(relation)
Definition: rel.h:445
int LOCKMODE
Definition: lockdefs.h:26
char * name
Definition: parsenodes.h:440
Oid GetUserId(void)
Definition: miscinit.c:380
#define TABLE_INSERT_FROZEN
Definition: tableam.h:132
bool need_transcoding
Definition: copy.c:126
#define castNode(_type_, nodeptr)
Definition: nodes.h:594
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
FmgrInfo * in_functions
Definition: copy.c:179
AttrNumber num_defaults
Definition: copy.c:178
List * attnumlist
Definition: copy.c:132
#define VARSIZE(PTR)
Definition: postgres.h:303
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:581
char * filename
Definition: copy.c:133
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:215
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1169
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
struct CopyMultiInsertInfo CopyMultiInsertInfo
CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copy.c:3387
#define VARHDRSZ
Definition: c.h:555
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:211
struct PartitionRoutingInfo * ri_PartitionInfo
Definition: execnodes.h:485
List * relationOids
Definition: plannodes.h:84
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define pg_hton16(x)
Definition: pg_bswap.h:120
MemoryContext ecxt_per_tuple_memory
Definition: execnodes.h:232
static void ReceiveCopyBegin(CopyState cstate)
Definition: copy.c:435
#define XLogIsNeeded()
Definition: xlog.h:181
#define pg_ntoh16(x)
Definition: pg_bswap.h:124
Definition: copy.c:229
StringInfo makeStringInfo(void)
Definition: stringinfo.c:28
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copy.c:2366
#define MAX_BUFFERED_TUPLES
Definition: copy.c:246
bool rd_islocaltemp
Definition: rel.h:59
CommandId mycid
Definition: copy.c:280
#define S_IWOTH
Definition: win32_port.h:287
Expr * expression_planner(Expr *expr)
Definition: planner.c:6046
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:143
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copy.c:264
Node * transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind)
Definition: parse_expr.c:145
DestReceiver pub
Definition: copy.c:231
void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options)
Definition: copy.c:1110
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1905
StringInfoData line_buf
Definition: copy.c:211
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen)
Definition: copy.c:313
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:841
#define AccessShareLock
Definition: lockdefs.h:36
static struct @145 value
struct CopyStateData CopyStateData
Definition: nodes.h:525
#define strVal(v)
Definition: value.h:54
struct cursor * cur
Definition: ecpg.c:28
int raw_buf_index
Definition: copy.c:224
static void CopyAttributeOutText(CopyState cstate, char *string)
Definition: copy.c:4829
bool line_buf_valid
Definition: copy.c:213
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1689
CopyState cstate
Definition: copy.c:232
int errcode(int sqlerrcode)
Definition: elog.c:570
#define PG_BINARY_W
Definition: c.h:1194
int namestrcmp(Name name, const char *str)
Definition: name.c:287
#define MemSet(start, val, len)
Definition: c.h:955
TupleTableSlot * execute_attr_map_slot(AttrNumber *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:425
Node * eval_const_expressions(PlannerInfo *root, Node *node)
Definition: clauses.c:2253
uint64 CopyFrom(CopyState cstate)
Definition: copy.c:2658
CmdType operation
Definition: execnodes.h:1161
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:80
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1318
Datum * tts_values
Definition: tuptable.h:126
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static void ClosePipeToProgram(CopyState cstate)
Definition: copy.c:1767
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
static int CopyReadAttributesCSV(CopyState cstate)
Definition: copy.c:4601
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
AclMode requiredPerms
Definition: parsenodes.h:1096
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:724
#define SIGPIPE
Definition: win32_port.h:159
EState * state
Definition: execnodes.h:942
bool * force_quote_flags
Definition: copy.c:148
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:872
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
List * pg_analyze_and_rewrite(RawStmt *parsetree, const char *query_string, Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)
Definition: postgres.c:680
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:365
char * delim
Definition: copy.c:143
#define PG_PROTOCOL_MAJOR(v)
Definition: pqcomm.h:104
Node * utilityStmt
Definition: parsenodes.h:120
bool is_program
Definition: parsenodes.h:1993
#define linitial_node(type, l)
Definition: pg_list.h:198
bool volatile_defexprs
Definition: copy.c:183
void(* callback)(void *arg)
Definition: elog.h:254
struct ErrorContextCallback * previous
Definition: elog.h:253
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copy.c:2386
#define PG_BINARY_R
Definition: c.h:1193
ResultRelInfo * resultRelInfo
Definition: copy.c:261
static void copy_dest_destroy(DestReceiver *self)
Definition: copy.c:5193
bool * force_null_flags
Definition: copy.c:152
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
MemoryContext rowcontext
Definition: copy.c:173
bool line_buf_converted
Definition: copy.c:212
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1123
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:207
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:654
signed int int32
Definition: c.h:346
int ClosePipeStream(FILE *file)
Definition: fd.c:2614
int errdetail_internal(const char *fmt,...)
Definition: elog.c:887
bool * convert_select_flags
Definition: copy.c:155
static void CopySendChar(CopyState cstate, char c)
Definition: copy.c:512
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1575
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:151
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot)
int location
Definition: parsenodes.h:236
CopyDest copy_dest
Definition: copy.c:117
int location
Definition: parsenodes.h:443
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
#define REFILL_LINEBUF
Definition: copy.c:330
ErrorContextCallback * error_context_stack
Definition: elog.c:88
uint64 cur_lineno
Definition: copy.c:160
#define list_make1(x1)
Definition: pg_list.h:227
char * null_print
Definition: copy.c:140
const char * cur_attname
Definition: copy.c:161
void assign_expr_collations(ParseState *pstate, Node *expr)
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:462