PostgreSQL Source Code  git master
copy.h File Reference
#include "libpq-fe.h"
Include dependency graph for copy.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

bool do_copy (const char *args)
 
bool handleCopyOut (PGconn *conn, FILE *copystream, PGresult **res)
 
bool handleCopyIn (PGconn *conn, FILE *copystream, bool isbinary, PGresult **res)
 

Function Documentation

◆ do_copy()

bool do_copy ( const char *  args)

Definition at line 268 of file copy.c.

References copy_options::after_tofrom, appendPQExpBufferStr(), copy_options::before_tofrom, canonicalize_path(), _psqlSettings::copyStream, _psqlSettings::cur_cmd_source, PQExpBufferData::data, disable_sigpipe_trap(), copy_options::file, free, free_copy_options(), copy_options::from, initPQExpBuffer(), options, parse_slash_copy(), PG_BINARY_R, PG_BINARY_W, pg_log_error, printfPQExpBuffer(), copy_options::program, pset, copy_options::psql_inout, _psqlSettings::queryFout, restore_sigpipe_trap(), S_ISDIR, SendQuery(), stat, generate_unaccent_rules::stdout, success, termPQExpBuffer(), and wait_result_to_str().

Referenced by exec_command_copy().

269 {
270  PQExpBufferData query;
271  FILE *copystream;
272  struct copy_options *options;
273  bool success;
274 
275  /* parse options */
276  options = parse_slash_copy(args);
277 
278  if (!options)
279  return false;
280 
281  /* prepare to read or write the target file */
282  if (options->file && !options->program)
283  canonicalize_path(options->file);
284 
285  if (options->from)
286  {
287  if (options->file)
288  {
289  if (options->program)
290  {
291  fflush(stdout);
292  fflush(stderr);
293  errno = 0;
294  copystream = popen(options->file, PG_BINARY_R);
295  }
296  else
297  copystream = fopen(options->file, PG_BINARY_R);
298  }
299  else if (!options->psql_inout)
300  copystream = pset.cur_cmd_source;
301  else
302  copystream = stdin;
303  }
304  else
305  {
306  if (options->file)
307  {
308  if (options->program)
309  {
310  fflush(stdout);
311  fflush(stderr);
312  errno = 0;
313  disable_sigpipe_trap();
314  copystream = popen(options->file, PG_BINARY_W);
315  }
316  else
317  copystream = fopen(options->file, PG_BINARY_W);
318  }
319  else if (!options->psql_inout)
320  copystream = pset.queryFout;
321  else
322  copystream = stdout;
323  }
324 
325  if (!copystream)
326  {
327  if (options->program)
328  pg_log_error("could not execute command \"%s\": %m",
329  options->file);
330  else
331  pg_log_error("%s: %m",
332  options->file);
333  free_copy_options(options);
334  return false;
335  }
336 
337  if (!options->program)
338  {
339  struct stat st;
340  int result;
341 
342  /* make sure the specified file is not a directory */
343  if ((result = fstat(fileno(copystream), &st)) < 0)
344  pg_log_error("could not stat file \"%s\": %m",
345  options->file);
346 
347  if (result == 0 && S_ISDIR(st.st_mode))
348  pg_log_error("%s: cannot copy from/to a directory",
349  options->file);
350 
351  if (result < 0 || S_ISDIR(st.st_mode))
352  {
353  fclose(copystream);
354  free_copy_options(options);
355  return false;
356  }
357  }
358 
359  /* build the command we will send to the backend */
360  initPQExpBuffer(&query);
361  printfPQExpBuffer(&query, "COPY ");
362  appendPQExpBufferStr(&query, options->before_tofrom);
363  if (options->from)
364  appendPQExpBufferStr(&query, " FROM STDIN ");
365  else
366  appendPQExpBufferStr(&query, " TO STDOUT ");
367  if (options->after_tofrom)
368  appendPQExpBufferStr(&query, options->after_tofrom);
369 
370  /* run it like a user command, but with copystream as data source/sink */
371  pset.copyStream = copystream;
372  success = SendQuery(query.data);
373  pset.copyStream = NULL;
374  termPQExpBuffer(&query);
375 
376  if (options->file != NULL)
377  {
378  if (options->program)
379  {
380  int pclose_rc = pclose(copystream);
381 
382  if (pclose_rc != 0)
383  {
384  if (pclose_rc < 0)
385  pg_log_error("could not close pipe to external command: %m");
386  else
387  {
388  char *reason = wait_result_to_str(pclose_rc);
389 
390  pg_log_error("%s: %s", options->file,
391  reason ? reason : "");
392  if (reason)
393  free(reason);
394  }
395  success = false;
396  }
397  restore_sigpipe_trap();
398  }
399  else
400  {
401  if (fclose(copystream) != 0)
402  {
403  pg_log_error("%s: %m", options->file);
404  success = false;
405  }
406  }
407  }
408  free_copy_options(options);
409  return success;
410 }
void printfPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:237
PsqlSettings pset
Definition: startup.c:31
void disable_sigpipe_trap(void)
Definition: print.c:2942
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:131
#define pg_log_error(...)
Definition: logging.h:79
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:369
static void free_copy_options(struct copy_options *ptr)
Definition: copy.c:65
FILE * queryFout
Definition: settings.h:84
void canonicalize_path(char *path)
Definition: path.c:254
#define PG_BINARY_W
Definition: c.h:1225
#define PG_BINARY_R
Definition: c.h:1224
FILE * copyStream
Definition: settings.h:87
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
static struct copy_options * parse_slash_copy(const char *args)
Definition: copy.c:89
FILE * cur_cmd_source
Definition: settings.h:104
char * before_tofrom
Definition: copy.c:55
bool psql_inout
Definition: copy.c:59
bool program
Definition: copy.c:58
void restore_sigpipe_trap(void)
Definition: print.c:2965
bool from
Definition: copy.c:60
static char ** options
#define stat(a, b)
Definition: win32_port.h:255
#define free(a)
Definition: header.h:65
char * after_tofrom
Definition: copy.c:56
#define S_ISDIR(m)
Definition: win32_port.h:296
bool SendQuery(const char *query)
Definition: common.c:1181
static bool success
Definition: initdb.c:163
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:92
char * file
Definition: copy.c:57

