PostgreSQL Source Code  git master
copyfrom.c File Reference
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
Include dependency graph for copyfrom.c:

Go to the source code of this file.

Data Structures

struct  CopyMultiInsertBuffer
 
struct  CopyMultiInsertInfo
 

Macros

#define MAX_BUFFERED_TUPLES   1000
 
#define MAX_BUFFERED_BYTES   65535
 
#define MAX_PARTITION_BUFFERS   32
 
#define MAX_COPY_DATA_DISPLAY   100
 

Typedefs

typedef struct CopyMultiInsertBuffer CopyMultiInsertBuffer
 
typedef struct CopyMultiInsertInfo CopyMultiInsertInfo
 

Functions

static void ClosePipeFromProgram (CopyFromState cstate)
 
void CopyFromErrorCallback (void *arg)
 
char * CopyLimitPrintoutLength (const char *str)
 
static CopyMultiInsertBufferCopyMultiInsertBufferInit (ResultRelInfo *rri)
 
static void CopyMultiInsertInfoSetupBuffer (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoInit (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
 
static bool CopyMultiInsertInfoIsFull (CopyMultiInsertInfo *miinfo)
 
static bool CopyMultiInsertInfoIsEmpty (CopyMultiInsertInfo *miinfo)
 
static void CopyMultiInsertBufferFlush (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
 
static void CopyMultiInsertBufferCleanup (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
 
static void CopyMultiInsertInfoFlush (CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
 
static void CopyMultiInsertInfoCleanup (CopyMultiInsertInfo *miinfo)
 
static TupleTableSlotCopyMultiInsertInfoNextFreeSlot (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoStore (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
 
uint64 CopyFrom (CopyFromState cstate)
 
CopyFromState BeginCopyFrom (ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
 
void EndCopyFrom (CopyFromState cstate)
 

Macro Definition Documentation

◆ MAX_BUFFERED_BYTES

#define MAX_BUFFERED_BYTES   65535

Definition at line 69 of file copyfrom.c.

◆ MAX_BUFFERED_TUPLES

#define MAX_BUFFERED_TUPLES   1000

Definition at line 63 of file copyfrom.c.

◆ MAX_COPY_DATA_DISPLAY

#define MAX_COPY_DATA_DISPLAY   100

◆ MAX_PARTITION_BUFFERS

#define MAX_PARTITION_BUFFERS   32

Definition at line 75 of file copyfrom.c.

Typedef Documentation

◆ CopyMultiInsertBuffer

◆ CopyMultiInsertInfo

Function Documentation

◆ BeginCopyFrom()

CopyFromState BeginCopyFrom ( ParseState pstate,
Relation  rel,
Node whereClause,
const char *  filename,
bool  is_program,
copy_data_source_cb  data_source_cb,
List attnamelist,
List options 
)

Definition at line 1383 of file copyfrom.c.

1391 {
1392  CopyFromState cstate;
1393  bool pipe = (filename == NULL);
1394  TupleDesc tupDesc;
1395  AttrNumber num_phys_attrs,
1396  num_defaults;
1397  FmgrInfo *in_functions;
1398  Oid *typioparams;
1399  Oid in_func_oid;
1400  int *defmap;
1401  ExprState **defexprs;
1402  MemoryContext oldcontext;
1403  bool volatile_defexprs;
1404  const int progress_cols[] = {
1408  };
1409  int64 progress_vals[] = {
1411  0,
1412  0
1413  };
1414 
1415  /* Allocate workspace and zero all fields */
1416  cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1417 
1418  /*
1419  * We allocate everything used by a cstate in a new memory context. This
1420  * avoids memory leaks during repeated use of COPY in a query.
1421  */
1423  "COPY",
1425 
1426  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1427 
1428  /* Extract options from the statement node tree */
1429  ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1430 
1431  /* Process the target relation */
1432  cstate->rel = rel;
1433 
1434  tupDesc = RelationGetDescr(cstate->rel);
1435 
1436  /* process common options or initialization */
1437 
1438  /* Generate or convert list of attributes to process */
1439  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1440 
1441  num_phys_attrs = tupDesc->natts;
1442 
1443  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1444  cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1445  if (cstate->opts.force_notnull_all)
1446  MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1447  else if (cstate->opts.force_notnull)
1448  {
1449  List *attnums;
1450  ListCell *cur;
1451 
1452  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1453 
1454  foreach(cur, attnums)
1455  {
1456  int attnum = lfirst_int(cur);
1457  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1458 
1459  if (!list_member_int(cstate->attnumlist, attnum))
1460  ereport(ERROR,
1461  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1462  /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1463  errmsg("%s column \"%s\" not referenced by COPY",
1464  "FORCE_NOT_NULL", NameStr(attr->attname))));
1465  cstate->opts.force_notnull_flags[attnum - 1] = true;
1466  }
1467  }
1468 
1469  /* Set up soft error handler for ON_ERROR */
1470  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1471  {
1472  cstate->escontext = makeNode(ErrorSaveContext);
1473  cstate->escontext->type = T_ErrorSaveContext;
1474  cstate->escontext->error_occurred = false;
1475 
1476  /*
1477  * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1478  * options later
1479  */
1480  if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1481  cstate->escontext->details_wanted = false;
1482  }
1483  else
1484  cstate->escontext = NULL;
1485 
1486  /* Convert FORCE_NULL name list to per-column flags, check validity */
1487  cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1488  if (cstate->opts.force_null_all)
1489  MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1490  else if (cstate->opts.force_null)
1491  {
1492  List *attnums;
1493  ListCell *cur;
1494 
1495  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1496 
1497  foreach(cur, attnums)
1498  {
1499  int attnum = lfirst_int(cur);
1500  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1501 
1502  if (!list_member_int(cstate->attnumlist, attnum))
1503  ereport(ERROR,
1504  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1505  /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1506  errmsg("%s column \"%s\" not referenced by COPY",
1507  "FORCE_NULL", NameStr(attr->attname))));
1508  cstate->opts.force_null_flags[attnum - 1] = true;
1509  }
1510  }
1511 
1512  /* Convert convert_selectively name list to per-column flags */
1513  if (cstate->opts.convert_selectively)
1514  {
1515  List *attnums;
1516  ListCell *cur;
1517 
1518  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1519 
1520  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1521 
1522  foreach(cur, attnums)
1523  {
1524  int attnum = lfirst_int(cur);
1525  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1526 
1527  if (!list_member_int(cstate->attnumlist, attnum))
1528  ereport(ERROR,
1529  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1530  errmsg_internal("selected column \"%s\" not referenced by COPY",
1531  NameStr(attr->attname))));
1532  cstate->convert_select_flags[attnum - 1] = true;
1533  }
1534  }
1535 
1536  /* Use client encoding when ENCODING option is not specified. */
1537  if (cstate->opts.file_encoding < 0)
1539  else
1540  cstate->file_encoding = cstate->opts.file_encoding;
1541 
1542  /*
1543  * Look up encoding conversion function.
1544  */
1545  if (cstate->file_encoding == GetDatabaseEncoding() ||
1546  cstate->file_encoding == PG_SQL_ASCII ||
1548  {
1549  cstate->need_transcoding = false;
1550  }
1551  else
1552  {
1553  cstate->need_transcoding = true;
1556  if (!OidIsValid(cstate->conversion_proc))
1557  ereport(ERROR,
1558  (errcode(ERRCODE_UNDEFINED_FUNCTION),
1559  errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1562  }
1563 
1564  cstate->copy_src = COPY_FILE; /* default */
1565 
1566  cstate->whereClause = whereClause;
1567 
1568  /* Initialize state variables */
1569  cstate->eol_type = EOL_UNKNOWN;
1570  cstate->cur_relname = RelationGetRelationName(cstate->rel);
1571  cstate->cur_lineno = 0;
1572  cstate->cur_attname = NULL;
1573  cstate->cur_attval = NULL;
1574  cstate->relname_only = false;
1575 
1576  /*
1577  * Allocate buffers for the input pipeline.
1578  *
1579  * attribute_buf and raw_buf are used in both text and binary modes, but
1580  * input_buf and line_buf only in text mode.
1581  */
1582  cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1583  cstate->raw_buf_index = cstate->raw_buf_len = 0;
1584  cstate->raw_reached_eof = false;
1585 
1586  if (!cstate->opts.binary)
1587  {
1588  /*
1589  * If encoding conversion is needed, we need another buffer to hold
1590  * the converted input data. Otherwise, we can just point input_buf
1591  * to the same buffer as raw_buf.
1592  */
1593  if (cstate->need_transcoding)
1594  {
1595  cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1596  cstate->input_buf_index = cstate->input_buf_len = 0;
1597  }
1598  else
1599  cstate->input_buf = cstate->raw_buf;
1600  cstate->input_reached_eof = false;
1601 
1602  initStringInfo(&cstate->line_buf);
1603  }
1604 
1605  initStringInfo(&cstate->attribute_buf);
1606 
1607  /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1608  if (pstate)
1609  {
1610  cstate->range_table = pstate->p_rtable;
1611  cstate->rteperminfos = pstate->p_rteperminfos;
1612  }
1613 
1614  num_defaults = 0;
1615  volatile_defexprs = false;
1616 
1617  /*
1618  * Pick up the required catalog information for each attribute in the
1619  * relation, including the input function, the element type (to pass to
1620  * the input function), and info about defaults and constraints. (Which
1621  * input function we use depends on text/binary format choice.)
1622  */
1623  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1624  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1625  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1626  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1627 
1628  for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1629  {
1630  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1631 
1632  /* We don't need info for dropped attributes */
1633  if (att->attisdropped)
1634  continue;
1635 
1636  /* Fetch the input function and typioparam info */
1637  if (cstate->opts.binary)
1638  getTypeBinaryInputInfo(att->atttypid,
1639  &in_func_oid, &typioparams[attnum - 1]);
1640  else
1641  getTypeInputInfo(att->atttypid,
1642  &in_func_oid, &typioparams[attnum - 1]);
1643  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1644 
1645  /* Get default info if available */
1646  defexprs[attnum - 1] = NULL;
1647 
1648  /*
1649  * We only need the default values for columns that do not appear in
1650  * the column list, unless the DEFAULT option was given. We never need
1651  * default values for generated columns.
1652  */
1653  if ((cstate->opts.default_print != NULL ||
1654  !list_member_int(cstate->attnumlist, attnum)) &&
1655  !att->attgenerated)
1656  {
1657  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1658  attnum);
1659 
1660  if (defexpr != NULL)
1661  {
1662  /* Run the expression through planner */
1663  defexpr = expression_planner(defexpr);
1664 
1665  /* Initialize executable expression in copycontext */
1666  defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1667 
1668  /* if NOT copied from input */
1669  /* use default value if one exists */
1670  if (!list_member_int(cstate->attnumlist, attnum))
1671  {
1672  defmap[num_defaults] = attnum - 1;
1673  num_defaults++;
1674  }
1675 
1676  /*
1677  * If a default expression looks at the table being loaded,
1678  * then it could give the wrong answer when using
1679  * multi-insert. Since database access can be dynamic this is
1680  * hard to test for exactly, so we use the much wider test of
1681  * whether the default expression is volatile. We allow for
1682  * the special case of when the default expression is the
1683  * nextval() of a sequence which in this specific case is
1684  * known to be safe for use with the multi-insert
1685  * optimization. Hence we use this special case function
1686  * checker rather than the standard check for
1687  * contain_volatile_functions(). Note also that we already
1688  * ran the expression through expression_planner().
1689  */
1690  if (!volatile_defexprs)
1691  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1692  }
1693  }
1694  }
1695 
1696  cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1697 
1698  /* initialize progress */
1700  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1701  cstate->bytes_processed = 0;
1702 
1703  /* We keep those variables in cstate. */
1704  cstate->in_functions = in_functions;
1705  cstate->typioparams = typioparams;
1706  cstate->defmap = defmap;
1707  cstate->defexprs = defexprs;
1708  cstate->volatile_defexprs = volatile_defexprs;
1709  cstate->num_defaults = num_defaults;
1710  cstate->is_program = is_program;
1711 
1712  if (data_source_cb)
1713  {
1714  progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1715  cstate->copy_src = COPY_CALLBACK;
1716  cstate->data_source_cb = data_source_cb;
1717  }
1718  else if (pipe)
1719  {
1720  progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1721  Assert(!is_program); /* the grammar does not allow this */
1723  ReceiveCopyBegin(cstate);
1724  else
1725  cstate->copy_file = stdin;
1726  }
1727  else
1728  {
1729  cstate->filename = pstrdup(filename);
1730 
1731  if (cstate->is_program)
1732  {
1733  progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1734  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1735  if (cstate->copy_file == NULL)
1736  ereport(ERROR,
1738  errmsg("could not execute command \"%s\": %m",
1739  cstate->filename)));
1740  }
1741  else
1742  {
1743  struct stat st;
1744 
1745  progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1746  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1747  if (cstate->copy_file == NULL)
1748  {
1749  /* copy errno because ereport subfunctions might change it */
1750  int save_errno = errno;
1751 
1752  ereport(ERROR,
1754  errmsg("could not open file \"%s\" for reading: %m",
1755  cstate->filename),
1756  (save_errno == ENOENT || save_errno == EACCES) ?
1757  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1758  "You may want a client-side facility such as psql's \\copy.") : 0));
1759  }
1760 
1761  if (fstat(fileno(cstate->copy_file), &st))
1762  ereport(ERROR,
1764  errmsg("could not stat file \"%s\": %m",
1765  cstate->filename)));
1766 
1767  if (S_ISDIR(st.st_mode))
1768  ereport(ERROR,
1769  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1770  errmsg("\"%s\" is a directory", cstate->filename)));
1771 
1772  progress_vals[2] = st.st_size;
1773  }
1774  }
1775 
1776  pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1777 
1778  if (cstate->opts.binary)
1779  {
1780  /* Read and verify binary header */
1781  ReceiveCopyBinaryHeader(cstate);
1782  }
1783 
1784  /* create workspace for CopyReadAttributes results */
1785  if (!cstate->opts.binary)
1786  {
1787  AttrNumber attr_count = list_length(cstate->attnumlist);
1788 
1789  cstate->max_fields = attr_count;
1790  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1791  }
1792 
1793  MemoryContextSwitchTo(oldcontext);
1794 
1795  return cstate;
1796 }
int16 AttrNumber
Definition: attnum.h:21
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:927
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:482
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
@ PROGRESS_COMMAND_COPY
#define NameStr(name)
Definition: c.h:737
#define PG_BINARY_R
Definition: c.h:1266
#define Assert(condition)
Definition: c.h:849
#define MemSet(start, val, len)
Definition: c.h:1011
#define OidIsValid(objectId)
Definition: c.h:766
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:673
#define INPUT_BUF_SIZE
@ EOL_UNKNOWN
#define RAW_BUF_SIZE
void ReceiveCopyBinaryHeader(CopyFromState cstate)
void ReceiveCopyBegin(CopyFromState cstate)
@ COPY_FILE
Definition: copyto.c:45
@ COPY_CALLBACK
Definition: copyto.c:47
@ DestRemote
Definition: dest.h:89
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode_for_file_access(void)
Definition: elog.c:876
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:138
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2606
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2709
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
@ COPY_ON_ERROR_IGNORE
Definition: copy.h:40
@ COPY_ON_ERROR_STOP
Definition: copy.h:39
bool list_member_int(const List *list, int datum)
Definition: list.c:702
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2874
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2940
int GetDatabaseEncoding(void)
Definition: mbutils.c:1261
int pg_get_client_encoding(void)
Definition: mbutils.c:336
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void * palloc0(Size size)
Definition: mcxt.c:1347
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * palloc(Size size)
Definition: mcxt.c:1317
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding)
Definition: namespace.c:4080
#define makeNode(_type_)
Definition: nodes.h:155
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static char * filename
Definition: pg_dumpall.c:119
static int list_length(const List *l)
Definition: pg_list.h:152
#define lfirst_int(lc)
Definition: pg_list.h:173
@ PG_SQL_ASCII
Definition: pg_wchar.h:226
#define pg_encoding_to_char
Definition: pg_wchar.h:630
Expr * expression_planner(Expr *expr)
Definition: planner.c:6688
CommandDest whereToSendOutput
Definition: postgres.c:91
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define PROGRESS_COPY_COMMAND
Definition: progress.h:144
#define PROGRESS_COPY_TYPE_FILE
Definition: progress.h:153
#define PROGRESS_COPY_COMMAND_FROM
Definition: progress.h:149
#define PROGRESS_COPY_TYPE
Definition: progress.h:145
#define PROGRESS_COPY_TYPE_PROGRAM
Definition: progress.h:154
#define PROGRESS_COPY_BYTES_TOTAL
Definition: progress.h:141
#define PROGRESS_COPY_TYPE_CALLBACK
Definition: progress.h:156
#define PROGRESS_COPY_TYPE_PIPE
Definition: progress.h:155
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
Node * build_column_default(Relation rel, int attrno)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
bool force_notnull_all
Definition: copy.h:80
bool binary
Definition: copy.h:64
bool convert_selectively
Definition: copy.h:85
CopyOnErrorChoice on_error
Definition: copy.h:86
List * force_null
Definition: copy.h:82
List * convert_select
Definition: copy.h:89
bool force_null_all
Definition: copy.h:83
bool * force_notnull_flags
Definition: copy.h:81
int file_encoding
Definition: copy.h:62
bool * force_null_flags
Definition: copy.h:84
char * default_print
Definition: copy.h:71
List * force_notnull
Definition: copy.h:79
copy_data_source_cb data_source_cb
StringInfoData line_buf
CopyFormatOptions opts
StringInfoData attribute_buf
MemoryContext copycontext
const char * cur_attval
const char * cur_attname
const char * cur_relname
ErrorSaveContext * escontext
bool details_wanted
Definition: miscnodes.h:47
bool error_occurred
Definition: miscnodes.h:46
NodeTag type
Definition: miscnodes.h:45
Definition: fmgr.h:57
Definition: pg_list.h:54
Definition: nodes.h:129
List * p_rteperminfos
Definition: parse_node.h:197
List * p_rtable
Definition: parse_node.h:196
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define S_ISDIR(m)
Definition: win32_port.h:325
#define fstat
Definition: win32_port.h:283

References AllocateFile(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, attnum, CopyFromStateData::attnumlist, CopyFromStateData::attribute_buf, CopyFormatOptions::binary, build_column_default(), CopyFromStateData::bytes_processed, contain_volatile_functions_not_nextval(), CopyFromStateData::conversion_proc, CopyFormatOptions::convert_select, CopyFromStateData::convert_select_flags, CopyFormatOptions::convert_selectively, COPY_CALLBACK, COPY_FILE, CopyFromStateData::copy_file, COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromStateData::copy_src, CopyFromStateData::copycontext, CopyGetAttnums(), cur, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, CurrentMemoryContext, CopyFromStateData::data_source_cb, CopyFormatOptions::default_print, CopyFromStateData::defaults, CopyFromStateData::defexprs, CopyFromStateData::defmap, DestRemote, ErrorSaveContext::details_wanted, CopyFromStateData::eol_type, EOL_UNKNOWN, ereport, errcode(), errcode_for_file_access(), errhint(), errmsg(), errmsg_internal(), ERROR, ErrorSaveContext::error_occurred, CopyFromStateData::escontext, ExecInitExpr(), expression_planner(), CopyFormatOptions::file_encoding, CopyFromStateData::file_encoding, filename, CopyFromStateData::filename, FindDefaultConversionProc(), fmgr_info(), CopyFormatOptions::force_notnull, CopyFormatOptions::force_notnull_all, CopyFormatOptions::force_notnull_flags, CopyFormatOptions::force_null, CopyFormatOptions::force_null_all, CopyFormatOptions::force_null_flags, fstat, GetDatabaseEncoding(), getTypeBinaryInputInfo(), getTypeInputInfo(), CopyFromStateData::in_functions, initStringInfo(), CopyFromStateData::input_buf, CopyFromStateData::input_buf_index, CopyFromStateData::input_buf_len, INPUT_BUF_SIZE, CopyFromStateData::input_reached_eof, InvalidOid, CopyFromStateData::is_program, lfirst_int, CopyFromStateData::line_buf, list_length(), list_member_int(), makeNode, CopyFromStateData::max_fields, MemoryContextSwitchTo(), MemSet, NameStr, TupleDescData::natts, CopyFromStateData::need_transcoding, CopyFromStateData::num_defaults, OidIsValid, CopyFormatOptions::on_error, OpenPipeStream(), CopyFromStateData::opts, ParseState::p_rtable, ParseState::p_rteperminfos, palloc(), palloc0(), PG_BINARY_R, pg_encoding_to_char, pg_get_client_encoding(), PG_SQL_ASCII, pgstat_progress_start_command(), pgstat_progress_update_multi_param(), ProcessCopyOptions(), PROGRESS_COMMAND_COPY, PROGRESS_COPY_BYTES_TOTAL, PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_FROM, PROGRESS_COPY_TYPE, PROGRESS_COPY_TYPE_CALLBACK, PROGRESS_COPY_TYPE_FILE, PROGRESS_COPY_TYPE_PIPE, PROGRESS_COPY_TYPE_PROGRAM, pstrdup(), CopyFromStateData::range_table, CopyFromStateData::raw_buf, CopyFromStateData::raw_buf_index, CopyFromStateData::raw_buf_len, RAW_BUF_SIZE, CopyFromStateData::raw_fields, CopyFromStateData::raw_reached_eof, ReceiveCopyBegin(), ReceiveCopyBinaryHeader(), CopyFromStateData::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, CopyFromStateData::relname_only, CopyFromStateData::rteperminfos, S_ISDIR, stat::st_mode, stat::st_size, TupleDescAttr, ErrorSaveContext::type, CopyFromStateData::typioparams, CopyFromStateData::volatile_defexprs, CopyFromStateData::whereClause, and whereToSendOutput.

Referenced by copy_table(), DoCopy(), file_acquire_sample_rows(), fileBeginForeignScan(), and fileReScanForeignScan().

◆ ClosePipeFromProgram()

static void ClosePipeFromProgram ( CopyFromState  cstate)
static

Definition at line 1828 of file copyfrom.c.

1829 {
1830  int pclose_rc;
1831 
1832  Assert(cstate->is_program);
1833 
1834  pclose_rc = ClosePipeStream(cstate->copy_file);
1835  if (pclose_rc == -1)
1836  ereport(ERROR,
1838  errmsg("could not close pipe to external command: %m")));
1839  else if (pclose_rc != 0)
1840  {
1841  /*
1842  * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1843  * expectable for the called program to fail with SIGPIPE, and we
1844  * should not report that as an error. Otherwise, SIGPIPE indicates a
1845  * problem.
1846  */
1847  if (!cstate->raw_reached_eof &&
1848  wait_result_is_signal(pclose_rc, SIGPIPE))
1849  return;
1850 
1851  ereport(ERROR,
1852  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1853  errmsg("program \"%s\" failed",
1854  cstate->filename),
1855  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1856  }
1857 }
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int ClosePipeStream(FILE *file)
Definition: fd.c:3014
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
bool wait_result_is_signal(int exit_status, int signum)
Definition: wait_error.c:102
#define SIGPIPE
Definition: win32_port.h:173

References Assert, ClosePipeStream(), CopyFromStateData::copy_file, ereport, errcode(), errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, CopyFromStateData::filename, CopyFromStateData::is_program, CopyFromStateData::raw_reached_eof, SIGPIPE, wait_result_is_signal(), and wait_result_to_str().

Referenced by EndCopyFrom().

◆ CopyFrom()

uint64 CopyFrom ( CopyFromState  cstate)

Definition at line 640 of file copyfrom.c.

641 {
642  ResultRelInfo *resultRelInfo;
643  ResultRelInfo *target_resultRelInfo;
644  ResultRelInfo *prevResultRelInfo = NULL;
645  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
646  ModifyTableState *mtstate;
647  ExprContext *econtext;
648  TupleTableSlot *singleslot = NULL;
649  MemoryContext oldcontext = CurrentMemoryContext;
650 
651  PartitionTupleRouting *proute = NULL;
652  ErrorContextCallback errcallback;
653  CommandId mycid = GetCurrentCommandId(true);
654  int ti_options = 0; /* start with default options for insert */
655  BulkInsertState bistate = NULL;
656  CopyInsertMethod insertMethod;
657  CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
658  int64 processed = 0;
659  int64 excluded = 0;
660  bool has_before_insert_row_trig;
661  bool has_instead_insert_row_trig;
662  bool leafpart_use_multi_insert = false;
663 
664  Assert(cstate->rel);
665  Assert(list_length(cstate->range_table) == 1);
666 
667  if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
668  Assert(cstate->escontext);
669 
670  /*
671  * The target must be a plain, foreign, or partitioned relation, or have
672  * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
673  * allowed on views, so we only hint about them in the view case.)
674  */
675  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
676  cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
677  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
678  !(cstate->rel->trigdesc &&
680  {
681  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
682  ereport(ERROR,
683  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684  errmsg("cannot copy to view \"%s\"",
685  RelationGetRelationName(cstate->rel)),
686  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
687  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
688  ereport(ERROR,
689  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
690  errmsg("cannot copy to materialized view \"%s\"",
691  RelationGetRelationName(cstate->rel))));
692  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
693  ereport(ERROR,
694  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
695  errmsg("cannot copy to sequence \"%s\"",
696  RelationGetRelationName(cstate->rel))));
697  else
698  ereport(ERROR,
699  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
700  errmsg("cannot copy to non-table relation \"%s\"",
701  RelationGetRelationName(cstate->rel))));
702  }
703 
704  /*
705  * If the target file is new-in-transaction, we assume that checking FSM
706  * for free space is a waste of time. This could possibly be wrong, but
707  * it's unlikely.
708  */
709  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
712  ti_options |= TABLE_INSERT_SKIP_FSM;
713 
714  /*
715  * Optimize if new relation storage was created in this subxact or one of
716  * its committed children and we won't see those rows later as part of an
717  * earlier scan or command. The subxact test ensures that if this subxact
718  * aborts then the frozen rows won't be visible after xact cleanup. Note
719  * that the stronger test of exactly which subtransaction created it is
720  * crucial for correctness of this optimization. The test for an earlier
721  * scan or command tolerates false negatives. FREEZE causes other sessions
722  * to see rows they would not see under MVCC, and a false negative merely
723  * spreads that anomaly to the current session.
724  */
725  if (cstate->opts.freeze)
726  {
727  /*
728  * We currently disallow COPY FREEZE on partitioned tables. The
729  * reason for this is that we've simply not yet opened the partitions
730  * to determine if the optimization can be applied to them. We could
731  * go and open them all here, but doing so may be quite a costly
732  * overhead for small copies. In any case, we may just end up routing
733  * tuples to a small number of partitions. It seems better just to
734  * raise an ERROR for partitioned tables.
735  */
736  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
737  {
738  ereport(ERROR,
739  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
740  errmsg("cannot perform COPY FREEZE on a partitioned table")));
741  }
742 
743  /*
744  * Tolerate one registration for the benefit of FirstXactSnapshot.
745  * Scan-bearing queries generally create at least two registrations,
746  * though relying on that is fragile, as is ignoring ActiveSnapshot.
747  * Clear CatalogSnapshot to avoid counting its registration. We'll
748  * still detect ongoing catalog scans, each of which separately
749  * registers the snapshot it uses.
750  */
753  ereport(ERROR,
754  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
755  errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
756 
757  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
759  ereport(ERROR,
760  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
761  errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
762 
763  ti_options |= TABLE_INSERT_FROZEN;
764  }
765 
766  /*
767  * We need a ResultRelInfo so we can use the regular executor's
768  * index-entry-making machinery. (There used to be a huge amount of code
769  * here that basically duplicated execUtils.c ...)
770  */
771  ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
772  resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
773  ExecInitResultRelation(estate, resultRelInfo, 1);
774 
775  /* Verify the named relation is a valid target for INSERT */
776  CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
777 
778  ExecOpenIndices(resultRelInfo, false);
779 
780  /*
781  * Set up a ModifyTableState so we can let FDW(s) init themselves for
782  * foreign-table result relation(s).
783  */
784  mtstate = makeNode(ModifyTableState);
785  mtstate->ps.plan = NULL;
786  mtstate->ps.state = estate;
787  mtstate->operation = CMD_INSERT;
788  mtstate->mt_nrels = 1;
789  mtstate->resultRelInfo = resultRelInfo;
790  mtstate->rootResultRelInfo = resultRelInfo;
791 
792  if (resultRelInfo->ri_FdwRoutine != NULL &&
793  resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
794  resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
795  resultRelInfo);
796 
797  /*
798  * Also, if the named relation is a foreign table, determine if the FDW
799  * supports batch insert and determine the batch size (a FDW may support
800  * batching, but it may be disabled for the server/table).
801  *
802  * If the FDW does not support batching, we set the batch size to 1.
803  */
804  if (resultRelInfo->ri_FdwRoutine != NULL &&
805  resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
806  resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
807  resultRelInfo->ri_BatchSize =
808  resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
809  else
810  resultRelInfo->ri_BatchSize = 1;
811 
812  Assert(resultRelInfo->ri_BatchSize >= 1);
813 
814  /* Prepare to catch AFTER triggers. */
816 
817  /*
818  * If there are any triggers with transition tables on the named relation,
819  * we need to be prepared to capture transition tuples.
820  *
821  * Because partition tuple routing would like to know about whether
822  * transition capture is active, we also set it in mtstate, which is
823  * passed to ExecFindPartition() below.
824  */
825  cstate->transition_capture = mtstate->mt_transition_capture =
827  RelationGetRelid(cstate->rel),
828  CMD_INSERT);
829 
830  /*
831  * If the named relation is a partitioned table, initialize state for
832  * CopyFrom tuple routing.
833  */
834  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
835  proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
836 
837  if (cstate->whereClause)
838  cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
839  &mtstate->ps);
840 
841  /*
842  * It's generally more efficient to prepare a bunch of tuples for
843  * insertion, and insert them in one
844  * table_multi_insert()/ExecForeignBatchInsert() call, than call
845  * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
846  * However, there are a number of reasons why we might not be able to do
847  * this. These are explained below.
848  */
849  if (resultRelInfo->ri_TrigDesc != NULL &&
850  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
851  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
852  {
853  /*
854  * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
855  * triggers on the table. Such triggers might query the table we're
856  * inserting into and act differently if the tuples that have already
857  * been processed and prepared for insertion are not there.
858  */
859  insertMethod = CIM_SINGLE;
860  }
861  else if (resultRelInfo->ri_FdwRoutine != NULL &&
862  resultRelInfo->ri_BatchSize == 1)
863  {
864  /*
865  * Can't support multi-inserts to a foreign table if the FDW does not
866  * support batching, or it's disabled for the server or foreign table.
867  */
868  insertMethod = CIM_SINGLE;
869  }
870  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
871  resultRelInfo->ri_TrigDesc->trig_insert_new_table)
872  {
873  /*
874  * For partitioned tables we can't support multi-inserts when there
875  * are any statement level insert triggers. It might be possible to
876  * allow partitioned tables with such triggers in the future, but for
877  * now, CopyMultiInsertInfoFlush expects that any after row insert and
878  * statement level insert triggers are on the same relation.
879  */
880  insertMethod = CIM_SINGLE;
881  }
882  else if (cstate->volatile_defexprs)
883  {
884  /*
885  * Can't support multi-inserts if there are any volatile default
886  * expressions in the table. Similarly to the trigger case above,
887  * such expressions may query the table we're inserting into.
888  *
889  * Note: It does not matter if any partitions have any volatile
890  * default expressions as we use the defaults from the target of the
891  * COPY command.
892  */
893  insertMethod = CIM_SINGLE;
894  }
895  else if (contain_volatile_functions(cstate->whereClause))
896  {
897  /*
898  * Can't support multi-inserts if there are any volatile function
899  * expressions in WHERE clause. Similarly to the trigger case above,
900  * such expressions may query the table we're inserting into.
901  *
902  * Note: the whereClause was already preprocessed in DoCopy(), so it's
903  * okay to use contain_volatile_functions() directly.
904  */
905  insertMethod = CIM_SINGLE;
906  }
907  else
908  {
909  /*
910  * For partitioned tables, we may still be able to perform bulk
911  * inserts. However, the possibility of this depends on which types
912  * of triggers exist on the partition. We must disable bulk inserts
913  * if the partition is a foreign table that can't use batching or it
914  * has any before row insert or insert instead triggers (same as we
915  * checked above for the parent table). Since the partition's
916  * resultRelInfos are initialized only when we actually need to insert
917  * the first tuple into them, we must have the intermediate insert
918  * method of CIM_MULTI_CONDITIONAL to flag that we must later
919  * determine if we can use bulk-inserts for the partition being
920  * inserted into.
921  */
922  if (proute)
923  insertMethod = CIM_MULTI_CONDITIONAL;
924  else
925  insertMethod = CIM_MULTI;
926 
927  CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
928  estate, mycid, ti_options);
929  }
930 
931  /*
932  * If not using batch mode (which allocates slots as needed) set up a
933  * tuple slot too. When inserting into a partitioned table, we also need
934  * one, even if we might batch insert, to read the tuple in the root
935  * partition's form.
936  */
937  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
938  {
939  singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
940  &estate->es_tupleTable);
941  bistate = GetBulkInsertState();
942  }
943 
944  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
945  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
946 
947  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
948  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
949 
950  /*
951  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
952  * should do this for COPY, since it's not really an "INSERT" statement as
953  * such. However, executing these triggers maintains consistency with the
954  * EACH ROW triggers that we already fire on COPY.
955  */
956  ExecBSInsertTriggers(estate, resultRelInfo);
957 
958  econtext = GetPerTupleExprContext(estate);
959 
960  /* Set up callback to identify error line number */
961  errcallback.callback = CopyFromErrorCallback;
962  errcallback.arg = (void *) cstate;
963  errcallback.previous = error_context_stack;
964  error_context_stack = &errcallback;
965 
966  for (;;)
967  {
968  TupleTableSlot *myslot;
969  bool skip_tuple;
970 
972 
973  /*
974  * Reset the per-tuple exprcontext. We do this after every tuple, to
975  * clean-up after expression evaluations etc.
976  */
977  ResetPerTupleExprContext(estate);
978 
979  /* select slot to (initially) load row into */
980  if (insertMethod == CIM_SINGLE || proute)
981  {
982  myslot = singleslot;
983  Assert(myslot != NULL);
984  }
985  else
986  {
987  Assert(resultRelInfo == target_resultRelInfo);
988  Assert(insertMethod == CIM_MULTI);
989 
990  myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
991  resultRelInfo);
992  }
993 
994  /*
995  * Switch to per-tuple context before calling NextCopyFrom, which does
996  * evaluate default expressions etc. and requires per-tuple context.
997  */
999 
1000  ExecClearTuple(myslot);
1001 
1002  /* Directly store the values/nulls array in the slot */
1003  if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1004  break;
1005 
1006  if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1007  cstate->escontext->error_occurred)
1008  {
1009  /*
1010  * Soft error occurred, skip this tuple and just make
1011  * ErrorSaveContext ready for the next NextCopyFrom. Since we
1012  * don't set details_wanted and error_data is not to be filled,
1013  * just resetting error_occurred is enough.
1014  */
1015  cstate->escontext->error_occurred = false;
1016 
1017  /* Report that this tuple was skipped by the ON_ERROR clause */
1019  cstate->num_errors);
1020 
1021  if (cstate->opts.reject_limit > 0 && \
1022  cstate->num_errors > cstate->opts.reject_limit)
1023  ereport(ERROR,
1024  (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1025  errmsg("skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
1026  (long long) cstate->opts.reject_limit)));
1027 
1028  /* Repeat NextCopyFrom() until no soft error occurs */
1029  continue;
1030  }
1031 
1032  ExecStoreVirtualTuple(myslot);
1033 
1034  /*
1035  * Constraints and where clause might reference the tableoid column,
1036  * so (re-)initialize tts_tableOid before evaluating them.
1037  */
1038  myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1039 
1040  /* Triggers and stuff need to be invoked in query context. */
1041  MemoryContextSwitchTo(oldcontext);
1042 
1043  if (cstate->whereClause)
1044  {
1045  econtext->ecxt_scantuple = myslot;
1046  /* Skip items that don't match COPY's WHERE clause */
1047  if (!ExecQual(cstate->qualexpr, econtext))
1048  {
1049  /*
1050  * Report that this tuple was filtered out by the WHERE
1051  * clause.
1052  */
1054  ++excluded);
1055  continue;
1056  }
1057  }
1058 
1059  /* Determine the partition to insert the tuple into */
1060  if (proute)
1061  {
1062  TupleConversionMap *map;
1063 
1064  /*
1065  * Attempt to find a partition suitable for this tuple.
1066  * ExecFindPartition() will raise an error if none can be found or
1067  * if the found partition is not suitable for INSERTs.
1068  */
1069  resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1070  proute, myslot, estate);
1071 
1072  if (prevResultRelInfo != resultRelInfo)
1073  {
1074  /* Determine which triggers exist on this partition */
1075  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1076  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1077 
1078  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1079  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1080 
1081  /*
1082  * Disable multi-inserts when the partition has BEFORE/INSTEAD
1083  * OF triggers, or if the partition is a foreign table that
1084  * can't use batching.
1085  */
1086  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1087  !has_before_insert_row_trig &&
1088  !has_instead_insert_row_trig &&
1089  (resultRelInfo->ri_FdwRoutine == NULL ||
1090  resultRelInfo->ri_BatchSize > 1);
1091 
1092  /* Set the multi-insert buffer to use for this partition. */
1093  if (leafpart_use_multi_insert)
1094  {
1095  if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1096  CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1097  resultRelInfo);
1098  }
1099  else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1100  !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1101  {
1102  /*
1103  * Flush pending inserts if this partition can't use
1104  * batching, so rows are visible to triggers etc.
1105  */
1106  CopyMultiInsertInfoFlush(&multiInsertInfo,
1107  resultRelInfo,
1108  &processed);
1109  }
1110 
1111  if (bistate != NULL)
1112  ReleaseBulkInsertStatePin(bistate);
1113  prevResultRelInfo = resultRelInfo;
1114  }
1115 
1116  /*
1117  * If we're capturing transition tuples, we might need to convert
1118  * from the partition rowtype to root rowtype. But if there are no
1119  * BEFORE triggers on the partition that could change the tuple,
1120  * we can just remember the original unconverted tuple to avoid a
1121  * needless round trip conversion.
1122  */
1123  if (cstate->transition_capture != NULL)
1125  !has_before_insert_row_trig ? myslot : NULL;
1126 
1127  /*
1128  * We might need to convert from the root rowtype to the partition
1129  * rowtype.
1130  */
1131  map = ExecGetRootToChildMap(resultRelInfo, estate);
1132  if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1133  {
1134  /* non batch insert */
1135  if (map != NULL)
1136  {
1137  TupleTableSlot *new_slot;
1138 
1139  new_slot = resultRelInfo->ri_PartitionTupleSlot;
1140  myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1141  }
1142  }
1143  else
1144  {
1145  /*
1146  * Prepare to queue up tuple for later batch insert into
1147  * current partition.
1148  */
1149  TupleTableSlot *batchslot;
1150 
1151  /* no other path available for partitioned table */
1152  Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1153 
1154  batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1155  resultRelInfo);
1156 
1157  if (map != NULL)
1158  myslot = execute_attr_map_slot(map->attrMap, myslot,
1159  batchslot);
1160  else
1161  {
1162  /*
1163  * This looks more expensive than it is (Believe me, I
1164  * optimized it away. Twice.). The input is in virtual
1165  * form, and we'll materialize the slot below - for most
1166  * slot types the copy performs the work materialization
1167  * would later require anyway.
1168  */
1169  ExecCopySlot(batchslot, myslot);
1170  myslot = batchslot;
1171  }
1172  }
1173 
1174  /* ensure that triggers etc see the right relation */
1175  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1176  }
1177 
1178  skip_tuple = false;
1179 
1180  /* BEFORE ROW INSERT Triggers */
1181  if (has_before_insert_row_trig)
1182  {
1183  if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1184  skip_tuple = true; /* "do nothing" */
1185  }
1186 
1187  if (!skip_tuple)
1188  {
1189  /*
1190  * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1191  * tuple. Otherwise, proceed with inserting the tuple into the
1192  * table or foreign table.
1193  */
1194  if (has_instead_insert_row_trig)
1195  {
1196  ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1197  }
1198  else
1199  {
1200  /* Compute stored generated columns */
1201  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1203  ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1204  CMD_INSERT);
1205 
1206  /*
1207  * If the target is a plain table, check the constraints of
1208  * the tuple.
1209  */
1210  if (resultRelInfo->ri_FdwRoutine == NULL &&
1211  resultRelInfo->ri_RelationDesc->rd_att->constr)
1212  ExecConstraints(resultRelInfo, myslot, estate);
1213 
1214  /*
1215  * Also check the tuple against the partition constraint, if
1216  * there is one; except that if we got here via tuple-routing,
1217  * we don't need to if there's no BR trigger defined on the
1218  * partition.
1219  */
1220  if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1221  (proute == NULL || has_before_insert_row_trig))
1222  ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1223 
1224  /* Store the slot in the multi-insert buffer, when enabled. */
1225  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1226  {
1227  /*
1228  * The slot previously might point into the per-tuple
1229  * context. For batching it needs to be longer lived.
1230  */
1231  ExecMaterializeSlot(myslot);
1232 
1233  /* Add this tuple to the tuple buffer */
1234  CopyMultiInsertInfoStore(&multiInsertInfo,
1235  resultRelInfo, myslot,
1236  cstate->line_buf.len,
1237  cstate->cur_lineno);
1238 
1239  /*
1240  * If enough inserts have queued up, then flush all
1241  * buffers out to their tables.
1242  */
1243  if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1244  CopyMultiInsertInfoFlush(&multiInsertInfo,
1245  resultRelInfo,
1246  &processed);
1247 
1248  /*
1249  * We delay updating the row counter and progress of the
1250  * COPY command until after writing the tuples stored in
1251  * the buffer out to the table, as in single insert mode.
1252  * See CopyMultiInsertBufferFlush().
1253  */
1254  continue; /* next tuple please */
1255  }
1256  else
1257  {
1258  List *recheckIndexes = NIL;
1259 
1260  /* OK, store the tuple */
1261  if (resultRelInfo->ri_FdwRoutine != NULL)
1262  {
1263  myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1264  resultRelInfo,
1265  myslot,
1266  NULL);
1267 
1268  if (myslot == NULL) /* "do nothing" */
1269  continue; /* next tuple please */
1270 
1271  /*
1272  * AFTER ROW Triggers might reference the tableoid
1273  * column, so (re-)initialize tts_tableOid before
1274  * evaluating them.
1275  */
1276  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1277  }
1278  else
1279  {
1280  /* OK, store the tuple and create index entries for it */
1281  table_tuple_insert(resultRelInfo->ri_RelationDesc,
1282  myslot, mycid, ti_options, bistate);
1283 
1284  if (resultRelInfo->ri_NumIndices > 0)
1285  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1286  myslot,
1287  estate,
1288  false,
1289  false,
1290  NULL,
1291  NIL,
1292  false);
1293  }
1294 
1295  /* AFTER ROW INSERT Triggers */
1296  ExecARInsertTriggers(estate, resultRelInfo, myslot,
1297  recheckIndexes, cstate->transition_capture);
1298 
1299  list_free(recheckIndexes);
1300  }
1301  }
1302 
1303  /*
1304  * We count only tuples not suppressed by a BEFORE INSERT trigger
1305  * or FDW; this is the same definition used by nodeModifyTable.c
1306  * for counting tuples inserted by an INSERT command. Update
1307  * progress of the COPY command as well.
1308  */
1310  ++processed);
1311  }
1312  }
1313 
1314  /* Flush any remaining buffered tuples */
1315  if (insertMethod != CIM_SINGLE)
1316  {
1317  if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1318  CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1319  }
1320 
1321  /* Done, clean up */
1322  error_context_stack = errcallback.previous;
1323 
1324  if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1325  cstate->num_errors > 0 &&
1327  ereport(NOTICE,
1328  errmsg_plural("%llu row was skipped due to data type incompatibility",
1329  "%llu rows were skipped due to data type incompatibility",
1330  (unsigned long long) cstate->num_errors,
1331  (unsigned long long) cstate->num_errors));
1332 
1333  if (bistate != NULL)
1334  FreeBulkInsertState(bistate);
1335 
1336  MemoryContextSwitchTo(oldcontext);
1337 
1338  /* Execute AFTER STATEMENT insertion triggers */
1339  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1340 
1341  /* Handle queued AFTER triggers */
1342  AfterTriggerEndQuery(estate);
1343 
1344  ExecResetTupleTable(estate->es_tupleTable, false);
1345 
1346  /* Allow the FDW to shut down */
1347  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1348  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1349  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1350  target_resultRelInfo);
1351 
1352  /* Tear down the multi-insert buffer data */
1353  if (insertMethod != CIM_SINGLE)
1354  CopyMultiInsertInfoCleanup(&multiInsertInfo);
1355 
1356  /* Close all the partitioned tables, leaf partitions, and their indices */
1357  if (proute)
1358  ExecCleanupTupleRouting(mtstate, proute);
1359 
1360  /* Close the result relations, including any trigger target relations */
1361  ExecCloseResultRelations(estate);
1363 
1364  FreeExecutorState(estate);
1365 
1366  return processed;
1367 }
void pgstat_progress_update_param(int index, int64 val)
#define InvalidSubTransactionId
Definition: c.h:649
uint32 CommandId
Definition: c.h:657
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:538
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:241
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copyfrom.c:261
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
Definition: copyfrom.c:523
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copyfrom.c:617
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:577
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:286
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:596
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:298
void CopyFromErrorCallback(void *arg)
Definition: copyfrom.c:115
CopyInsertMethod
@ CIM_SINGLE
@ CIM_MULTI_CONDITIONAL
@ CIM_MULTI
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1180
ErrorContextCallback * error_context_stack
Definition: elog.c:94
#define NOTICE
Definition: elog.h:35
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:224
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:303
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:160
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, List *mergeActions)
Definition: execMain.c:1024
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1796
void ExecCloseResultRelations(EState *estate)
Definition: execMain.c:1521
void ExecCloseRangeTableRelations(EState *estate)
Definition: execMain.c:1581
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1920
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1278
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1639
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:730
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
Definition: execUtils.c:816
EState * CreateExecutorState(void)
Definition: execUtils.c:88
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1234
void FreeExecutorState(EState *estate)
Definition: execUtils.c:191
#define ResetPerTupleExprContext(estate)
Definition: executor.h:570
#define GetPerTupleExprContext(estate)
Definition: executor.h:561
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:566
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:424
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:1947
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1918
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:1935
@ COPY_LOG_VERBOSITY_DEFAULT
Definition: copy.h:49
void list_free(List *list)
Definition: list.c:1546
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:267
#define castNode(_type_, nodeptr)
Definition: nodes.h:176
#define NIL
Definition: pg_list.h:68
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1171
#define PROGRESS_COPY_TUPLES_PROCESSED
Definition: progress.h:142
#define PROGRESS_COPY_TUPLES_EXCLUDED
Definition: progress.h:143
#define PROGRESS_COPY_TUPLES_SKIPPED
Definition: progress.h:146
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1606
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:422
bool freeze
Definition: copy.h:65
CopyLogVerbosityChoice log_verbosity
Definition: copy.h:87
int64 reject_limit
Definition: copy.h:88
TransitionCaptureState * transition_capture
List * es_tupleTable
Definition: execnodes.h:677
struct ErrorContextCallback * previous
Definition: elog.h:296
void(* callback)(void *arg)
Definition: elog.h:297
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:258
EndForeignInsert_function EndForeignInsert
Definition: fdwapi.h:239
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:238
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:232
ExecForeignBatchInsert_function ExecForeignBatchInsert
Definition: fdwapi.h:233
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
Definition: fdwapi.h:234
CmdType operation
Definition: execnodes.h:1367
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1371
PlanState ps
Definition: execnodes.h:1366
ResultRelInfo * rootResultRelInfo
Definition: execnodes.h:1379
struct TransitionCaptureState * mt_transition_capture
Definition: execnodes.h:1405
Plan * plan
Definition: execnodes.h:1128
EState * state
Definition: execnodes.h:1130
SubTransactionId rd_firstRelfilelocatorSubid
Definition: rel.h:106
TriggerDesc * trigdesc
Definition: rel.h:117
TupleDesc rd_att
Definition: rel.h:112
SubTransactionId rd_newRelfilelocatorSubid
Definition: rel.h:104
SubTransactionId rd_createSubid
Definition: rel.h:103
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:591
int ri_NumIndices
Definition: execnodes.h:462
Relation ri_RelationDesc
Definition: execnodes.h:459
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:594
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:492
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:509
int ri_BatchSize
Definition: execnodes.h:520
TupleTableSlot * tcs_original_insert_tuple
Definition: trigger.h:76
bool trig_insert_instead_row
Definition: reltrigger.h:58
bool trig_insert_new_table
Definition: reltrigger.h:75
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:45
AttrMap * attrMap
Definition: tupconvert.h:28
TupleConstr * constr
Definition: tupdesc.h:85
Oid tts_tableOid
Definition: tuptable.h:130
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
#define TABLE_INSERT_FROZEN
Definition: tableam.h:261
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:260
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1402
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2395
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2459
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2552
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2535
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4888
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2446
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5044
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5024
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:472
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:790
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References AfterTriggerBeginQuery(), AfterTriggerEndQuery(), ErrorContextCallback::arg, Assert, TupleConversionMap::attrMap, FdwRoutine::BeginForeignInsert, ErrorContextCallback::callback, castNode, CHECK_FOR_INTERRUPTS, CheckValidResultRel(), CIM_MULTI, CIM_MULTI_CONDITIONAL, CIM_SINGLE, CMD_INSERT, TupleDescData::constr, contain_volatile_functions(), COPY_LOG_VERBOSITY_DEFAULT, COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromErrorCallback(), CopyMultiInsertInfoCleanup(), CopyMultiInsertInfoFlush(), CopyMultiInsertInfoInit(), CopyMultiInsertInfoIsEmpty(), CopyMultiInsertInfoIsFull(), CopyMultiInsertInfoNextFreeSlot(), CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfoStore(), CreateExecutorState(), CopyFromStateData::cur_lineno, CurrentMemoryContext, ExprContext::ecxt_scantuple, FdwRoutine::EndForeignInsert, ereport, errcode(), errhint(), errmsg(), errmsg_plural(), ERROR, error_context_stack, ErrorSaveContext::error_occurred, EState::es_tupleTable, CopyFromStateData::escontext, ExecARInsertTriggers(), ExecASInsertTriggers(), ExecBRInsertTriggers(), ExecBSInsertTriggers(), ExecCleanupTupleRouting(), ExecClearTuple(), ExecCloseRangeTableRelations(), ExecCloseResultRelations(), ExecComputeStoredGenerated(), ExecConstraints(), ExecCopySlot(), ExecFindPartition(), FdwRoutine::ExecForeignBatchInsert, FdwRoutine::ExecForeignInsert, ExecGetRootToChildMap(), ExecInitQual(), ExecInitRangeTable(), ExecInitResultRelation(), ExecInsertIndexTuples(), ExecIRInsertTriggers(), ExecMaterializeSlot(), ExecOpenIndices(), ExecPartitionCheck(), ExecQual(), ExecResetTupleTable(), ExecSetupPartitionTupleRouting(), ExecStoreVirtualTuple(), execute_attr_map_slot(), FreeBulkInsertState(), FreeExecutorState(), CopyFormatOptions::freeze, GetBulkInsertState(), GetCurrentCommandId(), GetCurrentSubTransactionId(), FdwRoutine::GetForeignModifyBatchSize, GetPerTupleExprContext, GetPerTupleMemoryContext, TupleConstr::has_generated_stored, InvalidateCatalogSnapshot(), InvalidSubTransactionId, StringInfoData::len, CopyFromStateData::line_buf, list_free(), list_length(), CopyFormatOptions::log_verbosity, makeNode, MakeTransitionCaptureState(), MemoryContextSwitchTo(), ModifyTableState::mt_nrels, ModifyTableState::mt_transition_capture, NextCopyFrom(), NIL, NOTICE, CopyFromStateData::num_errors, CopyFormatOptions::on_error, ModifyTableState::operation, CopyFromStateData::opts, pgstat_progress_update_param(), PlanState::plan, ErrorContextCallback::previous, PROGRESS_COPY_TUPLES_EXCLUDED, PROGRESS_COPY_TUPLES_PROCESSED, PROGRESS_COPY_TUPLES_SKIPPED, ModifyTableState::ps, CopyFromStateData::qualexpr, CopyFromStateData::range_table, RelationData::rd_att, RelationData::rd_createSubid, RelationData::rd_firstRelfilelocatorSubid, RelationData::rd_newRelfilelocatorSubid, RelationData::rd_rel, CopyFormatOptions::reject_limit, CopyFromStateData::rel, RelationGetRelationName, RelationGetRelid, ReleaseBulkInsertStatePin(), ResetPerTupleExprContext, ModifyTableState::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, ModifyTableState::rootResultRelInfo, CopyFromStateData::rteperminfos, PlanState::state, TABLE_INSERT_FROZEN, TABLE_INSERT_SKIP_FSM, table_slot_create(), table_tuple_insert(), TransitionCaptureState::tcs_original_insert_tuple, ThereAreNoPriorRegisteredSnapshots(), ThereAreNoReadyPortals(), CopyFromStateData::transition_capture, TriggerDesc::trig_insert_before_row, TriggerDesc::trig_insert_instead_row, TriggerDesc::trig_insert_new_table, RelationData::trigdesc, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tableOid, TupleTableSlot::tts_values, CopyFromStateData::volatile_defexprs, and CopyFromStateData::whereClause.

