PostgreSQL Source Code  git master
copyto.c File Reference
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/progress.h"
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
Include dependency graph for copyto.c:

Go to the source code of this file.

Data Structures

struct  CopyToStateData
 
struct  DR_copy
 

Macros

#define DUMPSOFAR()
 

Typedefs

typedef enum CopyDest CopyDest
 
typedef struct CopyToStateData CopyToStateData
 

Enumerations

enum  CopyDest { COPY_FILE , COPY_FRONTEND , COPY_CALLBACK }
 

Functions

static void EndCopy (CopyToState cstate)
 
static void ClosePipeToProgram (CopyToState cstate)
 
static void CopyOneRowTo (CopyToState cstate, TupleTableSlot *slot)
 
static void CopyAttributeOutText (CopyToState cstate, const char *string)
 
static void CopyAttributeOutCSV (CopyToState cstate, const char *string, bool use_quote, bool single_attr)
 
static void SendCopyBegin (CopyToState cstate)
 
static void SendCopyEnd (CopyToState cstate)
 
static void CopySendData (CopyToState cstate, const void *databuf, int datasize)
 
static void CopySendString (CopyToState cstate, const char *str)
 
static void CopySendChar (CopyToState cstate, char c)
 
static void CopySendEndOfRow (CopyToState cstate)
 
static void CopySendInt32 (CopyToState cstate, int32 val)
 
static void CopySendInt16 (CopyToState cstate, int16 val)
 
CopyToState BeginCopyTo (ParseState *pstate, Relation rel, RawStmt *raw_query, Oid queryRelId, const char *filename, bool is_program, copy_data_dest_cb data_dest_cb, List *attnamelist, List *options)
 
void EndCopyTo (CopyToState cstate)
 
uint64 DoCopyTo (CopyToState cstate)
 
static void copy_dest_startup (DestReceiver *self, int operation, TupleDesc typeinfo)
 
static bool copy_dest_receive (TupleTableSlot *slot, DestReceiver *self)
 
static void copy_dest_shutdown (DestReceiver *self)
 
static void copy_dest_destroy (DestReceiver *self)
 
DestReceiver * CreateCopyDestReceiver (void)
 

Variables

static const char BinarySignature [11] = "PGCOPY\n\377\r\n\0"
 

Macro Definition Documentation

◆ DUMPSOFAR

#define DUMPSOFAR ( )
Value:
do { \
if (ptr > start) \
CopySendData(cstate, start, ptr - start); \
} while (0)

Definition at line 984 of file copyto.c.

Typedef Documentation

◆ CopyDest

typedef enum CopyDest CopyDest

◆ CopyToStateData

typedef struct CopyToStateData CopyToStateData

Enumeration Type Documentation

◆ CopyDest

enum CopyDest
Enumerator
COPY_FILE 
COPY_FRONTEND 
COPY_CALLBACK 

Definition at line 50 of file copyto.c.

51 {
52  COPY_FILE, /* to file (or a piped program) */
53  COPY_FRONTEND, /* to frontend */
54  COPY_CALLBACK /* to callback function */
55 } CopyDest;
CopyDest
Definition: copyto.c:51
@ COPY_FILE
Definition: copyto.c:52
@ COPY_CALLBACK
Definition: copyto.c:54
@ COPY_FRONTEND
Definition: copyto.c:53

Function Documentation

◆ BeginCopyTo()

CopyToState BeginCopyTo ( ParseState *  pstate,
Relation  rel,
RawStmt *  raw_query,
Oid  queryRelId,
const char *  filename,
bool  is_program,
copy_data_dest_cb  data_dest_cb,
List *  attnamelist,
List *  options 
)

Definition at line 357 of file copyto.c.