◆ handleCopyIn()

bool handleCopyIn ( PGconn *  conn,
FILE *  copystream,
bool  isbinary,
PGresult **  res 
)

Definition at line 513 of file copy.c.

References _, buf, COPYBUFSIZ, _psqlSettings::cur_cmd_source, get_prompt(), _psqlSettings::lineno, pg_log_info, PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQerrorMessage(), PQgetResult(), PQprotocolVersion(), PQputCopyData(), PQputCopyEnd(), PQresultStatus(), PROMPT_COPY, pset, _psqlSettings::quiet, sigint_interrupt_enabled, sigint_interrupt_jmp, generate_unaccent_rules::stdout, and _psqlSettings::stmt_lineno.

Referenced by ProcessResult().

514 {
515  bool OK;
516  char buf[COPYBUFSIZ];
517  bool showprompt;
518 
519  /*
520  * Establish longjmp destination for exiting from wait-for-input. (This is
521  * only effective while sigint_interrupt_enabled is TRUE.)
522  */
523  if (sigsetjmp(sigint_interrupt_jmp, 1) != 0)
524  {
525  /* got here with longjmp */
526 
527  /* Terminate data transfer */
528  PQputCopyEnd(conn,
529  (PQprotocolVersion(conn) < 3) ? NULL :
530  _("canceled by user"));
531 
532  OK = false;
533  goto copyin_cleanup;
534  }
535 
536  /* Prompt if interactive input */
537  if (isatty(fileno(copystream)))
538  {
539  showprompt = true;
540  if (!pset.quiet)
541  puts(_("Enter data to be copied followed by a newline.\n"
542  "End with a backslash and a period on a line by itself, or an EOF signal."));
543  }
544  else
545  showprompt = false;
546 
547  OK = true;
548 
549  if (isbinary)
550  {
551  /* interactive input probably silly, but give one prompt anyway */
552  if (showprompt)
553  {
554  const char *prompt = get_prompt(PROMPT_COPY, NULL);
555 
556  fputs(prompt, stdout);
557  fflush(stdout);
558  }
559 
560  for (;;)
561  {
562  int buflen;
563 
564  /* enable longjmp while waiting for input */
565  sigint_interrupt_enabled = true;
566 
567  buflen = fread(buf, 1, COPYBUFSIZ, copystream);
568 
569  sigint_interrupt_enabled = false;
570 
571  if (buflen <= 0)
572  break;
573 
574  if (PQputCopyData(conn, buf, buflen) <= 0)
575  {
576  OK = false;
577  break;
578  }
579  }
580  }
581  else
582  {
583  bool copydone = false;
584 
585  while (!copydone)
586  { /* for each input line ... */
587  bool firstload;
588  bool linedone;
589 
590  if (showprompt)
591  {
592  const char *prompt = get_prompt(PROMPT_COPY, NULL);
593 
594  fputs(prompt, stdout);
595  fflush(stdout);
596  }
597 
598  firstload = true;
599  linedone = false;
600 
601  while (!linedone)
602  { /* for each bufferload in line ... */
603  int linelen;
604  char *fgresult;
605 
606  /* enable longjmp while waiting for input */
607  sigint_interrupt_enabled = true;
608 
609  fgresult = fgets(buf, sizeof(buf), copystream);
610 
611  sigint_interrupt_enabled = false;
612 
613  if (!fgresult)
614  {
615  copydone = true;
616  break;
617  }
618 
619  linelen = strlen(buf);
620 
621  /* current line is done? */
622  if (linelen > 0 && buf[linelen - 1] == '\n')
623  linedone = true;
624 
625  /* check for EOF marker, but not on a partial line */
626  if (firstload)
627  {
628  /*
629  * This code erroneously assumes '\.' on a line alone
630  * inside a quoted CSV string terminates the \copy.
631  * http://www.postgresql.org/message-id/E1TdNVQ-0001ju-GO@wrigleys.postgresql.org
632  */
633  if (strcmp(buf, "\\.\n") == 0 ||
634  strcmp(buf, "\\.\r\n") == 0)
635  {
636  copydone = true;
637  break;
638  }
639 
640  firstload = false;
641  }
642 
643  if (PQputCopyData(conn, buf, linelen) <= 0)
644  {
645  OK = false;
646  copydone = true;
647  break;
648  }
649  }
650 
651  if (copystream == pset.cur_cmd_source)
652  {
653  pset.lineno++;
654  pset.stmt_lineno++;
655  }
656  }
657  }
658 
659  /* Check for read error */
660  if (ferror(copystream))
661  OK = false;
662 
663  /*
664  * Terminate data transfer. We can't send an error message if we're using
665  * protocol version 2.
666  */
667  if (PQputCopyEnd(conn,
668  (OK || PQprotocolVersion(conn) < 3) ? NULL :
669  _("aborted because of read failure")) <= 0)
670  OK = false;
671 
672 copyin_cleanup:
673 
674  /*
675  * Clear the EOF flag on the stream, in case copying ended due to an EOF
676  * signal. This allows an interactive TTY session to perform another COPY
677  * FROM STDIN later. (In non-STDIN cases, we're about to close the file
678  * anyway, so it doesn't matter.) Although we don't ever test the flag
679  * with feof(), some fread() implementations won't read more data if it's
680  * set. This also clears the error flag, but we already checked that.
681  */
682  clearerr(copystream);
683 
684  /*
685  * Check command status and return to normal libpq state.
686  *
687  * We do not want to return with the status still PGRES_COPY_IN: our
688  * caller would be unable to distinguish that situation from reaching the
689  * next COPY in a command string that happened to contain two consecutive
690  * COPY FROM STDIN commands. We keep trying PQputCopyEnd() in the hope
691  * it'll work eventually. (What's actually likely to happen is that in
692  * attempting to flush the data, libpq will eventually realize that the
693  * connection is lost. But that's fine; it will get us out of COPY_IN
694  * state, which is what we need.)
695  */
696  while (*res = PQgetResult(conn), PQresultStatus(*res) == PGRES_COPY_IN)
697  {
698  OK = false;
699  PQclear(*res);
700  /* We can't send an error message if we're using protocol version 2 */
701  PQputCopyEnd(conn,
702  (PQprotocolVersion(conn) < 3) ? NULL :
703  _("trying to exit copy mode"));
704  }
705  if (PQresultStatus(*res) != PGRES_COMMAND_OK)
706  {
707  pg_log_info("%s", PQerrorMessage(conn));
708  OK = false;
709  }
710 
711  return OK;
712 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2317
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
volatile bool sigint_interrupt_enabled
Definition: common.c:245
PsqlSettings pset
Definition: startup.c:31
char * get_prompt(promptStatus_t status, ConditionalStack cstack)
Definition: prompt.c:70
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2384
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
FILE * cur_cmd_source
Definition: settings.h:104
sigjmp_buf sigint_interrupt_jmp
Definition: common.c:247
#define COPYBUFSIZ
Definition: copy.c:510
static char * buf
Definition: pg_test_fsync.c:67
void PQclear(PGresult *res)
Definition: fe-exec.c:694
uint64 lineno
Definition: settings.h:110
int PQprotocolVersion(const PGconn *conn)
Definition: fe-connect.c:6603
#define _(x)
Definition: elog.c:87
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
uint64 stmt_lineno
Definition: settings.h:111
#define pg_log_info(...)
Definition: logging.h:87

◆ handleCopyOut()

bool handleCopyOut ( PGconn *  conn,
FILE *  copystream,
PGresult **  res 
)

Definition at line 436 of file copy.c.

References buf, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQgetResult(), and PQresultStatus().

Referenced by ProcessResult().

437 {
438  bool OK = true;
439  char *buf;
440  int ret;
441 
442  for (;;)
443  {
444  ret = PQgetCopyData(conn, &buf, 0);
445 
446  if (ret < 0)
447  break; /* done or server/connection error */
448 
449  if (buf)
450  {
451  if (OK && copystream && fwrite(buf, 1, ret, copystream) != ret)
452  {
453  pg_log_error("could not write COPY data: %m");
454  /* complain only once, keep reading data from server */
455  OK = false;
456  }
457  PQfreemem(buf);
458  }
459  }
460 
461  if (OK && copystream && fflush(copystream))
462  {
463  pg_log_error("could not write COPY data: %m");
464  OK = false;
465  }
466 
467  if (ret == -2)
468  {
469  pg_log_error("COPY data transfer failed: %s", PQerrorMessage(conn));
470  OK = false;
471  }
472 
473  /*
474  * Check command status and return to normal libpq state.
475  *
476  * If for some reason libpq is still reporting PGRES_COPY_OUT state, we
477  * would like to forcibly exit that state, since our caller would be
478  * unable to distinguish that situation from reaching the next COPY in a
479  * command string that happened to contain two consecutive COPY TO STDOUT
480  * commands. However, libpq provides no API for doing that, and in
481  * principle it's a libpq bug anyway if PQgetCopyData() returns -1 or -2
482  * but hasn't exited COPY_OUT state internally. So we ignore the
483  * possibility here.
484  */
485  *res = PQgetResult(conn);
486  if (PQresultStatus(*res) != PGRES_COMMAND_OK)
487  {
488  pg_log_info("%s", PQerrorMessage(conn));
489  OK = false;
490  }
491 
492  return OK;
493 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6623
#define pg_log_error(...)
Definition: logging.h:79
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Definition: fe-exec.c:2473
static char * buf
Definition: pg_test_fsync.c:67
void PQfreemem(void *ptr)
Definition: fe-exec.c:3296
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define pg_log_info(...)
Definition: logging.h:87