Referenced by copy_table(), and DoCopy().

◆ CopyFromErrorCallback()

void CopyFromErrorCallback ( void *  arg)

Definition at line 115 of file copyfrom.c.

116 {
117  CopyFromState cstate = (CopyFromState) arg;
118 
119  if (cstate->relname_only)
120  {
121  errcontext("COPY %s",
122  cstate->cur_relname);
123  return;
124  }
125  if (cstate->opts.binary)
126  {
127  /* can't usefully display the data */
128  if (cstate->cur_attname)
129  errcontext("COPY %s, line %llu, column %s",
130  cstate->cur_relname,
131  (unsigned long long) cstate->cur_lineno,
132  cstate->cur_attname);
133  else
134  errcontext("COPY %s, line %llu",
135  cstate->cur_relname,
136  (unsigned long long) cstate->cur_lineno);
137  }
138  else
139  {
140  if (cstate->cur_attname && cstate->cur_attval)
141  {
142  /* error is relevant to a particular column */
143  char *attval;
144 
145  attval = CopyLimitPrintoutLength(cstate->cur_attval);
146  errcontext("COPY %s, line %llu, column %s: \"%s\"",
147  cstate->cur_relname,
148  (unsigned long long) cstate->cur_lineno,
149  cstate->cur_attname,
150  attval);
151  pfree(attval);
152  }
153  else if (cstate->cur_attname)
154  {
155  /* error is relevant to a particular column, value is NULL */
156  errcontext("COPY %s, line %llu, column %s: null input",
157  cstate->cur_relname,
158  (unsigned long long) cstate->cur_lineno,
159  cstate->cur_attname);
160  }
161  else
162  {
163  /*
164  * Error is relevant to a particular line.
165  *
166  * If line_buf still contains the correct line, print it.
167  */
168  if (cstate->line_buf_valid)
169  {
170  char *lineval;
171 
172  lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
173  errcontext("COPY %s, line %llu: \"%s\"",
174  cstate->cur_relname,
175  (unsigned long long) cstate->cur_lineno, lineval);
176  pfree(lineval);
177  }
178  else
179  {
180  errcontext("COPY %s, line %llu",
181  cstate->cur_relname,
182  (unsigned long long) cstate->cur_lineno);
183  }
184  }
185  }
186 }
char * CopyLimitPrintoutLength(const char *str)
Definition: copyfrom.c:194
#define errcontext
Definition: elog.h:196
struct CopyFromStateData * CopyFromState
Definition: copy.h:93
void pfree(void *pointer)
Definition: mcxt.c:1521
void * arg