366 {
367  CopyToState cstate;
368  bool pipe = (filename == NULL && data_dest_cb == NULL);
369  TupleDesc tupDesc;
370  int num_phys_attrs;
371  MemoryContext oldcontext;
372  const int progress_cols[] = {
375  };
376  int64 progress_vals[] = {
378  0
379  };
380 
381  if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
382  {
383  if (rel->rd_rel->relkind == RELKIND_VIEW)
384  ereport(ERROR,
385  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
386  errmsg("cannot copy from view \"%s\"",
388  errhint("Try the COPY (SELECT ...) TO variant.")));
389  else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
390  ereport(ERROR,
391  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
392  errmsg("cannot copy from materialized view \"%s\"",
394  errhint("Try the COPY (SELECT ...) TO variant.")));
395  else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
396  ereport(ERROR,
397  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
398  errmsg("cannot copy from foreign table \"%s\"",
400  errhint("Try the COPY (SELECT ...) TO variant.")));
401  else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
402  ereport(ERROR,
403  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
404  errmsg("cannot copy from sequence \"%s\"",
405  RelationGetRelationName(rel))));
406  else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
407  ereport(ERROR,
408  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
409  errmsg("cannot copy from partitioned table \"%s\"",
411  errhint("Try the COPY (SELECT ...) TO variant.")));
412  else
413  ereport(ERROR,
414  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
415  errmsg("cannot copy from non-table relation \"%s\"",
416  RelationGetRelationName(rel))));
417  }
418 
419 
420  /* Allocate workspace and zero all fields */
421  cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
422 
423  /*
424  * We allocate everything used by a cstate in a new memory context. This
425  * avoids memory leaks during repeated use of COPY in a query.
426  */
428  "COPY",
430 
431  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
432 
433  /* Extract options from the statement node tree */
434  ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
435 
436  /* Process the source/target relation or query */
437  if (rel)
438  {
439  Assert(!raw_query);
440 
441  cstate->rel = rel;
442 
443  tupDesc = RelationGetDescr(cstate->rel);
444  }
445  else
446  {
447  List *rewritten;
448  Query *query;
449  PlannedStmt *plan;
451 
452  cstate->rel = NULL;
453 
454  /*
455  * Run parse analysis and rewrite. Note this also acquires sufficient
456  * locks on the source table(s).
457  */
458  rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
459  pstate->p_sourcetext, NULL, 0,
460  NULL);
461 
462  /* check that we got back something we can work with */
463  if (rewritten == NIL)
464  {
465  ereport(ERROR,
466  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
467  errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
468  }
469  else if (list_length(rewritten) > 1)
470  {
471  ListCell *lc;
472 
473  /* examine queries to determine which error message to issue */
474  foreach(lc, rewritten)
475  {
476  Query *q = lfirst_node(Query, lc);
477 
479  ereport(ERROR,
480  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481  errmsg("conditional DO INSTEAD rules are not supported for COPY")));
483  ereport(ERROR,
484  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
485  errmsg("DO ALSO rules are not supported for the COPY")));
486  }
487 
488  ereport(ERROR,
489  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
490  errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
491  }
492 
493  query = linitial_node(Query, rewritten);
494 
495  /* The grammar allows SELECT INTO, but we don't support that */
496  if (query->utilityStmt != NULL &&
498  ereport(ERROR,
499  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
500  errmsg("COPY (SELECT INTO) is not supported")));
501 
502  Assert(query->utilityStmt == NULL);
503 
504  /*
505  * Similarly the grammar doesn't enforce the presence of a RETURNING
506  * clause, but this is required here.
507  */
508  if (query->commandType != CMD_SELECT &&
509  query->returningList == NIL)
510  {
511  Assert(query->commandType == CMD_INSERT ||
512  query->commandType == CMD_UPDATE ||
513  query->commandType == CMD_DELETE);
514 
515  ereport(ERROR,
516  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
517  errmsg("COPY query must have a RETURNING clause")));
518  }
519 
520  /* plan the query */
521  plan = pg_plan_query(query, pstate->p_sourcetext,
522  CURSOR_OPT_PARALLEL_OK, NULL);
523 
524  /*
525  * With row-level security and a user using "COPY relation TO", we
526  * have to convert the "COPY relation TO" to a query-based COPY (eg:
527  * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
528  * in any RLS clauses.
529  *
530  * When this happens, we are passed in the relid of the originally
531  * found relation (which we have locked). As the planner will look up
532  * the relation again, we double-check here to make sure it found the
533  * same one that we have locked.
534  */
535  if (queryRelId != InvalidOid)
536  {
537  /*
538  * Note that with RLS involved there may be multiple relations,
539  * and while the one we need is almost certainly first, we don't
540  * make any guarantees of that in the planner, so check the whole
541  * list and make sure we find the original relation.
542  */
543  if (!list_member_oid(plan->relationOids, queryRelId))
544  ereport(ERROR,
545  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
546  errmsg("relation referenced by COPY statement has changed")));
547  }
548 
549  /*
550  * Use a snapshot with an updated command ID to ensure this query sees
551  * results of any previously executed queries.
552  */
555 
556  /* Create dest receiver for COPY OUT */
558  ((DR_copy *) dest)->cstate = cstate;
559 
560  /* Create a QueryDesc requesting no output */
561  cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
564  dest, NULL, NULL, 0);
565 
566  /*
567  * Call ExecutorStart to prepare the plan for execution.
568  *
569  * ExecutorStart computes a result tupdesc for us
570  */
571  ExecutorStart(cstate->queryDesc, 0);
572 
573  tupDesc = cstate->queryDesc->tupDesc;
574  }
575 
576  /* Generate or convert list of attributes to process */
577  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
578 
579  num_phys_attrs = tupDesc->natts;
580 
581  /* Convert FORCE_QUOTE name list to per-column flags, check validity */
582  cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
583  if (cstate->opts.force_quote_all)
584  {
585  int i;
586 
587  for (i = 0; i < num_phys_attrs; i++)
588  cstate->opts.force_quote_flags[i] = true;
589  }
590  else if (cstate->opts.force_quote)
591  {
592  List *attnums;
593  ListCell *cur;
594 
595  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
596 
597  foreach(cur, attnums)
598  {
599  int attnum = lfirst_int(cur);
600  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
601 
602  if (!list_member_int(cstate->attnumlist, attnum))
603  ereport(ERROR,
604  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
605  errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
606  NameStr(attr->attname))));
607  cstate->opts.force_quote_flags[attnum - 1] = true;
608  }
609  }
610 
611  /* Use client encoding when ENCODING option is not specified. */
612  if (cstate->opts.file_encoding < 0)
614  else
615  cstate->file_encoding = cstate->opts.file_encoding;
616 
617  /*
618  * Set up encoding conversion info. Even if the file and server encodings
619  * are the same, we must apply pg_any_to_server() to validate data in
620  * multibyte encodings.
621  */
622  cstate->need_transcoding =
623  (cstate->file_encoding != GetDatabaseEncoding() ||
625  /* See Multibyte encoding comment above */
627 
628  cstate->copy_dest = COPY_FILE; /* default */
629 
630  if (data_dest_cb)
631  {
632  progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
633  cstate->copy_dest = COPY_CALLBACK;
634  cstate->data_dest_cb = data_dest_cb;
635  }
636  else if (pipe)
637  {
638  progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
639 
640  Assert(!is_program); /* the grammar does not allow this */
642  cstate->copy_file = stdout;
643  }
644  else
645  {
646  cstate->filename = pstrdup(filename);
647  cstate->is_program = is_program;
648 
649  if (is_program)
650  {
651  progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
652  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
653  if (cstate->copy_file == NULL)
654  ereport(ERROR,
656  errmsg("could not execute command \"%s\": %m",
657  cstate->filename)));
658  }
659  else
660  {
661  mode_t oumask; /* Pre-existing umask value */
662  struct stat st;
663 
664  progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
665 
666  /*
667  * Prevent write to relative path ... too easy to shoot oneself in
668  * the foot by overwriting a database file ...
669  */
671  ereport(ERROR,
672  (errcode(ERRCODE_INVALID_NAME),
673  errmsg("relative path not allowed for COPY to file")));
674 
675  oumask = umask(S_IWGRP | S_IWOTH);
676  PG_TRY();
677  {
678  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
679  }
680  PG_FINALLY();
681  {
682  umask(oumask);
683  }
684  PG_END_TRY();
685  if (cstate->copy_file == NULL)
686  {
687  /* copy errno because ereport subfunctions might change it */
688  int save_errno = errno;
689 
690  ereport(ERROR,
692  errmsg("could not open file \"%s\" for writing: %m",
693  cstate->filename),
694  (save_errno == ENOENT || save_errno == EACCES) ?
695  errhint("COPY TO instructs the PostgreSQL server process to write a file. "
696  "You may want a client-side facility such as psql's \\copy.") : 0));
697  }
698 
699  if (fstat(fileno(cstate->copy_file), &st))
700  ereport(ERROR,
702  errmsg("could not stat file \"%s\": %m",
703  cstate->filename)));
704 
705  if (S_ISDIR(st.st_mode))
706  ereport(ERROR,
707  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
708  errmsg("\"%s\" is a directory", cstate->filename)));
709  }
710  }
711 
712  /* initialize progress */
714  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
715  pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
716 
717  cstate->bytes_processed = 0;
718 
719  MemoryContextSwitchTo(oldcontext);
720 
721  return cstate;
722 }
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:721
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:404
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
@ PROGRESS_COMMAND_COPY
#define NameStr(name)
Definition: c.h:682
#define PG_BINARY_W
Definition: c.h:1212
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemote
Definition: dest.h:91
@ DestCopyOut
Definition: dest.h:97
struct cursor * cur
Definition: ecpg.c:28
int errcode_for_file_access(void)
Definition: elog.c:718
int errhint(const char *fmt,...)
Definition: elog.c:1153
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define PG_TRY(...)
Definition: elog.h:309
#define PG_END_TRY(...)
Definition: elog.h:334
#define ERROR
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:326
#define ereport(elevel,...)
Definition: elog.h:145
void ExecutorStart(QueryDesc *queryDesc, int eflags)
Definition: execMain.c:131
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2383
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2486
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
bool list_member_int(const List *list, int datum)
Definition: list.c:701
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
int GetDatabaseEncoding(void)
Definition: mbutils.c:1210
int pg_database_encoding_max_length(void)
Definition: mbutils.c:1495
int pg_get_client_encoding(void)
Definition: mbutils.c:336
char * pstrdup(const char *in)
Definition: mcxt.c:1483
void * palloc0(Size size)
Definition: mcxt.c:1230
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:162
@ CMD_INSERT
Definition: nodes.h:261
@ CMD_DELETE
Definition: nodes.h:262
@ CMD_UPDATE
Definition: nodes.h:260
@ CMD_SELECT
Definition: nodes.h:259
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
@ QSRC_NON_INSTEAD_RULE
Definition: parsenodes.h:47
@ QSRC_QUAL_INSTEAD_RULE
Definition: parsenodes.h:46
#define CURSOR_OPT_PARALLEL_OK
Definition: parsenodes.h:2906
int16 attnum
Definition: pg_attribute.h:83
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static char * filename
Definition: pg_dumpall.c:119
#define lfirst_node(type, lc)
Definition: pg_list.h:174
static int list_length(const List *l)
Definition: pg_list.h:150
#define linitial_node(type, l)
Definition: pg_list.h:179
#define NIL
Definition: pg_list.h:66
#define lfirst_int(lc)
Definition: pg_list.h:171
#define PG_ENCODING_IS_CLIENT_ONLY(_enc)
Definition: pg_wchar.h:284
#define is_absolute_path(filename)
Definition: port.h:103
CommandDest whereToSendOutput
Definition: postgres.c:85
List * pg_analyze_and_rewrite_fixedparams(RawStmt *parsetree, const char *query_string, const Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)
Definition: postgres.c:638
PlannedStmt * pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, ParamListInfo boundParams)
Definition: postgres.c:853
#define InvalidOid
Definition: postgres_ext.h:36
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
Definition: pquery.c:67
#define PROGRESS_COPY_COMMAND
Definition: progress.h:141
#define PROGRESS_COPY_TYPE_FILE
Definition: progress.h:149
#define PROGRESS_COPY_COMMAND_TO
Definition: progress.h:146
#define PROGRESS_COPY_TYPE
Definition: progress.h:142
#define PROGRESS_COPY_TYPE_PROGRAM
Definition: progress.h:150
#define PROGRESS_COPY_TYPE_CALLBACK
Definition: progress.h:152
#define PROGRESS_COPY_TYPE_PIPE
Definition: progress.h:151
#define RelationGetRelid(relation)
Definition: rel.h:501
#define RelationGetDescr(relation)
Definition: rel.h:527
#define RelationGetRelationName(relation)
Definition: rel.h:535
void UpdateActiveSnapshotCommandId(void)
Definition: snapmgr.c:747
void PushCopiedSnapshot(Snapshot snapshot)
Definition: snapmgr.c:735
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:805
#define InvalidSnapshot
Definition: snapshot.h:123
bool force_quote_all
Definition: copy.h:54
List * force_quote
Definition: copy.h:53
bool * force_quote_flags
Definition: copy.h:55
int file_encoding
Definition: copy.h:41
MemoryContext copycontext
Definition: copyto.c:97
Relation rel
Definition: copyto.c:84
copy_data_dest_cb data_dest_cb
Definition: copyto.c:89
bool encoding_embeds_ascii
Definition: copyto.c:81
CopyDest copy_dest
Definition: copyto.c:75
bool need_transcoding
Definition: copyto.c:80
bool is_program
Definition: copyto.c:88
FILE * copy_file
Definition: copyto.c:76
int file_encoding
Definition: copyto.c:79
CopyFormatOptions opts
Definition: copyto.c:91
uint64 bytes_processed
Definition: copyto.c:101
char * filename
Definition: copyto.c:87
List * attnumlist
Definition: copyto.c:86
QueryDesc * queryDesc
Definition: copyto.c:85
Definition: pg_list.h:52
const char * p_sourcetext
Definition: parse_node.h:182
List * relationOids
Definition: plannodes.h:87
TupleDesc tupDesc
Definition: execdesc.h:47
List * returningList
Definition: parsenodes.h:166
QuerySource querySource
Definition: parsenodes.h:124
CmdType commandType
Definition: parsenodes.h:122
Node * utilityStmt
Definition: parsenodes.h:134
Form_pg_class rd_rel
Definition: rel.h:110
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define S_IWOTH
Definition: win32_port.h:318
#define S_ISDIR(m)
Definition: win32_port.h:327
#define fstat
Definition: win32_port.h:285
#define S_IWGRP
Definition: win32_port.h:306

