PostgreSQL Source Code  git master
copy.h File Reference
#include "libpq-fe.h"
Include dependency graph for copy.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

bool do_copy (const char *args)
 
bool handleCopyOut (PGconn *conn, FILE *copystream, PGresult **res)
 
bool handleCopyIn (PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 

Function Documentation

◆ do_copy()

bool do_copy ( const char *  args)

Definition at line 268 of file copy.c.

References copy_options::after_tofrom, appendPQExpBufferStr(), copy_options::before_tofrom, canonicalize_path(), _psqlSettings::copyStream, _psqlSettings::cur_cmd_source, PQExpBufferData::data, disable_sigpipe_trap(), copy_options::file, free, free_copy_options(), copy_options::from, fstat, initPQExpBuffer(), options, parse_slash_copy(), PG_BINARY_R, PG_BINARY_W, pg_log_error, printfPQExpBuffer(), copy_options::program, pset, copy_options::psql_inout, _psqlSettings::queryFout, restore_sigpipe_trap(), S_ISDIR, SendQuery(), stat::st_mode, generate_unaccent_rules::stdout, success, termPQExpBuffer(), and wait_result_to_str().

Referenced by exec_command_copy().

269 {
270  PQExpBufferData query;
271  FILE *copystream;
272  struct copy_options *options;
273  bool success;
274 
275  /* parse options */
276  options = parse_slash_copy(args);
277 
278  if (!options)
279  return false;
280 
281  /* prepare to read or write the target file */
282  if (options->file && !options->program)
283  canonicalize_path(options->file);
284 
285  if (options->from)
286  {
287  if (options->file)
288  {
289  if (options->program)
290  {
291  fflush(stdout);
292  fflush(stderr);
293  errno = 0;
294  copystream = popen(options->file, PG_BINARY_R);
295  }
296  else
297  copystream = fopen(options->file, PG_BINARY_R);
298  }
299  else if (!options->psql_inout)
300  copystream = pset.cur_cmd_source;
301  else
302  copystream = stdin;
303  }
304  else
305  {
306  if (options->file)
307  {
308  if (options->program)
309  {
310  fflush(stdout);
311  fflush(stderr);
312  errno = 0;
314  copystream = popen(options->file, PG_BINARY_W);
315  }
316  else
317  copystream = fopen(options->file, PG_BINARY_W);
318  }
319  else if (!options->psql_inout)
320  copystream = pset.queryFout;
321  else
322  copystream = stdout;
323  }
324 
325  if (!copystream)
326  {
327  if (options->program)
328  pg_log_error("could not execute command \"%s\": %m",
329  options->file);
330  else
331  pg_log_error("%s: %m",
332  options->file);
333  free_copy_options(options);
334  return false;
335  }
336 
337  if (!options->program)
338  {
339  struct stat st;
340  int result;
341 
342  /* make sure the specified file is not a directory */
343  if ((result = fstat(fileno(copystream), &st)) < 0)
344  pg_log_error("could not stat file \"%s\": %m",
345  options->file);
346 
347  if (result == 0 && S_ISDIR(st.st_mode))
348  pg_log_error("%s: cannot copy from/to a directory",
349  options->file);
350 
351  if (result < 0 || S_ISDIR(st.st_mode))
352  {
353  fclose(copystream);
354  free_copy_options(options);
355  return false;
356  }
357  }
358 
359  /* build the command we will send to the backend */
360  initPQExpBuffer(&query);
361  printfPQExpBuffer(&query, "COPY ");
362  appendPQExpBufferStr(&query, options->before_tofrom);
363  if (options->from)
364  appendPQExpBufferStr(&query, " FROM STDIN ");
365  else
366  appendPQExpBufferStr(&query, " TO STDOUT ");
367  if (options->after_tofrom)
368  appendPQExpBufferStr(&query, options->after_tofrom);
369 
370  /* run it like a user command, but with copystream as data source/sink */
371  pset.copyStream = copystream;
372  success = SendQuery(query.data);
373  pset.copyStream = NULL;
374  termPQExpBuffer(&query);
375 
376  if (options->file != NULL)
377  {
378  if (options->program)
379  {
380  int pclose_rc = pclose(copystream);
381 
382  if (pclose_rc != 0)
383  {
384  if (pclose_rc < 0)
385  pg_log_error("could not close pipe to external command: %m");
386  else
387  {
388  char *reason = wait_result_to_str(pclose_rc);
389 
390  pg_log_error("%s: %s", options->file,
391  reason ? reason : "");
392  if (reason)
393  free(reason);
394  }
395  success = false;
396  }
398  }
399  else
400  {
401  if (fclose(copystream) != 0)
402  {
403  pg_log_error("%s: %m", options->file);
404  success = false;
405  }
406  }
407  }
408  free_copy_options(options);
409  return success;
410 }
void printfPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:237
PsqlSettings pset
Definition: startup.c:32
void disable_sigpipe_trap(void)
Definition: print.c:2925
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
#define pg_log_error(...)
Definition: logging.h:80
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:369
static void free_copy_options(struct copy_options *ptr)
Definition: copy.c:65
FILE * queryFout
Definition: settings.h:84
void canonicalize_path(char *path)
Definition: path.c:254
#define PG_BINARY_W
Definition: c.h:1274
#define PG_BINARY_R
Definition: c.h:1273
FILE * copyStream
Definition: settings.h:87
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
static struct copy_options * parse_slash_copy(const char *args)
Definition: copy.c:89
FILE * cur_cmd_source
Definition: settings.h:105
char * before_tofrom
Definition: copy.c:55
bool psql_inout
Definition: copy.c:59
#define fstat
Definition: win32_port.h:274
bool program
Definition: copy.c:58
void restore_sigpipe_trap(void)
Definition: print.c:2948
bool from
Definition: copy.c:60
static char ** options
#define free(a)
Definition: header.h:65
char * after_tofrom
Definition: copy.c:56
#define S_ISDIR(m)
Definition: win32_port.h:316
bool SendQuery(const char *query)
Definition: common.c:1193
static bool success
Definition: initdb.c:165
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92
char * file
Definition: copy.c:57

◆ handleCopyIn()

bool handleCopyIn ( PGconn conn,
FILE *  copystream,
bool  isbinary,
PGresult **  res 
)

Definition at line 513 of file copy.c.

References _, buf, COPYBUFSIZ, _psqlSettings::cur_cmd_source, get_prompt(), _psqlSettings::lineno, pg_log_info, PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQerrorMessage(), PQgetResult(), PQprotocolVersion(), PQputCopyData(), PQputCopyEnd(), PQresultStatus(), PROMPT_COPY, pset, _psqlSettings::quiet, sigint_interrupt_enabled, sigint_interrupt_jmp, generate_unaccent_rules::stdout, and _psqlSettings::stmt_lineno.

Referenced by ProcessResult().

514 {
515  bool OK;
516  char buf[COPYBUFSIZ];
517  bool showprompt;
518 
519  /*
520  * Establish longjmp destination for exiting from wait-for-input. (This is
521  * only effective while sigint_interrupt_enabled is TRUE.)
522  */
523  if (sigsetjmp(sigint_interrupt_jmp, 1) != 0)
524  {
525  /* got here with longjmp */
526 
527  /* Terminate data transfer */
528  PQputCopyEnd(conn,
529  (PQprotocolVersion(conn) < 3) ? NULL :
530  _("canceled by user"));
531 
532  OK = false;
533  goto copyin_cleanup;
534  }
535 
536  /* Prompt if interactive input */
537  if (isatty(fileno(copystream)))
538  {
539  showprompt = true;
540  if (!pset.quiet)
541  puts(_("Enter data to be copied followed by a newline.\n"
542  "End with a backslash and a period on a line by itself, or an EOF signal."));
543  }
544  else
545  showprompt = false;
546 
547  OK = true;
548 
549  if (isbinary)
550  {
551  /* interactive input probably silly, but give one prompt anyway */
552  if (showprompt)
553  {
554  const char *prompt = get_prompt(PROMPT_COPY, NULL);
555 
556  fputs(prompt, stdout);
557  fflush(stdout);
558  }
559 
560  for (;;)
561  {
562  int buflen;
563 
564  /* enable longjmp while waiting for input */
566 
567  buflen = fread(buf, 1, COPYBUFSIZ, copystream);
568 
569  sigint_interrupt_enabled = false;
570 
571  if (buflen <= 0)
572  break;
573 
574  if (PQputCopyData(conn, buf, buflen) <= 0)
575  {
576  OK = false;
577  break;
578  }
579  }
580  }
581  else
582  {
583  bool copydone = false;
584  int buflen;
585  bool at_line_begin = true;
586 
587  /*
588  * In text mode, we have to read the input one line at a time, so that
589  * we can stop reading at the EOF marker (\.). We mustn't read beyond
590  * the EOF marker, because if the data was inlined in a SQL script, we
591  * would eat up the commands after the EOF marker.
592  */
593  buflen = 0;
594  while (!copydone)
595  {
596  char *fgresult;
597 
598  if (at_line_begin && showprompt)
599  {
600  const char *prompt = get_prompt(PROMPT_COPY, NULL);
601 
602  fputs(prompt, stdout);
603  fflush(stdout);
604  }
605 
606  /* enable longjmp while waiting for input */
608 
609  fgresult = fgets(&buf[buflen], COPYBUFSIZ - buflen, copystream);
610 
611  sigint_interrupt_enabled = false;
612 
613  if (!fgresult)
614  copydone = true;
615  else
616  {
617  int linelen;
618 
619  linelen = strlen(fgresult);
620  buflen += linelen;
621 
622  /* current line is done? */
623  if (buf[buflen - 1] == '\n')
624  {
625  /* check for EOF marker, but not on a partial line */
626  if (at_line_begin)
627  {
628  /*
629  * This code erroneously assumes '\.' on a line alone
630  * inside a quoted CSV string terminates the \copy.
631  * https://www.postgresql.org/message-id/E1TdNVQ-0001ju-GO@wrigleys.postgresql.org
632  */
633  if ((linelen == 3 && memcmp(fgresult, "\\.\n", 3) == 0) ||
634  (linelen == 4 && memcmp(fgresult, "\\.\r\n", 4) == 0))
635  {
636  copydone = true;
637  }
638  }
639 
640  if (copystream == pset.cur_cmd_source)
641  {
642  pset.lineno++;
643  pset.stmt_lineno++;
644  }
645  at_line_begin = true;
646  }
647  else
648  at_line_begin = false;
649  }
650 
651  /*
652  * If the buffer is full, or we've reached the EOF, flush it.
653  *
654  * Make sure there's always space for four more bytes in the
655  * buffer, plus a NUL terminator. That way, an EOF marker is
656  * never split across two fgets() calls, which simplies the logic.
657  */
658  if (buflen >= COPYBUFSIZ - 5 || (copydone && buflen > 0))
659  {
660  if (PQputCopyData(conn, buf, buflen) <= 0)
661  {
662  OK = false;
663  copydone = true;
664  break;
665  }
666 
667  buflen = 0;
668  }
669  }
670  }
671 
672  /* Check for read error */
673  if (ferror(copystream))
674  OK = false;
675 
676  /*
677  * Terminate data transfer. We can't send an error message if we're using
678  * protocol version 2. (libpq no longer supports protocol version 2, but
679  * keep the version checks just in case you're using a pre-v14 libpq.so at
680  * runtime)
681  */
682  if (PQputCopyEnd(conn,
683  (OK || PQprotocolVersion(conn) < 3) ? NULL :
684  _("aborted because of read failure")) <= 0)
685  OK = false;
686 
687 copyin_cleanup:
688 
689  /*
690  * Clear the EOF flag on the stream, in case copying ended due to an EOF
691  * signal. This allows an interactive TTY session to perform another COPY
692  * FROM STDIN later. (In non-STDIN cases, we're about to close the file
693  * anyway, so it doesn't matter.) Although we don't ever test the flag
694  * with feof(), some fread() implementations won't read more data if it's
695  * set. This also clears the error flag, but we already checked that.
696  */
697  clearerr(copystream);
698 
699  /*
700  * Check command status and return to normal libpq state.
701  *
702  * We do not want to return with the status still PGRES_COPY_IN: our
703  * caller would be unable to distinguish that situation from reaching the
704  * next COPY in a command string that happened to contain two consecutive
705  * COPY FROM STDIN commands. We keep trying PQputCopyEnd() in the hope
706  * it'll work eventually. (What's actually likely to happen is that in
707  * attempting to flush the data, libpq will eventually realize that the
708  * connection is lost. But that's fine; it will get us out of COPY_IN
709  * state, which is what we need.)
710  */
711  while (*res = PQgetResult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
712  {
713  OK = false;
714  PQclear(*res);
715  /* We can't send an error message if we're using protocol version 2 */
716  PQputCopyEnd(conn,
717  (PQprotocolVersion(conn) < 3) ? NULL :
718  _("trying to exit copy mode"));
719  }
720  if (PQresultStatus(*res) != PGRES_COMMAND_OK)
721  {
722  pg_log_info("%s", PQerrorMessage(conn));
723  OK = false;
724  }
725 
726  return OK;
727 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2544
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
volatile bool sigint_interrupt_enabled
Definition: common.c:245
PsqlSettings pset
Definition: startup.c:32
char * get_prompt(promptStatus_t status, ConditionalStack cstack)
Definition: prompt.c:66
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2600
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
FILE * cur_cmd_source
Definition: settings.h:105
sigjmp_buf sigint_interrupt_jmp
Definition: common.c:247
#define COPYBUFSIZ
Definition: copy.c:510
static char * buf
Definition: pg_test_fsync.c:68
void PQclear(PGresult *res)
Definition: fe-exec.c:694
uint64 lineno
Definition: settings.h:111
int PQprotocolVersion(const PGconn *conn)
Definition: fe-connect.c:6724
#define _(x)
Definition: elog.c:89
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
uint64 stmt_lineno
Definition: settings.h:112
#define pg_log_info(...)
Definition: logging.h:88

◆ handleCopyOut()

bool handleCopyOut ( PGconn conn,
FILE *  copystream,
PGresult **  res 
)

Definition at line 436 of file copy.c.

References buf, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQgetResult(), and PQresultStatus().

Referenced by ProcessResult().

437 {
438  bool OK = true;
439  char *buf;
440  int ret;
441 
442  for (;;)
443  {
444  ret = PQgetCopyData(conn, &buf, 0);
445 
446  if (ret < 0)
447  break; /* done or server/connection error */
448 
449  if (buf)
450  {
451  if (OK && copystream && fwrite(buf, 1, ret, copystream) != ret)
452  {
453  pg_log_error("could not write COPY data: %m");
454  /* complain only once, keep reading data from server */
455  OK = false;
456  }
457  PQfreemem(buf);
458  }
459  }
460 
461  if (OK && copystream && fflush(copystream))
462  {
463  pg_log_error("could not write COPY data: %m");
464  OK = false;
465  }
466 
467  if (ret == -2)
468  {
469  pg_log_error("COPY data transfer failed: %s", PQerrorMessage(conn));
470  OK = false;
471  }
472 
473  /*
474  * Check command status and return to normal libpq state.
475  *
476  * If for some reason libpq is still reporting PGRES_COPY_OUT state, we
477  * would like to forcibly exit that state, since our caller would be
478  * unable to distinguish that situation from reaching the next COPY in a
479  * command string that happened to contain two consecutive COPY TO STDOUT
480  * commands. However, libpq provides no API for doing that, and in
481  * principle it's a libpq bug anyway if PQgetCopyData() returns -1 or -2
482  * but hasn't exited COPY_OUT state internally. So we ignore the
483  * possibility here.
484  */
485  *res = PQgetResult(conn);
486  if (PQresultStatus(*res) != PGRES_COMMAND_OK)
487  {
488  pg_log_info("%s", PQerrorMessage(conn));
489  OK = false;
490  }
491 
492  return OK;
493 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_log_error(...)
Definition: logging.h:80
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2668
static char * buf
Definition: pg_test_fsync.c:68
void PQfreemem(void *ptr)
Definition: fe-exec.c:3796
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1978
#define pg_log_info(...)
Definition: logging.h:88