References arg, CopyFormatOptions::binary, CopyLimitPrintoutLength(), CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, StringInfoData::data, errcontext, CopyFromStateData::line_buf, CopyFromStateData::line_buf_valid, CopyFromStateData::opts, pfree(), and CopyFromStateData::relname_only.

Referenced by CopyFrom(), file_acquire_sample_rows(), and fileIterateForeignScan().

◆ CopyLimitPrintoutLength()

char* CopyLimitPrintoutLength ( const char *  str)

Definition at line 194 of file copyfrom.c.

195 {
196 #define MAX_COPY_DATA_DISPLAY 100
197 
198  int slen = strlen(str);
199  int len;
200  char *res;
201 
202  /* Fast path if definitely okay */
203  if (slen <= MAX_COPY_DATA_DISPLAY)
204  return pstrdup(str);
205 
206  /* Apply encoding-dependent truncation */
208 
209  /*
210  * Truncate, and add "..." to show we truncated the input.
211  */
212  res = (char *) palloc(len + 4);
213  memcpy(res, str, len);
214  strcpy(res + len, "...");
215 
216  return res;
217 }
#define MAX_COPY_DATA_DISPLAY
const char * str
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:1083
const void size_t len

References len, MAX_COPY_DATA_DISPLAY, palloc(), pg_mbcliplen(), pstrdup(), res, and str.