References AllocateFile(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), attnum, CopyToStateData::attnumlist, CopyToStateData::bytes_processed, CMD_DELETE, CMD_INSERT, CMD_SELECT, CMD_UPDATE, Query::commandType, COPY_CALLBACK, CopyToStateData::copy_dest, COPY_FILE, CopyToStateData::copy_file, CopyToStateData::copycontext, CopyGetAttnums(), CreateDestReceiver(), CreateQueryDesc(), cur, CurrentMemoryContext, CURSOR_OPT_PARALLEL_OK, CopyToStateData::data_dest_cb, generate_unaccent_rules::dest, DestCopyOut, DestRemote, CopyToStateData::encoding_embeds_ascii, ereport, errcode(), errcode_for_file_access(), errhint(), errmsg(), ERROR, ExecutorStart(), CopyToStateData::file_encoding, CopyFormatOptions::file_encoding, CopyToStateData::filename, filename, CopyFormatOptions::force_quote, CopyFormatOptions::force_quote_all, CopyFormatOptions::force_quote_flags, fstat, GetActiveSnapshot(), GetDatabaseEncoding(), i, InvalidOid, InvalidSnapshot, is_absolute_path, CopyToStateData::is_program, IsA, lfirst_int, lfirst_node, linitial_node, list_length(), list_member_int(), list_member_oid(), MemoryContextSwitchTo(), NameStr, TupleDescData::natts, CopyToStateData::need_transcoding, NIL, OpenPipeStream(), CopyToStateData::opts, ParseState::p_sourcetext, palloc0(), pg_analyze_and_rewrite_fixedparams(), PG_BINARY_W, pg_database_encoding_max_length(), PG_ENCODING_IS_CLIENT_ONLY, PG_END_TRY, PG_FINALLY, pg_get_client_encoding(), pg_plan_query(), PG_TRY, pgstat_progress_start_command(), pgstat_progress_update_multi_param(), ProcessCopyOptions(), PROGRESS_COMMAND_COPY, PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_TO, PROGRESS_COPY_TYPE, PROGRESS_COPY_TYPE_CALLBACK, PROGRESS_COPY_TYPE_FILE, PROGRESS_COPY_TYPE_PIPE, PROGRESS_COPY_TYPE_PROGRAM, pstrdup(), PushCopiedSnapshot(), QSRC_NON_INSTEAD_RULE, QSRC_QUAL_INSTEAD_RULE, CopyToStateData::queryDesc, Query::querySource, RelationData::rd_rel, CopyToStateData::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, 
PlannedStmt::relationOids, Query::returningList, S_ISDIR, S_IWGRP, S_IWOTH, stat::st_mode, generate_unaccent_rules::stdout, QueryDesc::tupDesc, TupleDescAttr, UpdateActiveSnapshotCommandId(), Query::utilityStmt, and whereToSendOutput.

