PostgreSQL Source Code  git master
copyfrom.c File Reference
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
Include dependency graph for copyfrom.c:

Go to the source code of this file.

Data Structures

struct  CopyMultiInsertBuffer
 
struct  CopyMultiInsertInfo
 

Macros

#define MAX_BUFFERED_TUPLES   1000
 
#define MAX_BUFFERED_BYTES   65535
 
#define MAX_PARTITION_BUFFERS   32
 
#define MAX_COPY_DATA_DISPLAY   100
 

Typedefs

typedef struct CopyMultiInsertBuffer CopyMultiInsertBuffer
 
typedef struct CopyMultiInsertInfo CopyMultiInsertInfo
 

Functions

static char * limit_printout_length (const char *str)
 
static void ClosePipeFromProgram (CopyFromState cstate)
 
void CopyFromErrorCallback (void *arg)
 
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit (ResultRelInfo *rri)
 
static void CopyMultiInsertInfoSetupBuffer (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoInit (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
 
static bool CopyMultiInsertInfoIsFull (CopyMultiInsertInfo *miinfo)
 
static bool CopyMultiInsertInfoIsEmpty (CopyMultiInsertInfo *miinfo)
 
static void CopyMultiInsertBufferFlush (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
 
static void CopyMultiInsertBufferCleanup (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
 
static void CopyMultiInsertInfoFlush (CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
 
static void CopyMultiInsertInfoCleanup (CopyMultiInsertInfo *miinfo)
 
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoStore (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
 
uint64 CopyFrom (CopyFromState cstate)
 
CopyFromState BeginCopyFrom (ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
 
void EndCopyFrom (CopyFromState cstate)
 

Macro Definition Documentation

◆ MAX_BUFFERED_BYTES

#define MAX_BUFFERED_BYTES   65535

Definition at line 72 of file copyfrom.c.

◆ MAX_BUFFERED_TUPLES

#define MAX_BUFFERED_TUPLES   1000

Definition at line 66 of file copyfrom.c.

◆ MAX_COPY_DATA_DISPLAY

#define MAX_COPY_DATA_DISPLAY   100

◆ MAX_PARTITION_BUFFERS

#define MAX_PARTITION_BUFFERS   32

Definition at line 75 of file copyfrom.c.

Typedef Documentation

◆ CopyMultiInsertBuffer

◆ CopyMultiInsertInfo

Function Documentation

◆ BeginCopyFrom()

CopyFromState BeginCopyFrom ( ParseState *  pstate,
Relation  rel,
Node *  whereClause,
const char *  filename,
bool  is_program,
copy_data_source_cb  data_source_cb,
List *  attnamelist,
List *  options 
)

Definition at line 1373 of file copyfrom.c.

1381 {
1382  CopyFromState cstate;
1383  bool pipe = (filename == NULL);
1384  TupleDesc tupDesc;
1385  AttrNumber num_phys_attrs,
1386  num_defaults;
1387  FmgrInfo *in_functions;
1388  Oid *typioparams;
1389  Oid in_func_oid;
1390  int *defmap;
1391  ExprState **defexprs;
1392  MemoryContext oldcontext;
1393  bool volatile_defexprs;
1394  const int progress_cols[] = {
1398  };
1399  int64 progress_vals[] = {
1401  0,
1402  0
1403  };
1404 
1405  /* Allocate workspace and zero all fields */
1406  cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1407 
1408  /*
1409  * We allocate everything used by a cstate in a new memory context. This
1410  * avoids memory leaks during repeated use of COPY in a query.
1411  */
1413  "COPY",
1415 
1416  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1417 
1418  /* Extract options from the statement node tree */
1419  ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1420 
1421  /* Process the target relation */
1422  cstate->rel = rel;
1423 
1424  tupDesc = RelationGetDescr(cstate->rel);
1425 
1426  /* process common options or initialization */
1427 
1428  /* Generate or convert list of attributes to process */
1429  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1430 
1431  num_phys_attrs = tupDesc->natts;
1432 
1433  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1434  cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1435  if (cstate->opts.force_notnull_all)
1436  MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1437  else if (cstate->opts.force_notnull)
1438  {
1439  List *attnums;
1440  ListCell *cur;
1441 
1442  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1443 
1444  foreach(cur, attnums)
1445  {
1446  int attnum = lfirst_int(cur);
1447  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1448 
1449  if (!list_member_int(cstate->attnumlist, attnum))
1450  ereport(ERROR,
1451  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1452  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1453  NameStr(attr->attname))));
1454  cstate->opts.force_notnull_flags[attnum - 1] = true;
1455  }
1456  }
1457 
1458  /* Set up soft error handler for ON_ERROR */
1459  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1460  {
1461  cstate->escontext = makeNode(ErrorSaveContext);
1462  cstate->escontext->type = T_ErrorSaveContext;
1463  cstate->escontext->error_occurred = false;
1464 
1465  /*
1466  * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1467  * options later
1468  */
1469  if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1470  cstate->escontext->details_wanted = false;
1471  }
1472  else
1473  cstate->escontext = NULL;
1474 
1475  /* Convert FORCE_NULL name list to per-column flags, check validity */
1476  cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1477  if (cstate->opts.force_null_all)
1478  MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1479  else if (cstate->opts.force_null)
1480  {
1481  List *attnums;
1482  ListCell *cur;
1483 
1484  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1485 
1486  foreach(cur, attnums)
1487  {
1488  int attnum = lfirst_int(cur);
1489  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1490 
1491  if (!list_member_int(cstate->attnumlist, attnum))
1492  ereport(ERROR,
1493  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1494  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1495  NameStr(attr->attname))));
1496  cstate->opts.force_null_flags[attnum - 1] = true;
1497  }
1498  }
1499 
1500  /* Convert convert_selectively name list to per-column flags */
1501  if (cstate->opts.convert_selectively)
1502  {
1503  List *attnums;
1504  ListCell *cur;
1505 
1506  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1507 
1508  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1509 
1510  foreach(cur, attnums)
1511  {
1512  int attnum = lfirst_int(cur);
1513  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1514 
1515  if (!list_member_int(cstate->attnumlist, attnum))
1516  ereport(ERROR,
1517  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1518  errmsg_internal("selected column \"%s\" not referenced by COPY",
1519  NameStr(attr->attname))));
1520  cstate->convert_select_flags[attnum - 1] = true;
1521  }
1522  }
1523 
1524  /* Use client encoding when ENCODING option is not specified. */
1525  if (cstate->opts.file_encoding < 0)
1527  else
1528  cstate->file_encoding = cstate->opts.file_encoding;
1529 
1530  /*
1531  * Look up encoding conversion function.
1532  */
1533  if (cstate->file_encoding == GetDatabaseEncoding() ||
1534  cstate->file_encoding == PG_SQL_ASCII ||
1536  {
1537  cstate->need_transcoding = false;
1538  }
1539  else
1540  {
1541  cstate->need_transcoding = true;
1544  if (!OidIsValid(cstate->conversion_proc))
1545  ereport(ERROR,
1546  (errcode(ERRCODE_UNDEFINED_FUNCTION),
1547  errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1550  }
1551 
1552  cstate->copy_src = COPY_FILE; /* default */
1553 
1554  cstate->whereClause = whereClause;
1555 
1556  /* Initialize state variables */
1557  cstate->eol_type = EOL_UNKNOWN;
1558  cstate->cur_relname = RelationGetRelationName(cstate->rel);
1559  cstate->cur_lineno = 0;
1560  cstate->cur_attname = NULL;
1561  cstate->cur_attval = NULL;
1562  cstate->relname_only = false;
1563 
1564  /*
1565  * Allocate buffers for the input pipeline.
1566  *
1567  * attribute_buf and raw_buf are used in both text and binary modes, but
1568  * input_buf and line_buf only in text mode.
1569  */
1570  cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1571  cstate->raw_buf_index = cstate->raw_buf_len = 0;
1572  cstate->raw_reached_eof = false;
1573 
1574  if (!cstate->opts.binary)
1575  {
1576  /*
1577  * If encoding conversion is needed, we need another buffer to hold
1578  * the converted input data. Otherwise, we can just point input_buf
1579  * to the same buffer as raw_buf.
1580  */
1581  if (cstate->need_transcoding)
1582  {
1583  cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1584  cstate->input_buf_index = cstate->input_buf_len = 0;
1585  }
1586  else
1587  cstate->input_buf = cstate->raw_buf;
1588  cstate->input_reached_eof = false;
1589 
1590  initStringInfo(&cstate->line_buf);
1591  }
1592 
1593  initStringInfo(&cstate->attribute_buf);
1594 
1595  /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1596  if (pstate)
1597  {
1598  cstate->range_table = pstate->p_rtable;
1599  cstate->rteperminfos = pstate->p_rteperminfos;
1600  }
1601 
1602  num_defaults = 0;
1603  volatile_defexprs = false;
1604 
1605  /*
1606  * Pick up the required catalog information for each attribute in the
1607  * relation, including the input function, the element type (to pass to
1608  * the input function), and info about defaults and constraints. (Which
1609  * input function we use depends on text/binary format choice.)
1610  */
1611  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1612  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1613  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1614  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1615 
1616  for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1617  {
1618  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1619 
1620  /* We don't need info for dropped attributes */
1621  if (att->attisdropped)
1622  continue;
1623 
1624  /* Fetch the input function and typioparam info */
1625  if (cstate->opts.binary)
1626  getTypeBinaryInputInfo(att->atttypid,
1627  &in_func_oid, &typioparams[attnum - 1]);
1628  else
1629  getTypeInputInfo(att->atttypid,
1630  &in_func_oid, &typioparams[attnum - 1]);
1631  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1632 
1633  /* Get default info if available */
1634  defexprs[attnum - 1] = NULL;
1635 
1636  /*
1637  * We only need the default values for columns that do not appear in
1638  * the column list, unless the DEFAULT option was given. We never need
1639  * default values for generated columns.
1640  */
1641  if ((cstate->opts.default_print != NULL ||
1642  !list_member_int(cstate->attnumlist, attnum)) &&
1643  !att->attgenerated)
1644  {
1645  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1646  attnum);
1647 
1648  if (defexpr != NULL)
1649  {
1650  /* Run the expression through planner */
1651  defexpr = expression_planner(defexpr);
1652 
1653  /* Initialize executable expression in copycontext */
1654  defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1655 
1656  /* if NOT copied from input */
1657  /* use default value if one exists */
1658  if (!list_member_int(cstate->attnumlist, attnum))
1659  {
1660  defmap[num_defaults] = attnum - 1;
1661  num_defaults++;
1662  }
1663 
1664  /*
1665  * If a default expression looks at the table being loaded,
1666  * then it could give the wrong answer when using
1667  * multi-insert. Since database access can be dynamic this is
1668  * hard to test for exactly, so we use the much wider test of
1669  * whether the default expression is volatile. We allow for
1670  * the special case of when the default expression is the
1671  * nextval() of a sequence which in this specific case is
1672  * known to be safe for use with the multi-insert
1673  * optimization. Hence we use this special case function
1674  * checker rather than the standard check for
1675  * contain_volatile_functions(). Note also that we already
1676  * ran the expression through expression_planner().
1677  */
1678  if (!volatile_defexprs)
1679  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1680  }
1681  }
1682  }
1683 
1684  cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1685 
1686  /* initialize progress */
1688  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1689  cstate->bytes_processed = 0;
1690 
1691  /* We keep those variables in cstate. */
1692  cstate->in_functions = in_functions;
1693  cstate->typioparams = typioparams;
1694  cstate->defmap = defmap;
1695  cstate->defexprs = defexprs;
1696  cstate->volatile_defexprs = volatile_defexprs;
1697  cstate->num_defaults = num_defaults;
1698  cstate->is_program = is_program;
1699 
1700  if (data_source_cb)
1701  {
1702  progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1703  cstate->copy_src = COPY_CALLBACK;
1704  cstate->data_source_cb = data_source_cb;
1705  }
1706  else if (pipe)
1707  {
1708  progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1709  Assert(!is_program); /* the grammar does not allow this */
1711  ReceiveCopyBegin(cstate);
1712  else
1713  cstate->copy_file = stdin;
1714  }
1715  else
1716  {
1717  cstate->filename = pstrdup(filename);
1718 
1719  if (cstate->is_program)
1720  {
1721  progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1722  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1723  if (cstate->copy_file == NULL)
1724  ereport(ERROR,
1726  errmsg("could not execute command \"%s\": %m",
1727  cstate->filename)));
1728  }
1729  else
1730  {
1731  struct stat st;
1732 
1733  progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1734  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1735  if (cstate->copy_file == NULL)
1736  {
1737  /* copy errno because ereport subfunctions might change it */
1738  int save_errno = errno;
1739 
1740  ereport(ERROR,
1742  errmsg("could not open file \"%s\" for reading: %m",
1743  cstate->filename),
1744  (save_errno == ENOENT || save_errno == EACCES) ?
1745  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1746  "You may want a client-side facility such as psql's \\copy.") : 0));
1747  }
1748 
1749  if (fstat(fileno(cstate->copy_file), &st))
1750  ereport(ERROR,
1752  errmsg("could not stat file \"%s\": %m",
1753  cstate->filename)));
1754 
1755  if (S_ISDIR(st.st_mode))
1756  ereport(ERROR,
1757  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1758  errmsg("\"%s\" is a directory", cstate->filename)));
1759 
1760  progress_vals[2] = st.st_size;
1761  }
1762  }
1763 
1764  pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1765 
1766  if (cstate->opts.binary)
1767  {
1768  /* Read and verify binary header */
1769  ReceiveCopyBinaryHeader(cstate);
1770  }
1771 
1772  /* create workspace for CopyReadAttributes results */
1773  if (!cstate->opts.binary)
1774  {
1775  AttrNumber attr_count = list_length(cstate->attnumlist);
1776 
1777  cstate->max_fields = attr_count;
1778  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1779  }
1780 
1781  MemoryContextSwitchTo(oldcontext);
1782 
1783  return cstate;
1784 }
int16 AttrNumber
Definition: attnum.h:21
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:841
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:450
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
@ PROGRESS_COMMAND_COPY
#define NameStr(name)
Definition: c.h:735
#define PG_BINARY_R
Definition: c.h:1264
#define MemSet(start, val, len)
Definition: c.h:1009
#define OidIsValid(objectId)
Definition: c.h:764
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:656
#define INPUT_BUF_SIZE
@ EOL_UNKNOWN
#define RAW_BUF_SIZE
void ReceiveCopyBinaryHeader(CopyFromState cstate)
void ReceiveCopyBegin(CopyFromState cstate)
@ COPY_FILE
Definition: copyto.c:52
@ COPY_CALLBACK
Definition: copyto.c:54
@ DestRemote
Definition: dest.h:89
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
int errcode_for_file_access(void)
Definition: elog.c:883
int errhint(const char *fmt,...)
Definition: elog.c:1322
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:128
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2583
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2686
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
@ COPY_ON_ERROR_IGNORE
Definition: copy.h:40
@ COPY_ON_ERROR_STOP
Definition: copy.h:39
Assert(fmt[strlen(fmt) - 1] !='\n')
bool list_member_int(const List *list, int datum)
Definition: list.c:702
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2829
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2895
int GetDatabaseEncoding(void)
Definition: mbutils.c:1268
int pg_get_client_encoding(void)
Definition: mbutils.c:337
char * pstrdup(const char *in)
Definition: mcxt.c:1619
void * palloc0(Size size)
Definition: mcxt.c:1232
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void * palloc(Size size)
Definition: mcxt.c:1201
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding)
Definition: namespace.c:4065
#define makeNode(_type_)
Definition: nodes.h:155
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static char * filename
Definition: pg_dumpall.c:121
static int list_length(const List *l)
Definition: pg_list.h:152
#define lfirst_int(lc)
Definition: pg_list.h:173
@ PG_SQL_ASCII
Definition: pg_wchar.h:227
#define pg_encoding_to_char
Definition: pg_wchar.h:561
Expr * expression_planner(Expr *expr)
Definition: planner.c:6407
CommandDest whereToSendOutput
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PROGRESS_COPY_COMMAND
Definition: progress.h:143
#define PROGRESS_COPY_TYPE_FILE
Definition: progress.h:152
#define PROGRESS_COPY_COMMAND_FROM
Definition: progress.h:148
#define PROGRESS_COPY_TYPE
Definition: progress.h:144
#define PROGRESS_COPY_TYPE_PROGRAM
Definition: progress.h:153
#define PROGRESS_COPY_BYTES_TOTAL
Definition: progress.h:140
#define PROGRESS_COPY_TYPE_CALLBACK
Definition: progress.h:155
#define PROGRESS_COPY_TYPE_PIPE
Definition: progress.h:154
#define RelationGetRelid(relation)
Definition: rel.h:504
#define RelationGetDescr(relation)
Definition: rel.h:530
#define RelationGetRelationName(relation)
Definition: rel.h:538
Node * build_column_default(Relation rel, int attrno)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
bool force_notnull_all
Definition: copy.h:69
bool binary
Definition: copy.h:53
bool convert_selectively
Definition: copy.h:74
CopyOnErrorChoice on_error
Definition: copy.h:75
List * force_null
Definition: copy.h:71
List * convert_select
Definition: copy.h:76
bool force_null_all
Definition: copy.h:72
bool * force_notnull_flags
Definition: copy.h:70
int file_encoding
Definition: copy.h:51
bool * force_null_flags
Definition: copy.h:73
char * default_print
Definition: copy.h:60
List * force_notnull
Definition: copy.h:68
copy_data_source_cb data_source_cb
StringInfoData line_buf
CopyFormatOptions opts
StringInfoData attribute_buf
MemoryContext copycontext
const char * cur_attval
const char * cur_attname
const char * cur_relname
ErrorSaveContext * escontext
bool details_wanted
Definition: miscnodes.h:47
bool error_occurred
Definition: miscnodes.h:46
NodeTag type
Definition: miscnodes.h:45
Definition: fmgr.h:57
Definition: pg_list.h:54
Definition: nodes.h:129
List * p_rteperminfos
Definition: parse_node.h:194
List * p_rtable
Definition: parse_node.h:193
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define S_ISDIR(m)
Definition: win32_port.h:325
#define fstat
Definition: win32_port.h:283

References AllocateFile(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), attnum, CopyFromStateData::attnumlist, CopyFromStateData::attribute_buf, CopyFormatOptions::binary, build_column_default(), CopyFromStateData::bytes_processed, contain_volatile_functions_not_nextval(), CopyFromStateData::conversion_proc, CopyFormatOptions::convert_select, CopyFromStateData::convert_select_flags, CopyFormatOptions::convert_selectively, COPY_CALLBACK, COPY_FILE, CopyFromStateData::copy_file, COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromStateData::copy_src, CopyFromStateData::copycontext, CopyGetAttnums(), cur, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, CurrentMemoryContext, CopyFromStateData::data_source_cb, CopyFormatOptions::default_print, CopyFromStateData::defaults, CopyFromStateData::defexprs, CopyFromStateData::defmap, DestRemote, ErrorSaveContext::details_wanted, CopyFromStateData::eol_type, EOL_UNKNOWN, ereport, errcode(), errcode_for_file_access(), errhint(), errmsg(), errmsg_internal(), ERROR, ErrorSaveContext::error_occurred, CopyFromStateData::escontext, ExecInitExpr(), expression_planner(), CopyFormatOptions::file_encoding, CopyFromStateData::file_encoding, filename, CopyFromStateData::filename, FindDefaultConversionProc(), fmgr_info(), CopyFormatOptions::force_notnull, CopyFormatOptions::force_notnull_all, CopyFormatOptions::force_notnull_flags, CopyFormatOptions::force_null, CopyFormatOptions::force_null_all, CopyFormatOptions::force_null_flags, fstat, GetDatabaseEncoding(), getTypeBinaryInputInfo(), getTypeInputInfo(), CopyFromStateData::in_functions, initStringInfo(), CopyFromStateData::input_buf, CopyFromStateData::input_buf_index, CopyFromStateData::input_buf_len, INPUT_BUF_SIZE, CopyFromStateData::input_reached_eof, InvalidOid, CopyFromStateData::is_program, lfirst_int, CopyFromStateData::line_buf, list_length(), list_member_int(), makeNode, 
CopyFromStateData::max_fields, MemoryContextSwitchTo(), MemSet, NameStr, TupleDescData::natts, CopyFromStateData::need_transcoding, CopyFromStateData::num_defaults, OidIsValid, CopyFormatOptions::on_error, OpenPipeStream(), CopyFromStateData::opts, ParseState::p_rtable, ParseState::p_rteperminfos, palloc(), palloc0(), PG_BINARY_R, pg_encoding_to_char, pg_get_client_encoding(), PG_SQL_ASCII, pgstat_progress_start_command(), pgstat_progress_update_multi_param(), ProcessCopyOptions(), PROGRESS_COMMAND_COPY, PROGRESS_COPY_BYTES_TOTAL, PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_FROM, PROGRESS_COPY_TYPE, PROGRESS_COPY_TYPE_CALLBACK, PROGRESS_COPY_TYPE_FILE, PROGRESS_COPY_TYPE_PIPE, PROGRESS_COPY_TYPE_PROGRAM, pstrdup(), CopyFromStateData::range_table, CopyFromStateData::raw_buf, CopyFromStateData::raw_buf_index, CopyFromStateData::raw_buf_len, RAW_BUF_SIZE, CopyFromStateData::raw_fields, CopyFromStateData::raw_reached_eof, ReceiveCopyBegin(), ReceiveCopyBinaryHeader(), CopyFromStateData::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, CopyFromStateData::relname_only, CopyFromStateData::rteperminfos, S_ISDIR, stat::st_mode, stat::st_size, TupleDescAttr, ErrorSaveContext::type, CopyFromStateData::typioparams, CopyFromStateData::volatile_defexprs, CopyFromStateData::whereClause, and whereToSendOutput.

Referenced by copy_table(), DoCopy(), file_acquire_sample_rows(), fileBeginForeignScan(), and fileReScanForeignScan().

◆ ClosePipeFromProgram()

static void ClosePipeFromProgram ( CopyFromState  cstate)
static

Definition at line 1816 of file copyfrom.c.

1817 {
1818  int pclose_rc;
1819 
1820  Assert(cstate->is_program);
1821 
1822  pclose_rc = ClosePipeStream(cstate->copy_file);
1823  if (pclose_rc == -1)
1824  ereport(ERROR,
1826  errmsg("could not close pipe to external command: %m")));
1827  else if (pclose_rc != 0)
1828  {
1829  /*
1830  * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1831  * expectable for the called program to fail with SIGPIPE, and we
1832  * should not report that as an error. Otherwise, SIGPIPE indicates a
1833  * problem.
1834  */
1835  if (!cstate->raw_reached_eof &&
1836  wait_result_is_signal(pclose_rc, SIGPIPE))
1837  return;
1838 
1839  ereport(ERROR,
1840  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1841  errmsg("program \"%s\" failed",
1842  cstate->filename),
1843  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1844  }
1845 }
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1235
int ClosePipeStream(FILE *file)
Definition: fd.c:2991
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
bool wait_result_is_signal(int exit_status, int signum)
Definition: wait_error.c:102
#define SIGPIPE
Definition: win32_port.h:173

References Assert(), ClosePipeStream(), CopyFromStateData::copy_file, ereport, errcode(), errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, CopyFromStateData::filename, CopyFromStateData::is_program, CopyFromStateData::raw_reached_eof, SIGPIPE, wait_result_is_signal(), and wait_result_to_str().

Referenced by EndCopyFrom().

◆ CopyFrom()

uint64 CopyFrom ( CopyFromState  cstate)

Definition at line 633 of file copyfrom.c.

634 {
635  ResultRelInfo *resultRelInfo;
636  ResultRelInfo *target_resultRelInfo;
637  ResultRelInfo *prevResultRelInfo = NULL;
638  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
639  ModifyTableState *mtstate;
640  ExprContext *econtext;
641  TupleTableSlot *singleslot = NULL;
642  MemoryContext oldcontext = CurrentMemoryContext;
643 
644  PartitionTupleRouting *proute = NULL;
645  ErrorContextCallback errcallback;
646  CommandId mycid = GetCurrentCommandId(true);
647  int ti_options = 0; /* start with default options for insert */
648  BulkInsertState bistate = NULL;
649  CopyInsertMethod insertMethod;
650  CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
651  int64 processed = 0;
652  int64 excluded = 0;
653  int64 skipped = 0;
654  bool has_before_insert_row_trig;
655  bool has_instead_insert_row_trig;
656  bool leafpart_use_multi_insert = false;
657 
658  Assert(cstate->rel);
659  Assert(list_length(cstate->range_table) == 1);
660 
661  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
662  Assert(cstate->escontext);
663 
664  /*
665  * The target must be a plain, foreign, or partitioned relation, or have
666  * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
667  * allowed on views, so we only hint about them in the view case.)
668  */
669  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
670  cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
671  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
672  !(cstate->rel->trigdesc &&
673  cstate->rel->trigdesc->trig_insert_instead_row))
674  {
675  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
676  ereport(ERROR,
677  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
678  errmsg("cannot copy to view \"%s\"",
679  RelationGetRelationName(cstate->rel)),
680  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
681  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
682  ereport(ERROR,
683  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684  errmsg("cannot copy to materialized view \"%s\"",
685  RelationGetRelationName(cstate->rel))));
686  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
687  ereport(ERROR,
688  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
689  errmsg("cannot copy to sequence \"%s\"",
690  RelationGetRelationName(cstate->rel))));
691  else
692  ereport(ERROR,
693  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
694  errmsg("cannot copy to non-table relation \"%s\"",
695  RelationGetRelationName(cstate->rel))));
696  }
697 
698  /*
699  * If the target file is new-in-transaction, we assume that checking FSM
700  * for free space is a waste of time. This could possibly be wrong, but
701  * it's unlikely.
702  */
703  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
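704  (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
705  cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
706  ti_options |= TABLE_INSERT_SKIP_FSM;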
707 
708  /*
709  * Optimize if new relation storage was created in this subxact or one of
710  * its committed children and we won't see those rows later as part of an
711  * earlier scan or command. The subxact test ensures that if this subxact
712  * aborts then the frozen rows won't be visible after xact cleanup. Note
713  * that the stronger test of exactly which subtransaction created it is
714  * crucial for correctness of this optimization. The test for an earlier
715  * scan or command tolerates false negatives. FREEZE causes other sessions
716  * to see rows they would not see under MVCC, and a false negative merely
717  * spreads that anomaly to the current session.
718  */
719  if (cstate->opts.freeze)
720  {
721  /*
722  * We currently disallow COPY FREEZE on partitioned tables. The
723  * reason for this is that we've simply not yet opened the partitions
724  * to determine if the optimization can be applied to them. We could
725  * go and open them all here, but doing so may be quite a costly
726  * overhead for small copies. In any case, we may just end up routing
727  * tuples to a small number of partitions. It seems better just to
728  * raise an ERROR for partitioned tables.
729  */
730  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
731  {
732  ereport(ERROR,
733  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
734  errmsg("cannot perform COPY FREEZE on a partitioned table")));
735  }
736 
737  /*
738  * Tolerate one registration for the benefit of FirstXactSnapshot.
739  * Scan-bearing queries generally create at least two registrations,
740  * though relying on that is fragile, as is ignoring ActiveSnapshot.
741  * Clear CatalogSnapshot to avoid counting its registration. We'll
742  * still detect ongoing catalog scans, each of which separately
743  * registers the snapshot it uses.
744  */
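745  InvalidateCatalogSnapshot();
746  if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
747  ereport(ERROR,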
748  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
749  errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
750 
751  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
752  cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
753  ereport(ERROR,
754  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
755  errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
756 
757  ti_options |= TABLE_INSERT_FROZEN;
758  }
759 
760  /*
761  * We need a ResultRelInfo so we can use the regular executor's
762  * index-entry-making machinery. (There used to be a huge amount of code
763  * here that basically duplicated execUtils.c ...)
764  */
765  ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
766  resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
767  ExecInitResultRelation(estate, resultRelInfo, 1);
768 
769  /* Verify the named relation is a valid target for INSERT */
770  CheckValidResultRel(resultRelInfo, CMD_INSERT);
771 
772  ExecOpenIndices(resultRelInfo, false);
773 
774  /*
775  * Set up a ModifyTableState so we can let FDW(s) init themselves for
776  * foreign-table result relation(s).
777  */
778  mtstate = makeNode(ModifyTableState);
779  mtstate->ps.plan = NULL;
780  mtstate->ps.state = estate;
781  mtstate->operation = CMD_INSERT;
782  mtstate->mt_nrels = 1;
783  mtstate->resultRelInfo = resultRelInfo;
784  mtstate->rootResultRelInfo = resultRelInfo;
785 
786  if (resultRelInfo->ri_FdwRoutine != NULL &&
787  resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
788  resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
789  resultRelInfo);
790 
791  /*
792  * Also, if the named relation is a foreign table, determine if the FDW
793  * supports batch insert and determine the batch size (a FDW may support
794  * batching, but it may be disabled for the server/table).
795  *
796  * If the FDW does not support batching, we set the batch size to 1.
797  */
798  if (resultRelInfo->ri_FdwRoutine != NULL &&
799  resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
800  resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
801  resultRelInfo->ri_BatchSize =
802  resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
803  else
804  resultRelInfo->ri_BatchSize = 1;
805 
806  Assert(resultRelInfo->ri_BatchSize >= 1);
807 
808  /* Prepare to catch AFTER triggers. */
809  AfterTriggerBeginQuery();
810 
811  /*
812  * If there are any triggers with transition tables on the named relation,
813  * we need to be prepared to capture transition tuples.
814  *
815  * Because partition tuple routing would like to know about whether
816  * transition capture is active, we also set it in mtstate, which is
817  * passed to ExecFindPartition() below.
818  */
819  cstate->transition_capture = mtstate->mt_transition_capture =
820  MakeTransitionCaptureState(cstate->rel->trigdesc,
821  RelationGetRelid(cstate->rel),
822  CMD_INSERT);
823 
824  /*
825  * If the named relation is a partitioned table, initialize state for
826  * CopyFrom tuple routing.
827  */
828  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
829  proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
830 
831  if (cstate->whereClause)
832  cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
833  &mtstate->ps);
834 
835  /*
836  * It's generally more efficient to prepare a bunch of tuples for
837  * insertion, and insert them in one
838  * table_multi_insert()/ExecForeignBatchInsert() call, than call
839  * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
840  * However, there are a number of reasons why we might not be able to do
841  * this. These are explained below.
842  */
843  if (resultRelInfo->ri_TrigDesc != NULL &&
844  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
845  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
846  {
847  /*
848  * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
849  * triggers on the table. Such triggers might query the table we're
850  * inserting into and act differently if the tuples that have already
851  * been processed and prepared for insertion are not there.
852  */
853  insertMethod = CIM_SINGLE;
854  }
855  else if (resultRelInfo->ri_FdwRoutine != NULL &&
856  resultRelInfo->ri_BatchSize == 1)
857  {
858  /*
859  * Can't support multi-inserts to a foreign table if the FDW does not
860  * support batching, or it's disabled for the server or foreign table.
861  */
862  insertMethod = CIM_SINGLE;
863  }
864  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
865  resultRelInfo->ri_TrigDesc->trig_insert_new_table)
866  {
867  /*
868  * For partitioned tables we can't support multi-inserts when there
869  * are any statement level insert triggers. It might be possible to
870  * allow partitioned tables with such triggers in the future, but for
871  * now, CopyMultiInsertInfoFlush expects that any after row insert and
872  * statement level insert triggers are on the same relation.
873  */
874  insertMethod = CIM_SINGLE;
875  }
876  else if (cstate->volatile_defexprs)
877  {
878  /*
879  * Can't support multi-inserts if there are any volatile default
880  * expressions in the table. Similarly to the trigger case above,
881  * such expressions may query the table we're inserting into.
882  *
883  * Note: It does not matter if any partitions have any volatile
884  * default expressions as we use the defaults from the target of the
885  * COPY command.
886  */
887  insertMethod = CIM_SINGLE;
888  }
889  else if (contain_volatile_functions(cstate->whereClause))
890  {
891  /*
892  * Can't support multi-inserts if there are any volatile function
893  * expressions in WHERE clause. Similarly to the trigger case above,
894  * such expressions may query the table we're inserting into.
895  *
896  * Note: the whereClause was already preprocessed in DoCopy(), so it's
897  * okay to use contain_volatile_functions() directly.
898  */
899  insertMethod = CIM_SINGLE;
900  }
901  else
902  {
903  /*
904  * For partitioned tables, we may still be able to perform bulk
905  * inserts. However, the possibility of this depends on which types
906  * of triggers exist on the partition. We must disable bulk inserts
907  * if the partition is a foreign table that can't use batching or it
908  * has any before row insert or insert instead triggers (same as we
909  * checked above for the parent table). Since the partition's
910  * resultRelInfos are initialized only when we actually need to insert
911  * the first tuple into them, we must have the intermediate insert
912  * method of CIM_MULTI_CONDITIONAL to flag that we must later
913  * determine if we can use bulk-inserts for the partition being
914  * inserted into.
915  */
916  if (proute)
917  insertMethod = CIM_MULTI_CONDITIONAL;
918  else
919  insertMethod = CIM_MULTI;
920 
921  CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
922  estate, mycid, ti_options);
923  }
924 
925  /*
926  * If not using batch mode (which allocates slots as needed) set up a
927  * tuple slot too. When inserting into a partitioned table, we also need
928  * one, even if we might batch insert, to read the tuple in the root
929  * partition's form.
930  */
931  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
932  {
933  singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
934  &estate->es_tupleTable);
935  bistate = GetBulkInsertState();
936  }
937 
938  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
939  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
940 
941  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
942  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
943 
944  /*
945  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
946  * should do this for COPY, since it's not really an "INSERT" statement as
947  * such. However, executing these triggers maintains consistency with the
948  * EACH ROW triggers that we already fire on COPY.
949  */
950  ExecBSInsertTriggers(estate, resultRelInfo);
951 
952  econtext = GetPerTupleExprContext(estate);
953 
954  /* Set up callback to identify error line number */
955  errcallback.callback = CopyFromErrorCallback;
956  errcallback.arg = (void *) cstate;
957  errcallback.previous = error_context_stack;
958  error_context_stack = &errcallback;
959 
960  for (;;)
961  {
962  TupleTableSlot *myslot;
963  bool skip_tuple;
964 
965  CHECK_FOR_INTERRUPTS();
966 
967  /*
968  * Reset the per-tuple exprcontext. We do this after every tuple, to
969  * clean-up after expression evaluations etc.
970  */
971  ResetPerTupleExprContext(estate);
972 
973  /* select slot to (initially) load row into */
974  if (insertMethod == CIM_SINGLE || proute)
975  {
976  myslot = singleslot;
977  Assert(myslot != NULL);
978  }
979  else
980  {
981  Assert(resultRelInfo == target_resultRelInfo);
982  Assert(insertMethod == CIM_MULTI);
983 
984  myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
985  resultRelInfo);
986  }
987 
988  /*
989  * Switch to per-tuple context before calling NextCopyFrom, which does
990  * evaluate default expressions etc. and requires per-tuple context.
991  */
992  MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
993 
994  ExecClearTuple(myslot);
995 
996  /* Directly store the values/nulls array in the slot */
997  if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
998  break;
999 
1000  if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1001  cstate->escontext->error_occurred)
1002  {
1003  /*
1004  * Soft error occurred, skip this tuple and deal with error
1005  * information according to ON_ERROR.
1006  */
1007  if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1008 
1009  /*
1010  * Just make ErrorSaveContext ready for the next NextCopyFrom.
1011  * Since we don't set details_wanted and error_data is not to
1012  * be filled, just resetting error_occurred is enough.
1013  */
1014  cstate->escontext->error_occurred = false;
1015 
1016  /* Report that this tuple was skipped by the ON_ERROR clause */
1017  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1018  ++skipped);
1019 
1020  continue;
1021  }
1022 
1023  ExecStoreVirtualTuple(myslot);
1024 
1025  /*
1026  * Constraints and where clause might reference the tableoid column,
1027  * so (re-)initialize tts_tableOid before evaluating them.
1028  */
1029  myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1030 
1031  /* Triggers and stuff need to be invoked in query context. */
1032  MemoryContextSwitchTo(oldcontext);
1033 
1034  if (cstate->whereClause)
1035  {
1036  econtext->ecxt_scantuple = myslot;
1037  /* Skip items that don't match COPY's WHERE clause */
1038  if (!ExecQual(cstate->qualexpr, econtext))
1039  {
1040  /*
1041  * Report that this tuple was filtered out by the WHERE
1042  * clause.
1043  */
1044  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1045  ++excluded);
1046  continue;
1047  }
1048  }
1049 
1050  /* Determine the partition to insert the tuple into */
1051  if (proute)
1052  {
1053  TupleConversionMap *map;
1054 
1055  /*
1056  * Attempt to find a partition suitable for this tuple.
1057  * ExecFindPartition() will raise an error if none can be found or
1058  * if the found partition is not suitable for INSERTs.
1059  */
1060  resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1061  proute, myslot, estate);
1062 
1063  if (prevResultRelInfo != resultRelInfo)
1064  {
1065  /* Determine which triggers exist on this partition */
1066  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1067  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1068 
1069  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1070  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1071 
1072  /*
1073  * Disable multi-inserts when the partition has BEFORE/INSTEAD
1074  * OF triggers, or if the partition is a foreign table that
1075  * can't use batching.
1076  */
1077  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1078  !has_before_insert_row_trig &&
1079  !has_instead_insert_row_trig &&
1080  (resultRelInfo->ri_FdwRoutine == NULL ||
1081  resultRelInfo->ri_BatchSize > 1);
1082 
1083  /* Set the multi-insert buffer to use for this partition. */
1084  if (leafpart_use_multi_insert)
1085  {
1086  if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1087  CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1088  resultRelInfo);
1089  }
1090  else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1091  !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1092  {
1093  /*
1094  * Flush pending inserts if this partition can't use
1095  * batching, so rows are visible to triggers etc.
1096  */
1097  CopyMultiInsertInfoFlush(&multiInsertInfo,
1098  resultRelInfo,
1099  &processed);
1100  }
1101 
1102  if (bistate != NULL)
1103  ReleaseBulkInsertStatePin(bistate);
1104  prevResultRelInfo = resultRelInfo;
1105  }
1106 
1107  /*
1108  * If we're capturing transition tuples, we might need to convert
1109  * from the partition rowtype to root rowtype. But if there are no
1110  * BEFORE triggers on the partition that could change the tuple,
1111  * we can just remember the original unconverted tuple to avoid a
1112  * needless round trip conversion.
1113  */
1114  if (cstate->transition_capture != NULL)
1115  cstate->transition_capture->tcs_original_insert_tuple =
1116  !has_before_insert_row_trig ? myslot : NULL;
1117 
1118  /*
1119  * We might need to convert from the root rowtype to the partition
1120  * rowtype.
1121  */
1122  map = ExecGetRootToChildMap(resultRelInfo, estate);
1123  if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1124  {
1125  /* non batch insert */
1126  if (map != NULL)
1127  {
1128  TupleTableSlot *new_slot;
1129 
1130  new_slot = resultRelInfo->ri_PartitionTupleSlot;
1131  myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1132  }
1133  }
1134  else
1135  {
1136  /*
1137  * Prepare to queue up tuple for later batch insert into
1138  * current partition.
1139  */
1140  TupleTableSlot *batchslot;
1141 
1142  /* no other path available for partitioned table */
1143  Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1144 
1145  batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1146  resultRelInfo);
1147 
1148  if (map != NULL)
1149  myslot = execute_attr_map_slot(map->attrMap, myslot,
1150  batchslot);
1151  else
1152  {
1153  /*
1154  * This looks more expensive than it is (Believe me, I
1155  * optimized it away. Twice.). The input is in virtual
1156  * form, and we'll materialize the slot below - for most
1157  * slot types the copy performs the work materialization
1158  * would later require anyway.
1159  */
1160  ExecCopySlot(batchslot, myslot);
1161  myslot = batchslot;
1162  }
1163  }
1164 
1165  /* ensure that triggers etc see the right relation */
1166  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1167  }
1168 
1169  skip_tuple = false;
1170 
1171  /* BEFORE ROW INSERT Triggers */
1172  if (has_before_insert_row_trig)
1173  {
1174  if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1175  skip_tuple = true; /* "do nothing" */
1176  }
1177 
1178  if (!skip_tuple)
1179  {
1180  /*
1181  * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1182  * tuple. Otherwise, proceed with inserting the tuple into the
1183  * table or foreign table.
1184  */
1185  if (has_instead_insert_row_trig)
1186  {
1187  ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1188  }
1189  else
1190  {
1191  /* Compute stored generated columns */
1192  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1193  resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1194  ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1195  CMD_INSERT);
1196 
1197  /*
1198  * If the target is a plain table, check the constraints of
1199  * the tuple.
1200  */
1201  if (resultRelInfo->ri_FdwRoutine == NULL &&
1202  resultRelInfo->ri_RelationDesc->rd_att->constr)
1203  ExecConstraints(resultRelInfo, myslot, estate);
1204 
1205  /*
1206  * Also check the tuple against the partition constraint, if
1207  * there is one; except that if we got here via tuple-routing,
1208  * we don't need to if there's no BR trigger defined on the
1209  * partition.
1210  */
1211  if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1212  (proute == NULL || has_before_insert_row_trig))
1213  ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1214 
1215  /* Store the slot in the multi-insert buffer, when enabled. */
1216  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1217  {
1218  /*
1219  * The slot previously might point into the per-tuple
1220  * context. For batching it needs to be longer lived.
1221  */
1222  ExecMaterializeSlot(myslot);
1223 
1224  /* Add this tuple to the tuple buffer */
1225  CopyMultiInsertInfoStore(&multiInsertInfo,
1226  resultRelInfo, myslot,
1227  cstate->line_buf.len,
1228  cstate->cur_lineno);
1229 
1230  /*
1231  * If enough inserts have queued up, then flush all
1232  * buffers out to their tables.
1233  */
1234  if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1235  CopyMultiInsertInfoFlush(&multiInsertInfo,
1236  resultRelInfo,
1237  &processed);
1238 
1239  /*
1240  * We delay updating the row counter and progress of the
1241  * COPY command until after writing the tuples stored in
1242  * the buffer out to the table, as in single insert mode.
1243  * See CopyMultiInsertBufferFlush().
1244  */
1245  continue; /* next tuple please */
1246  }
1247  else
1248  {
1249  List *recheckIndexes = NIL;
1250 
1251  /* OK, store the tuple */
1252  if (resultRelInfo->ri_FdwRoutine != NULL)
1253  {
1254  myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1255  resultRelInfo,
1256  myslot,
1257  NULL);
1258 
1259  if (myslot == NULL) /* "do nothing" */
1260  continue; /* next tuple please */
1261 
1262  /*
1263  * AFTER ROW Triggers might reference the tableoid
1264  * column, so (re-)initialize tts_tableOid before
1265  * evaluating them.
1266  */
1267  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1268  }
1269  else
1270  {
1271  /* OK, store the tuple and create index entries for it */
1272  table_tuple_insert(resultRelInfo->ri_RelationDesc,
1273  myslot, mycid, ti_options, bistate);
1274 
1275  if (resultRelInfo->ri_NumIndices > 0)
1276  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1277  myslot,
1278  estate,
1279  false,
1280  false,
1281  NULL,
1282  NIL,
1283  false);
1284  }
1285 
1286  /* AFTER ROW INSERT Triggers */
1287  ExecARInsertTriggers(estate, resultRelInfo, myslot,
1288  recheckIndexes, cstate->transition_capture);
1289 
1290  list_free(recheckIndexes);
1291  }
1292  }
1293 
1294  /*
1295  * We count only tuples not suppressed by a BEFORE INSERT trigger
1296  * or FDW; this is the same definition used by nodeModifyTable.c
1297  * for counting tuples inserted by an INSERT command. Update
1298  * progress of the COPY command as well.
1299  */
1300  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1301  ++processed);
1302  }
1303  }
1304 
1305  /* Flush any remaining buffered tuples */
1306  if (insertMethod != CIM_SINGLE)
1307  {
1308  if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1309  CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1310  }
1311 
1312  /* Done, clean up */
1313  error_context_stack = errcallback.previous;
1314 
1315  if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1316  cstate->num_errors > 0)
1317  ereport(NOTICE,
1318  errmsg_plural("%llu row was skipped due to data type incompatibility",
1319  "%llu rows were skipped due to data type incompatibility",
1320  (unsigned long long) cstate->num_errors,
1321  (unsigned long long) cstate->num_errors));
1322 
1323  if (bistate != NULL)
1324  FreeBulkInsertState(bistate);
1325 
1326  MemoryContextSwitchTo(oldcontext);
1327 
1328  /* Execute AFTER STATEMENT insertion triggers */
1329  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1330 
1331  /* Handle queued AFTER triggers */
1332  AfterTriggerEndQuery(estate);
1333 
1334  ExecResetTupleTable(estate->es_tupleTable, false);
1335 
1336  /* Allow the FDW to shut down */
1337  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1338  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1339  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1340  target_resultRelInfo);
1341 
1342  /* Tear down the multi-insert buffer data */
1343  if (insertMethod != CIM_SINGLE)
1344  CopyMultiInsertInfoCleanup(&multiInsertInfo);
1345 
1346  /* Close all the partitioned tables, leaf partitions, and their indices */
1347  if (proute)
1348  ExecCleanupTupleRouting(mtstate, proute);
1349 
1350  /* Close the result relations, including any trigger target relations */
1351  ExecCloseResultRelations(estate);
1352  ExecCloseRangeTableRelations(estate);
1353 
1354  FreeExecutorState(estate);
1355 
1356  return processed;
1357 }
void pgstat_progress_update_param(int index, int64 val)
#define InvalidSubTransactionId
Definition: c.h:647
uint32 CommandId
Definition: c.h:655
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:521
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:243
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copyfrom.c:263
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
Definition: copyfrom.c:525
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copyfrom.c:610
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:572
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:288
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:591
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:300
void CopyFromErrorCallback(void *arg)
Definition: copyfrom.c:117
CopyInsertMethod
@ CIM_SINGLE
@ CIM_MULTI_CONDITIONAL
@ CIM_MULTI
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1185
ErrorContextCallback * error_context_stack
Definition: elog.c:95
#define NOTICE
Definition: elog.h:35
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:214
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:298
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:156
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1816
void ExecCloseResultRelations(EState *estate)
Definition: execMain.c:1541
void ExecCloseRangeTableRelations(EState *estate)
Definition: execMain.c:1601
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1940
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
Definition: execMain.c:1024
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1190
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1551
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:733
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
Definition: execUtils.c:819
EState * CreateExecutorState(void)
Definition: execUtils.c:93
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1237
void FreeExecutorState(EState *estate)
Definition: execUtils.c:194
#define ResetPerTupleExprContext(estate)
Definition: executor.h:558
#define GetPerTupleExprContext(estate)
Definition: executor.h:549
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:554
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:412
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:1790
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1761
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:1778
void list_free(List *list)
Definition: list.c:1546
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:257
#define castNode(_type_, nodeptr)
Definition: nodes.h:176
#define NIL
Definition: pg_list.h:68
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1169
#define PROGRESS_COPY_TUPLES_PROCESSED
Definition: progress.h:141
#define PROGRESS_COPY_TUPLES_EXCLUDED
Definition: progress.h:142
#define PROGRESS_COPY_TUPLES_SKIPPED
Definition: progress.h:145
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1612
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:429
bool freeze
Definition: copy.h:54
TransitionCaptureState * transition_capture
List * es_tupleTable
Definition: execnodes.h:667
struct ErrorContextCallback * previous
Definition: elog.h:295
void(* callback)(void *arg)
Definition: elog.h:296
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:255
EndForeignInsert_function EndForeignInsert
Definition: fdwapi.h:239
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:238
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:232
ExecForeignBatchInsert_function ExecForeignBatchInsert
Definition: fdwapi.h:233
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
Definition: fdwapi.h:234
CmdType operation
Definition: execnodes.h:1282
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1286
PlanState ps
Definition: execnodes.h:1281
ResultRelInfo * rootResultRelInfo
Definition: execnodes.h:1294
struct TransitionCaptureState * mt_transition_capture
Definition: execnodes.h:1320
Plan * plan
Definition: execnodes.h:1043
EState * state
Definition: execnodes.h:1045
SubTransactionId rd_firstRelfilelocatorSubid
Definition: rel.h:106
TriggerDesc * trigdesc
Definition: rel.h:117
TupleDesc rd_att
Definition: rel.h:112
SubTransactionId rd_newRelfilelocatorSubid
Definition: rel.h:104
SubTransactionId rd_createSubid
Definition: rel.h:103
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:581
int ri_NumIndices
Definition: execnodes.h:459
Relation ri_RelationDesc
Definition: execnodes.h:456
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:584
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:486
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:503
int ri_BatchSize
Definition: execnodes.h:514
TupleTableSlot * tcs_original_insert_tuple
Definition: trigger.h:76
bool trig_insert_instead_row
Definition: reltrigger.h:58
bool trig_insert_new_table
Definition: reltrigger.h:75
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:45
AttrMap * attrMap
Definition: tupconvert.h:28
TupleConstr * constr
Definition: tupdesc.h:85
Oid tts_tableOid
Definition: tuptable.h:130
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
#define TABLE_INSERT_FROZEN
Definition: tableam.h:253
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:252
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1397
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2401
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2465
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2558
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2541
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4889
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2452
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5045
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5025
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:433
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:488
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:451
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:780
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818

References AfterTriggerBeginQuery(), AfterTriggerEndQuery(), ErrorContextCallback::arg, Assert(), TupleConversionMap::attrMap, FdwRoutine::BeginForeignInsert, ErrorContextCallback::callback, castNode, CHECK_FOR_INTERRUPTS, CheckValidResultRel(), CIM_MULTI, CIM_MULTI_CONDITIONAL, CIM_SINGLE, CMD_INSERT, TupleDescData::constr, contain_volatile_functions(), COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromErrorCallback(), CopyMultiInsertInfoCleanup(), CopyMultiInsertInfoFlush(), CopyMultiInsertInfoInit(), CopyMultiInsertInfoIsEmpty(), CopyMultiInsertInfoIsFull(), CopyMultiInsertInfoNextFreeSlot(), CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfoStore(), CreateExecutorState(), CopyFromStateData::cur_lineno, CurrentMemoryContext, ExprContext::ecxt_scantuple, FdwRoutine::EndForeignInsert, ereport, errcode(), errhint(), errmsg(), errmsg_plural(), ERROR, error_context_stack, ErrorSaveContext::error_occurred, EState::es_tupleTable, CopyFromStateData::escontext, ExecARInsertTriggers(), ExecASInsertTriggers(), ExecBRInsertTriggers(), ExecBSInsertTriggers(), ExecCleanupTupleRouting(), ExecClearTuple(), ExecCloseRangeTableRelations(), ExecCloseResultRelations(), ExecComputeStoredGenerated(), ExecConstraints(), ExecCopySlot(), ExecFindPartition(), FdwRoutine::ExecForeignBatchInsert, FdwRoutine::ExecForeignInsert, ExecGetRootToChildMap(), ExecInitQual(), ExecInitRangeTable(), ExecInitResultRelation(), ExecInsertIndexTuples(), ExecIRInsertTriggers(), ExecMaterializeSlot(), ExecOpenIndices(), ExecPartitionCheck(), ExecQual(), ExecResetTupleTable(), ExecSetupPartitionTupleRouting(), ExecStoreVirtualTuple(), execute_attr_map_slot(), FreeBulkInsertState(), FreeExecutorState(), CopyFormatOptions::freeze, GetBulkInsertState(), GetCurrentCommandId(), GetCurrentSubTransactionId(), FdwRoutine::GetForeignModifyBatchSize, GetPerTupleExprContext, GetPerTupleMemoryContext, TupleConstr::has_generated_stored, InvalidateCatalogSnapshot(), InvalidSubTransactionId, StringInfoData::len, 
CopyFromStateData::line_buf, list_free(), list_length(), makeNode, MakeTransitionCaptureState(), MemoryContextSwitchTo(), ModifyTableState::mt_nrels, ModifyTableState::mt_transition_capture, NextCopyFrom(), NIL, NOTICE, CopyFromStateData::num_errors, CopyFormatOptions::on_error, ModifyTableState::operation, CopyFromStateData::opts, pgstat_progress_update_param(), PlanState::plan, ErrorContextCallback::previous, PROGRESS_COPY_TUPLES_EXCLUDED, PROGRESS_COPY_TUPLES_PROCESSED, PROGRESS_COPY_TUPLES_SKIPPED, ModifyTableState::ps, CopyFromStateData::qualexpr, CopyFromStateData::range_table, RelationData::rd_att, RelationData::rd_createSubid, RelationData::rd_firstRelfilelocatorSubid, RelationData::rd_newRelfilelocatorSubid, RelationData::rd_rel, CopyFromStateData::rel, RelationGetRelationName, RelationGetRelid, ReleaseBulkInsertStatePin(), ResetPerTupleExprContext, ModifyTableState::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, ModifyTableState::rootResultRelInfo, CopyFromStateData::rteperminfos, PlanState::state, TABLE_INSERT_FROZEN, TABLE_INSERT_SKIP_FSM, table_slot_create(), table_tuple_insert(), TransitionCaptureState::tcs_original_insert_tuple, ThereAreNoPriorRegisteredSnapshots(), ThereAreNoReadyPortals(), CopyFromStateData::transition_capture, TriggerDesc::trig_insert_before_row, TriggerDesc::trig_insert_instead_row, TriggerDesc::trig_insert_new_table, RelationData::trigdesc, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tableOid, TupleTableSlot::tts_values, CopyFromStateData::volatile_defexprs, and CopyFromStateData::whereClause.

Referenced by copy_table(), and DoCopy().

◆ CopyFromErrorCallback()

void CopyFromErrorCallback ( void *  arg)

Definition at line 117 of file copyfrom.c.

118 {
119  CopyFromState cstate = (CopyFromState) arg;
120 
121  if (cstate->relname_only)
122  {
123  errcontext("COPY %s",
124  cstate->cur_relname);
125  return;
126  }
127  if (cstate->opts.binary)
128  {
129  /* can't usefully display the data */
130  if (cstate->cur_attname)
131  errcontext("COPY %s, line %llu, column %s",
132  cstate->cur_relname,
133  (unsigned long long) cstate->cur_lineno,
134  cstate->cur_attname);
135  else
136  errcontext("COPY %s, line %llu",
137  cstate->cur_relname,
138  (unsigned long long) cstate->cur_lineno);
139  }
140  else
141  {
142  if (cstate->cur_attname && cstate->cur_attval)
143  {
144  /* error is relevant to a particular column */
145  char *attval;
146 
147  attval = limit_printout_length(cstate->cur_attval);
148  errcontext("COPY %s, line %llu, column %s: \"%s\"",
149  cstate->cur_relname,
150  (unsigned long long) cstate->cur_lineno,
151  cstate->cur_attname,
152  attval);
153  pfree(attval);
154  }
155  else if (cstate->cur_attname)
156  {
157  /* error is relevant to a particular column, value is NULL */
158  errcontext("COPY %s, line %llu, column %s: null input",
159  cstate->cur_relname,
160  (unsigned long long) cstate->cur_lineno,
161  cstate->cur_attname);
162  }
163  else
164  {
165  /*
166  * Error is relevant to a particular line.
167  *
168  * If line_buf still contains the correct line, print it.
169  */
170  if (cstate->line_buf_valid)
171  {
172  char *lineval;
173 
174  lineval = limit_printout_length(cstate->line_buf.data);
175  errcontext("COPY %s, line %llu: \"%s\"",
176  cstate->cur_relname,
177  (unsigned long long) cstate->cur_lineno, lineval);
178  pfree(lineval);
179  }
180  else
181  {
182  errcontext("COPY %s, line %llu",
183  cstate->cur_relname,
184  (unsigned long long) cstate->cur_lineno);
185  }
186  }
187  }
188 }
static char * limit_printout_length(const char *str)
Definition: copyfrom.c:196
#define errcontext
Definition: elog.h:196
struct CopyFromStateData * CopyFromState
Definition: copy.h:80
void pfree(void *pointer)
Definition: mcxt.c:1431
void * arg

References arg, CopyFormatOptions::binary, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, StringInfoData::data, errcontext, limit_printout_length(), CopyFromStateData::line_buf, CopyFromStateData::line_buf_valid, CopyFromStateData::opts, pfree(), and CopyFromStateData::relname_only.

Referenced by CopyFrom(), file_acquire_sample_rows(), and fileIterateForeignScan().

◆ CopyMultiInsertBufferCleanup()

static void CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo *  miinfo,
CopyMultiInsertBuffer *  buffer 
)
inline static

Definition at line 483 of file copyfrom.c.

485 {
486  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
487  int i;
488 
489  /* Ensure buffer was flushed */
490  Assert(buffer->nused == 0);
491 
492  /* Remove back-link to ourself */
493  resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
494 
495  if (resultRelInfo->ri_FdwRoutine == NULL)
496  {
497  Assert(buffer->bistate != NULL);
498  FreeBulkInsertState(buffer->bistate);
499  }
500  else
501  Assert(buffer->bistate == NULL);
502 
503  /* Since we only create slots on demand, just drop the non-null ones. */
504  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
505  ExecDropSingleTupleTableSlot(buffer->slots[i]);
506 
507  if (resultRelInfo->ri_FdwRoutine == NULL)
508  table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
509  miinfo->ti_options);
510 
511  pfree(buffer);
512 }
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:66
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1253
int i
Definition: isn.c:73
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:80
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:81
BulkInsertState bistate
Definition: copyfrom.c:82
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1590

References Assert(), CopyMultiInsertBuffer::bistate, ExecDropSingleTupleTableSlot(), FreeBulkInsertState(), i, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, pfree(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, table_finish_bulk_insert(), and CopyMultiInsertInfo::ti_options.

Referenced by CopyMultiInsertInfoCleanup(), and CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferFlush()

static void CopyMultiInsertBufferFlush ( CopyMultiInsertInfo *  miinfo,
CopyMultiInsertBuffer *  buffer,
int64 *  processed 
)
inline static

Definition at line 309 of file copyfrom.c.

312 {
313  CopyFromState cstate = miinfo->cstate;
314  EState *estate = miinfo->estate;
315  int nused = buffer->nused;
316  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
317  TupleTableSlot **slots = buffer->slots;
318  int i;
319 
320  if (resultRelInfo->ri_FdwRoutine)
321  {
322  int batch_size = resultRelInfo->ri_BatchSize;
323  int sent = 0;
324 
325  Assert(buffer->bistate == NULL);
326 
327  /* Ensure that the FDW supports batching and it's enabled */
328  Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
329  Assert(batch_size > 1);
330 
331  /*
332  * We suppress error context information other than the relation name,
333  * if one of the operations below fails.
334  */
335  Assert(!cstate->relname_only);
336  cstate->relname_only = true;
337 
338  while (sent < nused)
339  {
340  int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
341  int inserted = size;
342  TupleTableSlot **rslots;
343 
344  /* insert into foreign table: let the FDW do it */
345  rslots =
346  resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
347  resultRelInfo,
348  &slots[sent],
349  NULL,
350  &inserted);
351 
352  sent += size;
353 
354  /* No need to do anything if there are no inserted rows */
355  if (inserted <= 0)
356  continue;
357 
358  /* Triggers on foreign tables should not have transition tables */
359  Assert(resultRelInfo->ri_TrigDesc == NULL ||
360  resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
361 
362  /* Run AFTER ROW INSERT triggers */
363  if (resultRelInfo->ri_TrigDesc != NULL &&
364  resultRelInfo->ri_TrigDesc->trig_insert_after_row)
365  {
366  Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
367 
368  for (i = 0; i < inserted; i++)
369  {
370  TupleTableSlot *slot = rslots[i];
371 
372  /*
373  * AFTER ROW Triggers might reference the tableoid column,
374  * so (re-)initialize tts_tableOid before evaluating them.
375  */
376  slot->tts_tableOid = relid;
377 
378  ExecARInsertTriggers(estate, resultRelInfo,
379  slot, NIL,
380  cstate->transition_capture);
381  }
382  }
383 
384  /* Update the row counter and progress of the COPY command */
385  *processed += inserted;
386  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
387  *processed);
388  }
389 
390  for (i = 0; i < nused; i++)
391  ExecClearTuple(slots[i]);
392 
393  /* reset relname_only */
394  cstate->relname_only = false;
395  }
396  else
397  {
398  CommandId mycid = miinfo->mycid;
399  int ti_options = miinfo->ti_options;
400  bool line_buf_valid = cstate->line_buf_valid;
401  uint64 save_cur_lineno = cstate->cur_lineno;
402  MemoryContext oldcontext;
403 
404  Assert(buffer->bistate != NULL);
405 
406  /*
407  * Print error context information correctly, if one of the operations
408  * below fails.
409  */
410  cstate->line_buf_valid = false;
411 
412  /*
413  * table_multi_insert may leak memory, so switch to short-lived memory
414  * context before calling it.
415  */
416  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
417  table_multi_insert(resultRelInfo->ri_RelationDesc,
418  slots,
419  nused,
420  mycid,
421  ti_options,
422  buffer->bistate);
423  MemoryContextSwitchTo(oldcontext);
424 
425  for (i = 0; i < nused; i++)
426  {
427  /*
428  * If there are any indexes, update them for all the inserted
429  * tuples, and run AFTER ROW INSERT triggers.
430  */
431  if (resultRelInfo->ri_NumIndices > 0)
432  {
433  List *recheckIndexes;
434 
435  cstate->cur_lineno = buffer->linenos[i];
436  recheckIndexes =
437  ExecInsertIndexTuples(resultRelInfo,
438  buffer->slots[i], estate, false,
439  false, NULL, NIL, false);
440  ExecARInsertTriggers(estate, resultRelInfo,
441  slots[i], recheckIndexes,
442  cstate->transition_capture);
443  list_free(recheckIndexes);
444  }
445 
446  /*
447  * There's no indexes, but see if we need to run AFTER ROW INSERT
448  * triggers anyway.
449  */
450  else if (resultRelInfo->ri_TrigDesc != NULL &&
451  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
452  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
453  {
454  cstate->cur_lineno = buffer->linenos[i];
455  ExecARInsertTriggers(estate, resultRelInfo,
456  slots[i], NIL,
457  cstate->transition_capture);
458  }
459 
460  ExecClearTuple(slots[i]);
461  }
462 
463  /* Update the row counter and progress of the COPY command */
464  *processed += nused;
465  pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
466  *processed);
467 
468  /* reset cur_lineno and line_buf_valid to what they were */
469  cstate->line_buf_valid = line_buf_valid;
470  cstate->cur_lineno = save_cur_lineno;
471  }
472 
473  /* Mark that all slots are free */
474  buffer->nused = 0;
475 }
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:85
CommandId mycid
Definition: copyfrom.c:101
CopyFromState cstate
Definition: copyfrom.c:99
bool trig_insert_after_row
Definition: reltrigger.h:57
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1452

References Assert(), CopyMultiInsertBuffer::bistate, CopyMultiInsertInfo::cstate, CopyFromStateData::cur_lineno, CopyMultiInsertInfo::estate, ExecARInsertTriggers(), ExecClearTuple(), FdwRoutine::ExecForeignBatchInsert, ExecInsertIndexTuples(), GetPerTupleMemoryContext, i, CopyFromStateData::line_buf_valid, CopyMultiInsertBuffer::linenos, list_free(), MemoryContextSwitchTo(), CopyMultiInsertInfo::mycid, NIL, CopyMultiInsertBuffer::nused, pgstat_progress_update_param(), PROGRESS_COPY_TUPLES_PROCESSED, RelationGetRelid, CopyFromStateData::relname_only, CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, CopyMultiInsertBuffer::slots, table_multi_insert(), CopyMultiInsertInfo::ti_options, CopyFromStateData::transition_capture, TriggerDesc::trig_insert_after_row, TriggerDesc::trig_insert_new_table, and TupleTableSlot::tts_tableOid.

Referenced by CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferInit()

static CopyMultiInsertBuffer* CopyMultiInsertBufferInit ( ResultRelInfo *  rri)
static

Definition at line 226 of file copyfrom.c.

227 {
228  CopyMultiInsertBuffer *buffer;
229 
230  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
231  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
232  buffer->resultRelInfo = rri;
233  buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
234  buffer->nused = 0;
235 
236  return buffer;
237 }

References CopyMultiInsertBuffer::bistate, GetBulkInsertState(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, palloc(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_FdwRoutine, and CopyMultiInsertBuffer::slots.

Referenced by CopyMultiInsertInfoSetupBuffer().

◆ CopyMultiInsertInfoCleanup()

static void CopyMultiInsertInfoCleanup ( CopyMultiInsertInfo *  miinfo)
inline static

Definition at line 572 of file copyfrom.c.

573 {
574  ListCell *lc;
575 
576  foreach(lc, miinfo->multiInsertBuffers)
577  CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
578 
579  list_free(miinfo->multiInsertBuffers);
580 }
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:483
#define lfirst(lc)
Definition: pg_list.h:172
List * multiInsertBuffers
Definition: copyfrom.c:96

References CopyMultiInsertBufferCleanup(), lfirst, list_free(), and CopyMultiInsertInfo::multiInsertBuffers.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoFlush()

static void CopyMultiInsertInfoFlush ( CopyMultiInsertInfo *  miinfo,
ResultRelInfo *  curr_rri,
int64 *  processed 
)
inline static

Definition at line 525 of file copyfrom.c.

527 {
528  ListCell *lc;
529 
530  foreach(lc, miinfo->multiInsertBuffers)
531  {
532  CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
533 
534  CopyMultiInsertBufferFlush(miinfo, buffer, processed);
535  }
536 
537  miinfo->bufferedTuples = 0;
538  miinfo->bufferedBytes = 0;
539 
540  /*
541  * Trim the list of tracked buffers down if it exceeds the limit. Here we
542  * remove buffers starting with the ones we created first. It seems less
543  * likely that these older ones will be needed than the ones that were
544  * just created.
545  */
546  while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
547  {
548  CopyMultiInsertBuffer *buffer;
549 
550  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
551 
552  /*
553  * We never want to remove the buffer that's currently being used, so
554  * if we happen to find that then move it to the end of the list.
555  */
556  if (buffer->resultRelInfo == curr_rri)
557  {
558  miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
559  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
560  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
561  }
562 
563  CopyMultiInsertBufferCleanup(miinfo, buffer);
564  miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
565  }
566 }
#define MAX_PARTITION_BUFFERS
Definition: copyfrom.c:75
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
Definition: copyfrom.c:309
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_delete_first(List *list)
Definition: list.c:943
#define linitial(l)
Definition: pg_list.h:178

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBufferCleanup(), CopyMultiInsertBufferFlush(), lappend(), lfirst, linitial, list_delete_first(), list_length(), MAX_PARTITION_BUFFERS, CopyMultiInsertInfo::multiInsertBuffers, and CopyMultiInsertBuffer::resultRelInfo.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoInit()

static void CopyMultiInsertInfoInit ( CopyMultiInsertInfo *  miinfo,
ResultRelInfo *  rri,
CopyFromState  cstate,
EState *  estate,
CommandId  mycid,
int  ti_options 
)
static

Definition at line 263 of file copyfrom.c.

266 {
267  miinfo->multiInsertBuffers = NIL;
268  miinfo->bufferedTuples = 0;
269  miinfo->bufferedBytes = 0;
270  miinfo->cstate = cstate;
271  miinfo->estate = estate;
272  miinfo->mycid = mycid;
273  miinfo->ti_options = ti_options;
274 
275  /*
276  * Only setup the buffer when not dealing with a partitioned table.
277  * Buffers for partitioned tables will just be setup when we need to send
278  * tuples their way for the first time.
279  */
280  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
281  CopyMultiInsertInfoSetupBuffer(miinfo, rri);
282 }

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfo::cstate, CopyMultiInsertInfo::estate, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertInfo::mycid, NIL, RelationData::rd_rel, ResultRelInfo::ri_RelationDesc, and CopyMultiInsertInfo::ti_options.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsEmpty()

static bool CopyMultiInsertInfoIsEmpty ( CopyMultiInsertInfo *  miinfo)
inline static

Definition at line 300 of file copyfrom.c.

301 {
302  return miinfo->bufferedTuples == 0;
303 }

References CopyMultiInsertInfo::bufferedTuples.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsFull()

static bool CopyMultiInsertInfoIsFull ( CopyMultiInsertInfo *  miinfo)
inline static

Definition at line 288 of file copyfrom.c.

289 {
290  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
291  miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
292  return true;
293  return false;
294 }
#define MAX_BUFFERED_BYTES
Definition: copyfrom.c:72

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, MAX_BUFFERED_BYTES, and MAX_BUFFERED_TUPLES.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoNextFreeSlot()

static TupleTableSlot* CopyMultiInsertInfoNextFreeSlot ( CopyMultiInsertInfo *  miinfo,
ResultRelInfo *  rri 
)
inline static
Definition at line 591 of file copyfrom.c.

593 {
594  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
595  int nused = buffer->nused;
596 
597  Assert(buffer != NULL);
598  Assert(nused < MAX_BUFFERED_TUPLES);
599 
600  if (buffer->slots[nused] == NULL)
601  buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
602  return buffer->slots[nused];
603 }

References Assert(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, and table_slot_create().

Referenced by CopyFrom().

◆ CopyMultiInsertInfoSetupBuffer()

static void CopyMultiInsertInfoSetupBuffer ( CopyMultiInsertInfo *  miinfo,
ResultRelInfo *  rri 
)
inline static

Definition at line 243 of file copyfrom.c.

245 {
246  CopyMultiInsertBuffer *buffer;
247 
248  buffer = CopyMultiInsertBufferInit(rri);
249 
250  /* Setup back-link so we can easily find this buffer again */
251  rri->ri_CopyMultiInsertBuffer = buffer;
252  /* Record that we're tracking this buffer */
253  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
254 }
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
Definition: copyfrom.c:226

References CopyMultiInsertBufferInit(), lappend(), CopyMultiInsertInfo::multiInsertBuffers, and ResultRelInfo::ri_CopyMultiInsertBuffer.

Referenced by CopyFrom(), and CopyMultiInsertInfoInit().

◆ CopyMultiInsertInfoStore()

static void CopyMultiInsertInfoStore ( CopyMultiInsertInfo *  miinfo,
ResultRelInfo *  rri,
TupleTableSlot *  slot,
int  tuplen,
uint64  lineno 
)
inline static

Definition at line 610 of file copyfrom.c.

612 {
613  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
614 
615  Assert(buffer != NULL);
616  Assert(slot == buffer->slots[buffer->nused]);
617 
618  /* Store the line number so we can properly report any errors later */
619  buffer->linenos[buffer->nused] = lineno;
620 
621  /* Record this slot as being used */
622  buffer->nused++;
623 
624  /* Update how many tuples are stored and their size */
625  miinfo->bufferedTuples++;
626  miinfo->bufferedBytes += tuplen;
627 }

References Assert(), CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBuffer::linenos, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, and CopyMultiInsertBuffer::slots.

Referenced by CopyFrom().

◆ EndCopyFrom()

void EndCopyFrom ( CopyFromState  cstate)

Definition at line 1790 of file copyfrom.c.

1791 {
1792  /* No COPY FROM related resources except memory. */
1793  if (cstate->is_program)
1794  {
1795  ClosePipeFromProgram(cstate);
1796  }
1797  else
1798  {
1799  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1800  ereport(ERROR,
1801  (errcode_for_file_access(),
1802  errmsg("could not close file \"%s\": %m",
1803  cstate->filename)));
1804  }
1805 
1806  pgstat_progress_end_command();
1807 
1808  MemoryContextDelete(cstate->copycontext);
1809  pfree(cstate);
1810 }
void pgstat_progress_end_command(void)
static void ClosePipeFromProgram(CopyFromState cstate)
Definition: copyfrom.c:1816
int FreeFile(FILE *file)
Definition: fd.c:2781
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403

References ClosePipeFromProgram(), CopyFromStateData::copy_file, CopyFromStateData::copycontext, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyFromStateData::filename, FreeFile(), CopyFromStateData::is_program, MemoryContextDelete(), pfree(), and pgstat_progress_end_command().

Referenced by DoCopy(), file_acquire_sample_rows(), fileEndForeignScan(), and fileReScanForeignScan().

◆ limit_printout_length()

static char * limit_printout_length ( const char *  str)
static

Definition at line 196 of file copyfrom.c.

197 {
198 #define MAX_COPY_DATA_DISPLAY 100
199 
200  int slen = strlen(str);
201  int len;
202  char *res;
203 
204  /* Fast path if definitely okay */
205  if (slen <= MAX_COPY_DATA_DISPLAY)
206  return pstrdup(str);
207 
208  /* Apply encoding-dependent truncation */
209  len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
210 
211  /*
212  * Truncate, and add "..." to show we truncated the input.
213  */
214  res = (char *) palloc(len + 4);
215  memcpy(res, str, len);
216  strcpy(res + len, "...");
217 
218  return res;
219 }
#define MAX_COPY_DATA_DISPLAY
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:1084
const void size_t len

References len, MAX_COPY_DATA_DISPLAY, palloc(), pg_mbcliplen(), pstrdup(), res, and generate_unaccent_rules::str.

Referenced by CopyFromErrorCallback().