Referenced by CopyFromErrorCallback(), and NextCopyFrom().

◆ CopyMultiInsertBufferCleanup()

static void CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo miinfo,
CopyMultiInsertBuffer buffer 
)
inlinestatic

Definition at line 481 of file copyfrom.c.

483 {
484  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
485  int i;
486 
487  /* Ensure buffer was flushed */
488  Assert(buffer->nused == 0);
489 
490  /* Remove back-link to ourself */
491  resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
492 
493  if (resultRelInfo->ri_FdwRoutine == NULL)
494  {
495  Assert(buffer->bistate != NULL);
496  FreeBulkInsertState(buffer->bistate);
497  }
498  else
499  Assert(buffer->bistate == NULL);
500 
501  /* Since we only create slots on demand, just drop the non-null ones. */
502  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
504 
505  if (resultRelInfo->ri_FdwRoutine == NULL)
507  miinfo->ti_options);
508 
509  pfree(buffer);
510 }
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:63
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1341
int i
Definition: isn.c:73
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:80
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:81
BulkInsertState bistate
Definition: copyfrom.c:82
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1595

References Assert, CopyMultiInsertBuffer::bistate, ExecDropSingleTupleTableSlot(), FreeBulkInsertState(), i, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, pfree(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, table_finish_bulk_insert(), and CopyMultiInsertInfo::ti_options.

Referenced by CopyMultiInsertInfoCleanup(), and CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferFlush()

static void CopyMultiInsertBufferFlush ( CopyMultiInsertInfo miinfo,
CopyMultiInsertBuffer buffer,
int64 *  processed 
)
inlinestatic

Definition at line 307 of file copyfrom.c.

310 {
311  CopyFromState cstate = miinfo->cstate;
312  EState *estate = miinfo->estate;
313  int nused = buffer->nused;
314  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
315  TupleTableSlot **slots = buffer->slots;
316  int i;
317 
318  if (resultRelInfo->ri_FdwRoutine)
319  {
320  int batch_size = resultRelInfo->ri_BatchSize;
321  int sent = 0;
322 
323  Assert(buffer->bistate == NULL);
324 
325  /* Ensure that the FDW supports batching and it's enabled */
327  Assert(batch_size > 1);
328 
329  /*
330  * We suppress error context information other than the relation name,
331  * if one of the operations below fails.
332  */
333  Assert(!cstate->relname_only);
334  cstate->relname_only = true;
335 
336  while (sent < nused)
337  {
338  int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
339  int inserted = size;
340  TupleTableSlot **rslots;
341 
342  /* insert into foreign table: let the FDW do it */
343  rslots =
344  resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
345  resultRelInfo,
346  &slots[sent],
347  NULL,
348  &inserted);
349 
350  sent += size;
351 
352  /* No need to do anything if there are no inserted rows */
353  if (inserted <= 0)
354  continue;
355 
356  /* Triggers on foreign tables should not have transition tables */
357  Assert(resultRelInfo->ri_TrigDesc == NULL ||
358  resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
359 
360  /* Run AFTER ROW INSERT triggers */
361  if (resultRelInfo->ri_TrigDesc != NULL &&
362  resultRelInfo->ri_TrigDesc->trig_insert_after_row)
363  {
364  Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
365 
366  for (i = 0; i < inserted; i++)
367  {
368  TupleTableSlot *slot = rslots[i];
369 
370  /*
371  * AFTER ROW Triggers might reference the tableoid column,
372  * so (re-)initialize tts_tableOid before evaluating them.
373  */
374  slot->tts_tableOid = relid;
375 
376  ExecARInsertTriggers(estate, resultRelInfo,
377  slot, NIL,
378  cstate->transition_capture);
379  }
380  }
381 
382  /* Update the row counter and progress of the COPY command */
383  *processed += inserted;
385  *processed);
386  }
387 
388  for (i = 0; i < nused; i++)
389  ExecClearTuple(slots[i]);
390 
391  /* reset relname_only */
392  cstate->relname_only = false;
393  }
394  else
395  {
396  CommandId mycid = miinfo->mycid;
397  int ti_options = miinfo->ti_options;
398  bool line_buf_valid = cstate->line_buf_valid;
399  uint64 save_cur_lineno = cstate->cur_lineno;
400  MemoryContext oldcontext;
401 
402  Assert(buffer->bistate != NULL);
403 
404  /*
405  * Print error context information correctly, if one of the operations
406  * below fails.
407  */
408  cstate->line_buf_valid = false;
409 
410  /*
411  * table_multi_insert may leak memory, so switch to short-lived memory
412  * context before calling it.
413  */
414  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
415  table_multi_insert(resultRelInfo->ri_RelationDesc,
416  slots,
417  nused,
418  mycid,
419  ti_options,
420  buffer->bistate);
421  MemoryContextSwitchTo(oldcontext);
422 
423  for (i = 0; i < nused; i++)
424  {
425  /*
426  * If there are any indexes, update them for all the inserted
427  * tuples, and run AFTER ROW INSERT triggers.
428  */
429  if (resultRelInfo->ri_NumIndices > 0)
430  {
431  List *recheckIndexes;
432 
433  cstate->cur_lineno = buffer->linenos[i];
434  recheckIndexes =
435  ExecInsertIndexTuples(resultRelInfo,
436  buffer->slots[i], estate, false,
437  false, NULL, NIL, false);
438  ExecARInsertTriggers(estate, resultRelInfo,
439  slots[i], recheckIndexes,
440  cstate->transition_capture);
441  list_free(recheckIndexes);
442  }
443 
444  /*
445  * There's no indexes, but see if we need to run AFTER ROW INSERT
446  * triggers anyway.
447  */
448  else if (resultRelInfo->ri_TrigDesc != NULL &&
449  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
450  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
451  {
452  cstate->cur_lineno = buffer->linenos[i];
453  ExecARInsertTriggers(estate, resultRelInfo,
454  slots[i], NIL,
455  cstate->transition_capture);
456  }
457 
458  ExecClearTuple(slots[i]);
459  }
460 
461  /* Update the row counter and progress of the COPY command */
462  *processed += nused;
464  *processed);
465 
466  /* reset cur_lineno and line_buf_valid to what they were */
467  cstate->line_buf_valid = line_buf_valid;
468  cstate->cur_lineno = save_cur_lineno;
469  }
470 
471  /* Mark that all slots are free */
472  buffer->nused = 0;
473 }
static pg_noinline void Size size
Definition: slab.c:607
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:85
CommandId mycid
Definition: copyfrom.c:101
CopyFromState cstate
Definition: copyfrom.c:99
bool trig_insert_after_row
Definition: reltrigger.h:57
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1457