Referenced by DoCopy(), and test_copy_to_callback().

◆ ClosePipeToProgram()

static void ClosePipeToProgram ( CopyToState  cstate)
static

Definition at line 296 of file copyto.c.

297 {
298  int pclose_rc;
299 
300  Assert(cstate->is_program);
301 
302  pclose_rc = ClosePipeStream(cstate->copy_file);
303  if (pclose_rc == -1)
304  ereport(ERROR,
306  errmsg("could not close pipe to external command: %m")));
307  else if (pclose_rc != 0)
308  {
309  ereport(ERROR,
310  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
311  errmsg("program \"%s\" failed",
312  cstate->filename),
313  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
314  }
315 }
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1066
int ClosePipeStream(FILE *file)
Definition: fd.c:2791
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33

References Assert(), ClosePipeStream(), CopyToStateData::copy_file, ereport, errcode(), errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, CopyToStateData::filename, CopyToStateData::is_program, and wait_result_to_str().

Referenced by CopySendEndOfRow(), and EndCopy().

◆ copy_dest_destroy()

static void copy_dest_destroy ( DestReceiver *  self)
static

Definition at line 1266 of file copyto.c.

1267 {
1268  pfree(self);
1269 }
void pfree(void *pointer)
Definition: mcxt.c:1306

References pfree().

Referenced by CreateCopyDestReceiver().

◆ copy_dest_receive()

static bool copy_dest_receive ( TupleTableSlot *  slot,
DestReceiver *  self 
)
static

Definition at line 1238 of file copyto.c.

1239 {
1240  DR_copy *myState = (DR_copy *) self;
1241  CopyToState cstate = myState->cstate;
1242 
1243  /* Send the data */
1244  CopyOneRowTo(cstate, slot);
1245 
1246  /* Increment the number of processed tuples, and report the progress */
1248  ++myState->processed);
1249 
1250  return true;
1251 }
void pgstat_progress_update_param(int index, int64 val)
static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
Definition: copyto.c:910
#define PROGRESS_COPY_TUPLES_PROCESSED
Definition: progress.h:139
CopyToState cstate
Definition: copyto.c:108
uint64 processed
Definition: copyto.c:109

References CopyOneRowTo(), DR_copy::cstate, pgstat_progress_update_param(), DR_copy::processed, and PROGRESS_COPY_TUPLES_PROCESSED.

Referenced by CreateCopyDestReceiver().

◆ copy_dest_shutdown()

static void copy_dest_shutdown ( DestReceiver *  self)
static

Definition at line 1257 of file copyto.c.

1258 {
1259  /* no-op */
1260 }

Referenced by CreateCopyDestReceiver().

◆ copy_dest_startup()

static void copy_dest_startup ( DestReceiver *  self,
int  operation,
TupleDesc  typeinfo 
)
static

Definition at line 1229 of file copyto.c.

1230 {
1231  /* no-op */
1232 }

Referenced by CreateCopyDestReceiver().

◆ CopyAttributeOutCSV()

static void CopyAttributeOutCSV ( CopyToState  cstate,
const char *  string,
bool  use_quote,
bool  single_attr 
)
static

Definition at line 1144 of file copyto.c.

1146 {
1147  const char *ptr;
1148  const char *start;
1149  char c;
1150  char delimc = cstate->opts.delim[0];
1151  char quotec = cstate->opts.quote[0];
1152  char escapec = cstate->opts.escape[0];
1153 
1154  /* force quoting if it matches null_print (before conversion!) */
1155  if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1156  use_quote = true;
1157 
1158  if (cstate->need_transcoding)
1159  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1160  else
1161  ptr = string;
1162 
1163  /*
1164  * Make a preliminary pass to discover if it needs quoting
1165  */
1166  if (!use_quote)
1167  {
1168  /*
1169  * Because '\.' can be a data value, quote it if it appears alone on a
1170  * line so it is not interpreted as the end-of-data marker.
1171  */
1172  if (single_attr && strcmp(ptr, "\\.") == 0)
1173  use_quote = true;
1174  else
1175  {
1176  const char *tptr = ptr;
1177 
1178  while ((c = *tptr) != '\0')
1179  {
1180  if (c == delimc || c == quotec || c == '\n' || c == '\r')
1181  {
1182  use_quote = true;
1183  break;
1184  }
1185  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1186  tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1187  else
1188  tptr++;
1189  }
1190  }
1191  }
1192 
1193  if (use_quote)
1194  {
1195  CopySendChar(cstate, quotec);
1196 
1197  /*
1198  * We adopt the same optimization strategy as in CopyAttributeOutText
1199  */
1200  start = ptr;
1201  while ((c = *ptr) != '\0')
1202  {
1203  if (c == quotec || c == escapec)
1204  {
1205  DUMPSOFAR();
1206  CopySendChar(cstate, escapec);
1207  start = ptr; /* we include char in next run */
1208  }
1209  if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1210  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1211  else
1212  ptr++;
1213  }
1214  DUMPSOFAR();
1215 
1216  CopySendChar(cstate, quotec);
1217  }
1218  else
1219  {
1220  /* If it doesn't need quoting, we can just dump it as-is */
1221  CopySendString(cstate, ptr);
1222  }
1223 }
#define IS_HIGHBIT_SET(ch)
Definition: c.h:1094
#define DUMPSOFAR()
Definition: copyto.c:984
static void CopySendChar(CopyToState cstate, char c)
Definition: copyto.c:188
static void CopySendString(CopyToState cstate, const char *str)
Definition: copyto.c:182
char * pg_server_to_any(const char *s, int len, int encoding)
Definition: mbutils.c:749
char * c
char string[11]
Definition: preproc-type.c:52
char * quote
Definition: copy.h:51
char * escape
Definition: copy.h:52
char * null_print
Definition: copy.h:47
char * delim
Definition: copy.h:50
int pg_encoding_mblen(int encoding, const char *mbstr)
Definition: wchar.c:2130

References CopySendChar(), CopySendString(), CopyFormatOptions::delim, DUMPSOFAR, CopyToStateData::encoding_embeds_ascii, CopyFormatOptions::escape, CopyToStateData::file_encoding, IS_HIGHBIT_SET, CopyToStateData::need_transcoding, CopyFormatOptions::null_print, CopyToStateData::opts, pg_encoding_mblen(), pg_server_to_any(), and CopyFormatOptions::quote.

Referenced by CopyOneRowTo(), and DoCopyTo().

◆ CopyAttributeOutText()

static void CopyAttributeOutText ( CopyToState  cstate,
const char *  string 
)
static

Definition at line 991 of file copyto.c.

992 {
993  const char *ptr;
994  const char *start;
995  char c;
996  char delimc = cstate->opts.delim[0];
997 
998  if (cstate->need_transcoding)
999  ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1000  else
1001  ptr = string;
1002 
1003  /*
1004  * We have to grovel through the string searching for control characters
1005  * and instances of the delimiter character. In most cases, though, these
1006  * are infrequent. To avoid overhead from calling CopySendData once per
1007  * character, we dump out all characters between escaped characters in a
1008  * single call. The loop invariant is that the data from "start" to "ptr"
1009  * can be sent literally, but hasn't yet been.
1010  *
1011  * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1012  * in valid backend encodings, extra bytes of a multibyte character never
1013  * look like ASCII. This loop is sufficiently performance-critical that
1014  * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1015  * of the normal safe-encoding path.
1016  */
1017  if (cstate->encoding_embeds_ascii)
1018  {
1019  start = ptr;
1020  while ((c = *ptr) != '\0')
1021  {
1022  if ((unsigned char) c < (unsigned char) 0x20)
1023  {
1024  /*
1025  * \r and \n must be escaped, the others are traditional. We
1026  * prefer to dump these using the C-like notation, rather than
1027  * a backslash and the literal character, because it makes the
1028  * dump file a bit more proof against Microsoftish data
1029  * mangling.
1030  */
1031  switch (c)
1032  {
1033  case '\b':
1034  c = 'b';
1035  break;
1036  case '\f':
1037  c = 'f';
1038  break;
1039  case '\n':
1040  c = 'n';
1041  break;
1042  case '\r':
1043  c = 'r';
1044  break;
1045  case '\t':
1046  c = 't';
1047  break;
1048  case '\v':
1049  c = 'v';
1050  break;
1051  default:
1052  /* If it's the delimiter, must backslash it */
1053  if (c == delimc)
1054  break;
1055  /* All ASCII control chars are length 1 */
1056  ptr++;
1057  continue; /* fall to end of loop */
1058  }
1059  /* if we get here, we need to convert the control char */
1060  DUMPSOFAR();
1061  CopySendChar(cstate, '\\');
1062  CopySendChar(cstate, c);
1063  start = ++ptr; /* do not include char in next run */
1064  }
1065  else if (c == '\\' || c == delimc)
1066  {
1067  DUMPSOFAR();
1068  CopySendChar(cstate, '\\');
1069  start = ptr++; /* we include char in next run */
1070  }
1071  else if (IS_HIGHBIT_SET(c))
1072  ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1073  else
1074  ptr++;
1075  }
1076  }
1077  else
1078  {
1079  start = ptr;
1080  while ((c = *ptr) != '\0')
1081  {
1082  if ((unsigned char) c < (unsigned char) 0x20)
1083  {
1084  /*
1085  * \r and \n must be escaped, the others are traditional. We
1086  * prefer to dump these using the C-like notation, rather than
1087  * a backslash and the literal character, because it makes the
1088  * dump file a bit more proof against Microsoftish data
1089  * mangling.
1090  */
1091  switch (c)
1092  {
1093  case '\b':
1094  c = 'b';
1095  break;
1096  case '\f':
1097  c = 'f';
1098  break;
1099  case '\n':
1100  c = 'n';
1101  break;
1102  case '\r':
1103  c = 'r';
1104  break;
1105  case '\t':
1106  c = 't';
1107  break;
1108  case '\v':
1109  c = 'v';
1110  break;
1111  default:
1112  /* If it's the delimiter, must backslash it */
1113  if (c == delimc)
1114  break;
1115  /* All ASCII control chars are length 1 */
1116  ptr++;
1117  continue; /* fall to end of loop */
1118  }
1119  /* if we get here, we need to convert the control char */
1120  DUMPSOFAR();
1121  CopySendChar(cstate, '\\');
1122  CopySendChar(cstate, c);
1123  start = ++ptr; /* do not include char in next run */
1124  }
1125  else if (c == '\\' || c == delimc)
1126  {
1127  DUMPSOFAR();
1128  CopySendChar(cstate, '\\');
1129  start = ptr++; /* we include char in next run */
1130  }
1131  else
1132  ptr++;
1133  }
1134  }
1135 
1136  DUMPSOFAR();
1137 }

References CopySendChar(), CopyFormatOptions::delim, DUMPSOFAR, CopyToStateData::encoding_embeds_ascii, CopyToStateData::file_encoding, IS_HIGHBIT_SET, CopyToStateData::need_transcoding, CopyToStateData::opts, pg_encoding_mblen(), and pg_server_to_any().

Referenced by CopyOneRowTo(), and DoCopyTo().

◆ CopyOneRowTo()

static void CopyOneRowTo ( CopyToState  cstate,
TupleTableSlot slot 
)
static

Definition at line 910 of file copyto.c.