References Assert, CopyMultiInsertBuffer::bistate, CopyMultiInsertInfo::cstate, CopyFromStateData::cur_lineno, CopyMultiInsertInfo::estate, ExecARInsertTriggers(), ExecClearTuple(), FdwRoutine::ExecForeignBatchInsert, ExecInsertIndexTuples(), GetPerTupleMemoryContext, i, CopyFromStateData::line_buf_valid, CopyMultiInsertBuffer::linenos, list_free(), MemoryContextSwitchTo(), CopyMultiInsertInfo::mycid, NIL, CopyMultiInsertBuffer::nused, pgstat_progress_update_param(), PROGRESS_COPY_TUPLES_PROCESSED, RelationGetRelid, CopyFromStateData::relname_only, CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, size, CopyMultiInsertBuffer::slots, table_multi_insert(), CopyMultiInsertInfo::ti_options, CopyFromStateData::transition_capture, TriggerDesc::trig_insert_after_row, TriggerDesc::trig_insert_new_table, and TupleTableSlot::tts_tableOid.

Referenced by CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferInit()

static CopyMultiInsertBuffer* CopyMultiInsertBufferInit ( ResultRelInfo rri)
static

Definition at line 224 of file copyfrom.c.

225 {
226  CopyMultiInsertBuffer *buffer;
227 
228  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
229  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
230  buffer->resultRelInfo = rri;
231  buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
232  buffer->nused = 0;
233 
234  return buffer;
235 }