911 {
912  bool need_delim = false;
913  FmgrInfo *out_functions = cstate->out_functions;
914  MemoryContext oldcontext;
915  ListCell *cur;
916  char *string;
917 
918  MemoryContextReset(cstate->rowcontext);
919  oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
920 
921  if (cstate->opts.binary)
922  {
923  /* Binary per-tuple header */
924  CopySendInt16(cstate, list_length(cstate->attnumlist));
925  }
926 
927  /* Make sure the tuple is fully deconstructed */
928  slot_getallattrs(slot);
929 
930  foreach(cur, cstate->attnumlist)
931  {
932  int attnum = lfirst_int(cur);
933  Datum value = slot->tts_values[attnum - 1];
934  bool isnull = slot->tts_isnull[attnum - 1];
935 
936  if (!cstate->opts.binary)
937  {
938  if (need_delim)
939  CopySendChar(cstate, cstate->opts.delim[0]);
940  need_delim = true;
941  }
942 
943  if (isnull)
944  {
945  if (!cstate->opts.binary)
946  CopySendString(cstate, cstate->opts.null_print_client);
947  else
948  CopySendInt32(cstate, -1);
949  }
950  else
951  {
952  if (!cstate->opts.binary)
953  {
954  string = OutputFunctionCall(&out_functions[attnum - 1],
955  value);
956  if (cstate->opts.csv_mode)
957  CopyAttributeOutCSV(cstate, string,
958  cstate->opts.force_quote_flags[attnum - 1],
959  list_length(cstate->attnumlist) == 1);
960  else
961  CopyAttributeOutText(cstate, string);
962  }
963  else
964  {
965  bytea *outputbytes;
966 
967  outputbytes = SendFunctionCall(&out_functions[attnum - 1],
968  value);
969  CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
970  CopySendData(cstate, VARDATA(outputbytes),
971  VARSIZE(outputbytes) - VARHDRSZ);
972  }
973  }
974  }
975 
976  CopySendEndOfRow(cstate);
977 
978  MemoryContextSwitchTo(oldcontext);
979 }
#define VARHDRSZ
Definition: c.h:628
static void CopySendInt32(CopyToState cstate, int32 val)
Definition: copyto.c:272
static void CopyAttributeOutText(CopyToState cstate, const char *string)
Definition: copyto.c:991
static void CopySendInt16(CopyToState cstate, int16 val)
Definition: copyto.c:284
static void CopySendData(CopyToState cstate, const void *databuf, int datasize)
Definition: copyto.c:176
static void CopyAttributeOutCSV(CopyToState cstate, const char *string, bool use_quote, bool single_attr)
Definition: copyto.c:1144
static void CopySendEndOfRow(CopyToState cstate)
Definition: copyto.c:194
bytea * SendFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1620
char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)
Definition: fmgr.c:1559
static struct @143 value
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:303
uintptr_t Datum
Definition: postgres.h:412
#define VARDATA(PTR)
Definition: postgres.h:316
#define VARSIZE(PTR)
Definition: postgres.h:317
bool binary
Definition: copy.h:43
char * null_print_client
Definition: copy.h:49
bool csv_mode
Definition: copy.h:45
FmgrInfo * out_functions
Definition: copyto.c:99
MemoryContext rowcontext
Definition: copyto.c:100
Definition: fmgr.h:57
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
Definition: c.h:623
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:362

References attnum, CopyToStateData::attnumlist, CopyFormatOptions::binary, CopyAttributeOutCSV(), CopyAttributeOutText(), CopySendChar(), CopySendData(), CopySendEndOfRow(), CopySendInt16(), CopySendInt32(), CopySendString(), CopyFormatOptions::csv_mode, cur, CopyFormatOptions::delim, CopyFormatOptions::force_quote_flags, lfirst_int, list_length(), MemoryContextReset(), MemoryContextSwitchTo(), CopyFormatOptions::null_print_client, CopyToStateData::opts, CopyToStateData::out_functions, OutputFunctionCall(), CopyToStateData::rowcontext, SendFunctionCall(), slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, value, VARDATA, VARHDRSZ, and VARSIZE.

Referenced by copy_dest_receive(), and DoCopyTo().

◆ CopySendChar()

static void CopySendChar ( CopyToState  cstate,
char  c 
)
static

Definition at line 188 of file copyto.c.

189 {
190  appendStringInfoCharMacro(cstate->fe_msgbuf, c);
191 }
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:128
StringInfo fe_msgbuf
Definition: copyto.c:77

References appendStringInfoCharMacro, and CopyToStateData::fe_msgbuf.

Referenced by CopyAttributeOutCSV(), CopyAttributeOutText(), CopyOneRowTo(), CopySendEndOfRow(), and DoCopyTo().

◆ CopySendData()

static void CopySendData ( CopyToState  cstate,
const void *  databuf,
int  datasize 
)
static

Definition at line 176 of file copyto.c.

177 {
178  appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
179 }
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:227

References appendBinaryStringInfo(), and CopyToStateData::fe_msgbuf.

Referenced by CopyOneRowTo(), CopySendInt16(), CopySendInt32(), and DoCopyTo().

◆ CopySendEndOfRow()

static void CopySendEndOfRow ( CopyToState  cstate)
static

Definition at line 194 of file copyto.c.

195 {
196  StringInfo fe_msgbuf = cstate->fe_msgbuf;
197 
198  switch (cstate->copy_dest)
199  {
200  case COPY_FILE:
201  if (!cstate->opts.binary)
202  {
203  /* Default line termination depends on platform */
204 #ifndef WIN32
205  CopySendChar(cstate, '\n');
206 #else
207  CopySendString(cstate, "\r\n");
208 #endif
209  }
210 
211  if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
212  cstate->copy_file) != 1 ||
213  ferror(cstate->copy_file))
214  {
215  if (cstate->is_program)
216  {
217  if (errno == EPIPE)
218  {
219  /*
220  * The pipe will be closed automatically on error at
221  * the end of transaction, but we might get a better
222  * error message from the subprocess' exit code than
223  * just "Broken Pipe"
224  */
225  ClosePipeToProgram(cstate);
226 
227  /*
228  * If ClosePipeToProgram() didn't throw an error, the
229  * program terminated normally, but closed the pipe
230  * first. Restore errno, and throw an error.
231  */
232  errno = EPIPE;
233  }
234  ereport(ERROR,
235  (errcode_for_file_access(),
236  errmsg("could not write to COPY program: %m")));
237  }
238  else
239  ereport(ERROR,
240  (errcode_for_file_access(),
241  errmsg("could not write to COPY file: %m")));
242  }
243  break;
244  case COPY_FRONTEND:
245  /* The FE/BE protocol uses \n as newline for all platforms */
246  if (!cstate->opts.binary)
247  CopySendChar(cstate, '\n');
248 
249  /* Dump the accumulated row as one CopyData message */
250  (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
251  break;
252  case COPY_CALLBACK:
253  cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
254  break;
255  }
256 
257  /* Update the progress */
258  cstate->bytes_processed += fe_msgbuf->len;
259  pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
260 
261  resetStringInfo(fe_msgbuf);
262 }
static void ClosePipeToProgram(CopyToState cstate)
Definition: copyto.c:296
#define pq_putmessage(msgtype, s, len)
Definition: libpq.h:49
#define PROGRESS_COPY_BYTES_PROCESSED
Definition: progress.h:137
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75

References CopyFormatOptions::binary, CopyToStateData::bytes_processed, ClosePipeToProgram(), COPY_CALLBACK, CopyToStateData::copy_dest, COPY_FILE, CopyToStateData::copy_file, COPY_FRONTEND, CopySendChar(), CopySendString(), StringInfoData::data, CopyToStateData::data_dest_cb, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyToStateData::fe_msgbuf, CopyToStateData::is_program, StringInfoData::len, CopyToStateData::opts, pgstat_progress_update_param(), pq_putmessage, PROGRESS_COPY_BYTES_PROCESSED, and resetStringInfo().

Referenced by CopyOneRowTo(), and DoCopyTo().

◆ CopySendInt16()

static void CopySendInt16 ( CopyToState  cstate,
int16  val 
)
inlinestatic

Definition at line 284 of file copyto.c.

285 {
286  uint16 buf;
287 
288  buf = pg_hton16((uint16) val);
289  CopySendData(cstate, &buf, sizeof(buf));
290 }
unsigned short uint16
Definition: c.h:441
long val
Definition: informix.c:664
#define pg_hton16(x)
Definition: pg_bswap.h:120
static char * buf
Definition: pg_test_fsync.c:67

References buf, CopySendData(), pg_hton16, and val.

Referenced by CopyOneRowTo(), and DoCopyTo().

◆ CopySendInt32()

static void CopySendInt32 ( CopyToState  cstate,
int32  val 
)
inlinestatic

Definition at line 272 of file copyto.c.

273 {
274  uint32 buf;
275 
276  buf = pg_hton32((uint32) val);
277  CopySendData(cstate, &buf, sizeof(buf));
278 }
unsigned int uint32
Definition: c.h:442
#define pg_hton32(x)
Definition: pg_bswap.h:121

References buf, CopySendData(), pg_hton32, and val.

Referenced by CopyOneRowTo(), and DoCopyTo().

◆ CopySendString()

static void CopySendString ( CopyToState  cstate,
const char *  str 
)
static

◆ CreateCopyDestReceiver()

DestReceiver* CreateCopyDestReceiver ( void  )

Definition at line 1275 of file copyto.c.

1276 {
1277  DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
1278 
1279  self->pub.receiveSlot = copy_dest_receive;
1280  self->pub.rStartup = copy_dest_startup;
1281  self->pub.rShutdown = copy_dest_shutdown;
1282  self->pub.rDestroy = copy_dest_destroy;
1283  self->pub.mydest = DestCopyOut;
1284 
1285  self->cstate = NULL; /* will be set later */
1286  self->processed = 0;
1287 
1288  return (DestReceiver *) self;
1289 }
static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
Definition: copyto.c:1238
static void copy_dest_destroy(DestReceiver *self)
Definition: copyto.c:1266
static void copy_dest_shutdown(DestReceiver *self)
Definition: copyto.c:1257
static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: copyto.c:1229
void * palloc(Size size)
Definition: mcxt.c:1199

References copy_dest_destroy(), copy_dest_receive(), copy_dest_shutdown(), copy_dest_startup(), DestCopyOut, and palloc().

Referenced by CreateDestReceiver().

◆ DoCopyTo()

uint64 DoCopyTo ( CopyToState  cstate)

Definition at line 749 of file copyto.c.

750 {
751  bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
752  bool fe_copy = (pipe && whereToSendOutput == DestRemote);
753  TupleDesc tupDesc;
754  int num_phys_attrs;
755  ListCell *cur;
756  uint64 processed;
757 
758  if (fe_copy)
759  SendCopyBegin(cstate);
760 
761  if (cstate->rel)
762  tupDesc = RelationGetDescr(cstate->rel);
763  else
764  tupDesc = cstate->queryDesc->tupDesc;
765  num_phys_attrs = tupDesc->natts;
766  cstate->opts.null_print_client = cstate->opts.null_print; /* default */
767 
768  /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
769  cstate->fe_msgbuf = makeStringInfo();
770 
771  /* Get info about the columns we need to process. */
772  cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
773  foreach(cur, cstate->attnumlist)
774  {
775  int attnum = lfirst_int(cur);
776  Oid out_func_oid;
777  bool isvarlena;
778  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
779 
780  if (cstate->opts.binary)
781  getTypeBinaryOutputInfo(attr->atttypid,
782  &out_func_oid,
783  &isvarlena);
784  else
785  getTypeOutputInfo(attr->atttypid,
786  &out_func_oid,
787  &isvarlena);
788  fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
789  }
790 
791  /*
792  * Create a temporary memory context that we can reset once per row to
793  * recover palloc'd memory. This avoids any problems with leaks inside
794  * datatype output routines, and should be faster than retail pfree's
795  * anyway. (We don't need a whole econtext as CopyFrom does.)
796  */
797  cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
798  "COPY TO",
799  ALLOCSET_DEFAULT_SIZES);
800 
801  if (cstate->opts.binary)
802  {
803  /* Generate header for a binary copy */
804  int32 tmp;
805 
806  /* Signature */
807  CopySendData(cstate, BinarySignature, 11);
808  /* Flags field */
809  tmp = 0;
810  CopySendInt32(cstate, tmp);
811  /* No header extension */
812  tmp = 0;
813  CopySendInt32(cstate, tmp);
814  }
815  else
816  {
817  /*
818  * For non-binary copy, we need to convert null_print to file
819  * encoding, because it will be sent directly with CopySendString.
820  */
821  if (cstate->need_transcoding)
822  cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
823  cstate->opts.null_print_len,
824  cstate->file_encoding);
825 
826  /* if a header has been requested send the line */
827  if (cstate->opts.header_line)
828  {
829  bool hdr_delim = false;
830 
831  foreach(cur, cstate->attnumlist)
832  {
833  int attnum = lfirst_int(cur);
834  char *colname;
835 
836  if (hdr_delim)
837  CopySendChar(cstate, cstate->opts.delim[0]);
838  hdr_delim = true;
839 
840  colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
841 
842  if (cstate->opts.csv_mode)
843  CopyAttributeOutCSV(cstate, colname, false,
844  list_length(cstate->attnumlist) == 1);
845  else
846  CopyAttributeOutText(cstate, colname);
847  }
848 
849  CopySendEndOfRow(cstate);
850  }
851  }
852 
853  if (cstate->rel)
854  {
855  TupleTableSlot *slot;
856  TableScanDesc scandesc;
857 
858  scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
859  slot = table_slot_create(cstate->rel, NULL);
860 
861  processed = 0;
862  while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
863  {
864  CHECK_FOR_INTERRUPTS();
865 
866  /* Deconstruct the tuple ... */
867  slot_getallattrs(slot);
868 
869  /* Format and send the data */
870  CopyOneRowTo(cstate, slot);
871 
872  /*
873  * Increment the number of processed tuples, and report the
874  * progress.
875  */
876  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
877  ++processed);
878  }
879 
880  ExecDropSingleTupleTableSlot(slot);
881  table_endscan(scandesc);
882  }
883  else
884  {
885  /* run the plan --- the dest receiver will send tuples */
886  ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
887  processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
888  }
889 
890  if (cstate->opts.binary)
891  {
892  /* Generate trailer for a binary copy */
893  CopySendInt16(cstate, -1);
894  /* Need to flush out the trailer */
895  CopySendEndOfRow(cstate);
896  }
897 
898  MemoryContextDelete(cstate->rowcontext);
899 
900  if (fe_copy)
901  SendCopyEnd(cstate);
902 
903  return processed;
904 }
signed int int32
Definition: c.h:430
static void SendCopyBegin(CopyToState cstate)
Definition: copyto.c:140
static void SendCopyEnd(CopyToState cstate)
Definition: copyto.c:157
static const char BinarySignature[11]
Definition: copyto.c:113
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
Definition: execMain.c:300
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
void getTypeBinaryOutputInfo(Oid type, Oid *typSend, bool *typIsVarlena)
Definition: lsyscache.c:2931
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:2865
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
NameData attname
Definition: pg_attribute.h:41
unsigned int Oid
Definition: postgres_ext.h:31
@ ForwardScanDirection
Definition: sdir.h:26
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
int null_print_len
Definition: copy.h:48
CopyHeaderChoice header_line
Definition: copy.h:46
DestReceiver * dest
Definition: execdesc.h:41
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:885
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:993
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1034

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, attname, attnum, CopyToStateData::attnumlist, CopyFormatOptions::binary, BinarySignature, CHECK_FOR_INTERRUPTS, CopyAttributeOutCSV(), CopyAttributeOutText(), CopyOneRowTo(), CopySendChar(), CopySendData(), CopySendEndOfRow(), CopySendInt16(), CopySendInt32(), CopyFormatOptions::csv_mode, cur, CurrentMemoryContext, CopyToStateData::data_dest_cb, CopyFormatOptions::delim, QueryDesc::dest, DestRemote, ExecDropSingleTupleTableSlot(), ExecutorRun(), CopyToStateData::fe_msgbuf, CopyToStateData::file_encoding, CopyToStateData::filename, fmgr_info(), ForwardScanDirection, GetActiveSnapshot(), getTypeBinaryOutputInfo(), getTypeOutputInfo(), CopyFormatOptions::header_line, lfirst_int, list_length(), makeStringInfo(), MemoryContextDelete(), NameStr, TupleDescData::natts, CopyToStateData::need_transcoding, CopyFormatOptions::null_print, CopyFormatOptions::null_print_client, CopyFormatOptions::null_print_len, CopyToStateData::opts, CopyToStateData::out_functions, palloc(), pg_server_to_any(), pgstat_progress_update_param(), PROGRESS_COPY_TUPLES_PROCESSED, CopyToStateData::queryDesc, CopyToStateData::rel, RelationGetDescr, CopyToStateData::rowcontext, SendCopyBegin(), SendCopyEnd(), slot_getallattrs(), table_beginscan(), table_endscan(), table_scan_getnextslot(), table_slot_create(), QueryDesc::tupDesc, TupleDescAttr, and whereToSendOutput.