References CopyMultiInsertBuffer::bistate, GetBulkInsertState(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, palloc(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_FdwRoutine, and CopyMultiInsertBuffer::slots.

Referenced by CopyMultiInsertInfoSetupBuffer().

◆ CopyMultiInsertInfoCleanup()

static void CopyMultiInsertInfoCleanup ( CopyMultiInsertInfo miinfo)
inlinestatic

Definition at line 577 of file copyfrom.c.

578 {
579  ListCell *lc;
580 
581  foreach(lc, miinfo->multiInsertBuffers)
583 
584  list_free(miinfo->multiInsertBuffers);
585 }
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:481
#define lfirst(lc)
Definition: pg_list.h:172
List * multiInsertBuffers
Definition: copyfrom.c:96

References CopyMultiInsertBufferCleanup(), lfirst, list_free(), and CopyMultiInsertInfo::multiInsertBuffers.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoFlush()

static void CopyMultiInsertInfoFlush ( CopyMultiInsertInfo miinfo,
ResultRelInfo curr_rri,
int64 *  processed 
)
inlinestatic

Definition at line 523 of file copyfrom.c.

525 {
526  ListCell *lc;
527 
528  foreach(lc, miinfo->multiInsertBuffers)
529  {
531 
532  CopyMultiInsertBufferFlush(miinfo, buffer, processed);
533  }
534 
535  miinfo->bufferedTuples = 0;
536  miinfo->bufferedBytes = 0;
537 
538  /*
539  * Trim the list of tracked buffers down if it exceeds the limit. Here we
540  * remove buffers starting with the ones we created first. It seems less
541  * likely that these older ones will be needed than the ones that were
542  * just created.
543  */
545  {
546  CopyMultiInsertBuffer *buffer;
547 
548  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
549 
550  /*
551  * We never want to remove the buffer that's currently being used, so
552  * if we happen to find that then move it to the end of the list.
553  */
554  if (buffer->resultRelInfo == curr_rri)
555  {
556  /*
557  * The code below would misbehave if we were trying to reduce the
558  * list to less than two items.
559  */
561  "MAX_PARTITION_BUFFERS must be >= 2");
562 
564  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
565  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
566  }
567 
568  CopyMultiInsertBufferCleanup(miinfo, buffer);
570  }
571 }
#define StaticAssertDecl(condition, errmessage)
Definition: c.h:927
#define MAX_PARTITION_BUFFERS
Definition: copyfrom.c:75
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
Definition: copyfrom.c:307
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_delete_first(List *list)
Definition: list.c:943
#define linitial(l)
Definition: pg_list.h:178

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBufferCleanup(), CopyMultiInsertBufferFlush(), lappend(), lfirst, linitial, list_delete_first(), list_length(), MAX_PARTITION_BUFFERS, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertBuffer::resultRelInfo, and StaticAssertDecl.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoInit()

static void CopyMultiInsertInfoInit ( CopyMultiInsertInfo miinfo,
ResultRelInfo rri,
CopyFromState  cstate,
EState estate,
CommandId  mycid,
int  ti_options 
)
static

Definition at line 261 of file copyfrom.c.

264 {
265  miinfo->multiInsertBuffers = NIL;
266  miinfo->bufferedTuples = 0;
267  miinfo->bufferedBytes = 0;
268  miinfo->cstate = cstate;
269  miinfo->estate = estate;
270  miinfo->mycid = mycid;
271  miinfo->ti_options = ti_options;
272 
273  /*
274  * Only setup the buffer when not dealing with a partitioned table.
275  * Buffers for partitioned tables will just be setup when we need to send
276  * tuples their way for the first time.
277  */
278  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
279  CopyMultiInsertInfoSetupBuffer(miinfo, rri);
280 }

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfo::cstate, CopyMultiInsertInfo::estate, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertInfo::mycid, NIL, RelationData::rd_rel, ResultRelInfo::ri_RelationDesc, and CopyMultiInsertInfo::ti_options.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsEmpty()

static bool CopyMultiInsertInfoIsEmpty ( CopyMultiInsertInfo miinfo)
inlinestatic

Definition at line 298 of file copyfrom.c.

299 {
300  return miinfo->bufferedTuples == 0;
301 }

References CopyMultiInsertInfo::bufferedTuples.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsFull()

static bool CopyMultiInsertInfoIsFull ( CopyMultiInsertInfo miinfo)
inlinestatic

Definition at line 286 of file copyfrom.c.

287 {
288  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
290  return true;
291  return false;
292 }
#define MAX_BUFFERED_BYTES
Definition: copyfrom.c:69

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, MAX_BUFFERED_BYTES, and MAX_BUFFERED_TUPLES.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoNextFreeSlot()

static TupleTableSlot* CopyMultiInsertInfoNextFreeSlot ( CopyMultiInsertInfo miinfo,
ResultRelInfo rri 
)
inlinestatic

Definition at line 596 of file copyfrom.c.

598 {
600  int nused;
601 
602  Assert(buffer != NULL);
603  Assert(buffer->nused < MAX_BUFFERED_TUPLES);
604 
605  nused = buffer->nused;
606 
607  if (buffer->slots[nused] == NULL)
608  buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
609  return buffer->slots[nused];
610 }

References Assert, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, and table_slot_create().

Referenced by CopyFrom().

◆ CopyMultiInsertInfoSetupBuffer()

static void CopyMultiInsertInfoSetupBuffer ( CopyMultiInsertInfo miinfo,
ResultRelInfo rri 
)
inlinestatic

Definition at line 241 of file copyfrom.c.

243 {
244  CopyMultiInsertBuffer *buffer;
245 
246  buffer = CopyMultiInsertBufferInit(rri);
247 
248  /* Setup back-link so we can easily find this buffer again */
249  rri->ri_CopyMultiInsertBuffer = buffer;
250  /* Record that we're tracking this buffer */
251  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
252 }
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
Definition: copyfrom.c:224

References CopyMultiInsertBufferInit(), lappend(), CopyMultiInsertInfo::multiInsertBuffers, and ResultRelInfo::ri_CopyMultiInsertBuffer.

Referenced by CopyFrom(), and CopyMultiInsertInfoInit().

◆ CopyMultiInsertInfoStore()

static void CopyMultiInsertInfoStore ( CopyMultiInsertInfo miinfo,
ResultRelInfo rri,
TupleTableSlot slot,
int  tuplen,
uint64  lineno 
)
inlinestatic

Definition at line 617 of file copyfrom.c.

619 {
621 
622  Assert(buffer != NULL);
623  Assert(slot == buffer->slots[buffer->nused]);
624 
625  /* Store the line number so we can properly report any errors later */
626  buffer->linenos[buffer->nused] = lineno;
627 
628  /* Record this slot as being used */
629  buffer->nused++;
630 
631  /* Update how many tuples are stored and their size */
632  miinfo->bufferedTuples++;
633  miinfo->bufferedBytes += tuplen;
634 }

References Assert, CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBuffer::linenos, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, and CopyMultiInsertBuffer::slots.

Referenced by CopyFrom().

◆ EndCopyFrom()

void EndCopyFrom ( CopyFromState  cstate)

Definition at line 1802 of file copyfrom.c.

1803 {
1804  /* No COPY FROM related resources except memory. */
1805  if (cstate->is_program)
1806  {
1807  ClosePipeFromProgram(cstate);
1808  }
1809  else
1810  {
1811  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1812  ereport(ERROR,
1814  errmsg("could not close file \"%s\": %m",
1815  cstate->filename)));
1816  }
1817 
1819 
1821  pfree(cstate);
1822 }
void pgstat_progress_end_command(void)
static void ClosePipeFromProgram(CopyFromState cstate)
Definition: copyfrom.c:1828
int FreeFile(FILE *file)
Definition: fd.c:2804
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454

References ClosePipeFromProgram(), CopyFromStateData::copy_file, CopyFromStateData::copycontext, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyFromStateData::filename, FreeFile(), CopyFromStateData::is_program, MemoryContextDelete(), pfree(), and pgstat_progress_end_command().

Referenced by DoCopy(), file_acquire_sample_rows(), fileEndForeignScan(), and fileReScanForeignScan().