Referenced by DoCopy(), and test_copy_to_callback().

◆ EndCopy()

static void EndCopy ( CopyToState  cstate)
static

Definition at line 321 of file copyto.c.

322 {
323  if (cstate->is_program)
324  {
325  ClosePipeToProgram(cstate);
326  }
327  else
328  {
329  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
330  ereport(ERROR,
331  (errcode_for_file_access(),
332  errmsg("could not close file \"%s\": %m",
333  cstate->filename)));
334  }
335 
336  pgstat_progress_end_command();
337 
338  MemoryContextDelete(cstate->copycontext);
339  pfree(cstate);
340 }
void pgstat_progress_end_command(void)
int FreeFile(FILE *file)
Definition: fd.c:2581

References ClosePipeToProgram(), CopyToStateData::copy_file, CopyToStateData::copycontext, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyToStateData::filename, FreeFile(), CopyToStateData::is_program, MemoryContextDelete(), pfree(), and pgstat_progress_end_command().

Referenced by EndCopyTo().

◆ EndCopyTo()

void EndCopyTo ( CopyToState  cstate)

Definition at line 728 of file copyto.c.

729 {
730  if (cstate->queryDesc != NULL)
731  {
732  /* Close down the query and free resources. */
733  ExecutorFinish(cstate->queryDesc);
734  ExecutorEnd(cstate->queryDesc);
735  FreeQueryDesc(cstate->queryDesc);
736  PopActiveSnapshot();
737  }
738 
739  /* Clean up storage */
740  EndCopy(cstate);
741 }
static void EndCopy(CopyToState cstate)
Definition: copyto.c:321
void ExecutorEnd(QueryDesc *queryDesc)
Definition: execMain.c:461
void ExecutorFinish(QueryDesc *queryDesc)
Definition: execMain.c:401
void FreeQueryDesc(QueryDesc *qdesc)
Definition: pquery.c:105
void PopActiveSnapshot(void)
Definition: snapmgr.c:778

References EndCopy(), ExecutorEnd(), ExecutorFinish(), FreeQueryDesc(), PopActiveSnapshot(), and CopyToStateData::queryDesc.

Referenced by DoCopy(), and test_copy_to_callback().

◆ SendCopyBegin()

static void SendCopyBegin ( CopyToState  cstate)
static

Definition at line 140 of file copyto.c.

141 {
142  StringInfoData buf;
143  int natts = list_length(cstate->attnumlist);
144  int16 format = (cstate->opts.binary ? 1 : 0);
145  int i;
146 
147  pq_beginmessage(&buf, 'H');
148  pq_sendbyte(&buf, format); /* overall format */
149  pq_sendint16(&buf, natts);
150  for (i = 0; i < natts; i++)
151  pq_sendint16(&buf, format); /* per-column formats */
152  pq_endmessage(&buf);
153  cstate->copy_dest = COPY_FRONTEND;
154 }
signed short int16
Definition: c.h:429
static char format
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137

References CopyToStateData::attnumlist, CopyFormatOptions::binary, buf, CopyToStateData::copy_dest, COPY_FRONTEND, format, i, list_length(), CopyToStateData::opts, pq_beginmessage(), pq_endmessage(), pq_sendbyte(), and pq_sendint16().

Referenced by DoCopyTo().

◆ SendCopyEnd()

static void SendCopyEnd ( CopyToState  cstate)
static

Definition at line 157 of file copyto.c.

158 {
159  /* Shouldn't have any unsent data */
160  Assert(cstate->fe_msgbuf->len == 0);
161  /* Send Copy Done message */
162  pq_putemptymessage('c');
163 }
void pq_putemptymessage(char msgtype)
Definition: pqformat.c:390

References Assert(), CopyToStateData::fe_msgbuf, StringInfoData::len, and pq_putemptymessage().

Referenced by DoCopyTo().

Variable Documentation

◆ BinarySignature

const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"
static

Definition at line 113 of file copyto.c.

Referenced by DoCopyTo().