PostgreSQL Source Code  git master
copyfrom.c File Reference
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
Include dependency graph for copyfrom.c:

Go to the source code of this file.

Data Structures

struct  CopyMultiInsertBuffer
 
struct  CopyMultiInsertInfo
 

Macros

#define MAX_BUFFERED_TUPLES   1000
 
#define MAX_BUFFERED_BYTES   65535
 
#define MAX_PARTITION_BUFFERS   32
 
#define MAX_COPY_DATA_DISPLAY   100
 

Typedefs

typedef struct CopyMultiInsertBuffer CopyMultiInsertBuffer
 
typedef struct CopyMultiInsertInfo CopyMultiInsertInfo
 

Functions

static char * limit_printout_length (const char *str)
 
static void ClosePipeFromProgram (CopyFromState cstate)
 
void CopyFromErrorCallback (void *arg)
 
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit (ResultRelInfo *rri)
 
static void CopyMultiInsertInfoSetupBuffer (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoInit (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
 
static bool CopyMultiInsertInfoIsFull (CopyMultiInsertInfo *miinfo)
 
static bool CopyMultiInsertInfoIsEmpty (CopyMultiInsertInfo *miinfo)
 
static void CopyMultiInsertBufferFlush (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
 
static void CopyMultiInsertBufferCleanup (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
 
static void CopyMultiInsertInfoFlush (CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 
static void CopyMultiInsertInfoCleanup (CopyMultiInsertInfo *miinfo)
 
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoStore (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
 
uint64 CopyFrom (CopyFromState cstate)
 
CopyFromState BeginCopyFrom (ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
 
void EndCopyFrom (CopyFromState cstate)
 

Macro Definition Documentation

◆ MAX_BUFFERED_BYTES

#define MAX_BUFFERED_BYTES   65535

Definition at line 64 of file copyfrom.c.

Referenced by CopyMultiInsertInfoIsFull().

◆ MAX_BUFFERED_TUPLES

#define MAX_BUFFERED_TUPLES   1000

◆ MAX_COPY_DATA_DISPLAY

#define MAX_COPY_DATA_DISPLAY   100

Referenced by limit_printout_length().

◆ MAX_PARTITION_BUFFERS

#define MAX_PARTITION_BUFFERS   32

Definition at line 67 of file copyfrom.c.

Referenced by CopyMultiInsertInfoFlush().

Typedef Documentation

◆ CopyMultiInsertBuffer

◆ CopyMultiInsertInfo

Function Documentation

◆ BeginCopyFrom()

CopyFromState BeginCopyFrom ( ParseState *  pstate,
Relation  rel,
Node *  whereClause,
const char *  filename,
bool  is_program,
copy_data_source_cb  data_source_cb,
List *  attnamelist,
List *  options 
)

Definition at line 1180 of file copyfrom.c.

References AllocateFile(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, attnum, CopyFromStateData::attnumlist, CopyFromStateData::attribute_buf, CopyFormatOptions::binary, build_column_default(), CopyFromStateData::bytes_processed, contain_volatile_functions_not_nextval(), CopyFormatOptions::convert_select, CopyFromStateData::convert_select_flags, CopyFormatOptions::convert_selectively, COPY_CALLBACK, COPY_FILE, CopyFromStateData::copy_file, CopyFromStateData::copy_src, CopyFromStateData::copycontext, CopyGetAttnums(), cur, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, CurrentMemoryContext, CopyFromStateData::data_source_cb, CopyFromStateData::defexprs, CopyFromStateData::defmap, DestRemote, CopyFromStateData::encoding_embeds_ascii, CopyFromStateData::eol_type, EOL_UNKNOWN, ereport, errcode(), errcode_for_file_access(), errhint(), errmsg(), errmsg_internal(), ERROR, ExecInitExpr(), expression_planner(), CopyFormatOptions::file_encoding, CopyFromStateData::file_encoding, CopyFromStateData::filename, fmgr_info(), CopyFormatOptions::force_notnull, CopyFormatOptions::force_notnull_flags, CopyFormatOptions::force_null, CopyFormatOptions::force_null_flags, fstat, GetDatabaseEncoding(), getTypeBinaryInputInfo(), getTypeInputInfo(), CopyFromStateData::in_functions, initStringInfo(), InvalidOid, CopyFromStateData::is_program, lfirst_int, CopyFromStateData::line_buf, CopyFromStateData::line_buf_converted, list_length(), list_member_int(), CopyFromStateData::max_fields, MemoryContextSwitchTo(), NameStr, TupleDescData::natts, CopyFromStateData::need_transcoding, CopyFromStateData::num_defaults, OpenPipeStream(), CopyFromStateData::opts, ParseState::p_rtable, palloc(), palloc0(), PG_BINARY_R, pg_database_encoding_max_length(), PG_ENCODING_IS_CLIENT_ONLY, pg_get_client_encoding(), pgstat_progress_start_command(), pgstat_progress_update_param(), ProcessCopyOptions(), PROGRESS_COMMAND_COPY, 
PROGRESS_COPY_BYTES_TOTAL, pstrdup(), CopyFromStateData::range_table, CopyFromStateData::raw_buf, CopyFromStateData::raw_buf_index, CopyFromStateData::raw_buf_len, RAW_BUF_SIZE, CopyFromStateData::raw_fields, CopyFromStateData::reached_eof, ReceiveCopyBegin(), ReceiveCopyBinaryHeader(), CopyFromStateData::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, S_ISDIR, stat::st_mode, stat::st_size, TupleDescAttr, CopyFromStateData::typioparams, CopyFromStateData::volatile_defexprs, CopyFromStateData::whereClause, and whereToSendOutput.

Referenced by copy_table(), DoCopy(), file_acquire_sample_rows(), fileBeginForeignScan(), and fileReScanForeignScan().

1188 {
1189  CopyFromState cstate;
1190  bool pipe = (filename == NULL);
1191  TupleDesc tupDesc;
1192  AttrNumber num_phys_attrs,
1193  num_defaults;
1194  FmgrInfo *in_functions;
1195  Oid *typioparams;
1196  int attnum;
1197  Oid in_func_oid;
1198  int *defmap;
1199  ExprState **defexprs;
1200  MemoryContext oldcontext;
1201  bool volatile_defexprs;
1202 
1203  /* Allocate workspace and zero all fields */
1204  cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1205 
1206  /*
1207  * We allocate everything used by a cstate in a new memory context. This
1208  * avoids memory leaks during repeated use of COPY in a query.
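1209  */
1210  cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1211  "COPY",
1212  ALLOCSET_DEFAULT_SIZES);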
1213 
1214  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1215 
1216  /* Extract options from the statement node tree */
1217  ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */, options);
1218 
1219  /* Process the target relation */
1220  cstate->rel = rel;
1221 
1222  tupDesc = RelationGetDescr(cstate->rel);
1223 
1224  /* process common options or initialization */
1225 
1226  /* Generate or convert list of attributes to process */
1227  cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1228 
1229  num_phys_attrs = tupDesc->natts;
1230 
1231  /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1232  cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1233  if (cstate->opts.force_notnull)
1234  {
1235  List *attnums;
1236  ListCell *cur;
1237 
1238  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1239 
1240  foreach(cur, attnums)
1241  {
1242  int attnum = lfirst_int(cur);
1243  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1244 
1245  if (!list_member_int(cstate->attnumlist, attnum))
1246  ereport(ERROR,
1247  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1248  errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1249  NameStr(attr->attname))));
1250  cstate->opts.force_notnull_flags[attnum - 1] = true;
1251  }
1252  }
1253 
1254  /* Convert FORCE_NULL name list to per-column flags, check validity */
1255  cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1256  if (cstate->opts.force_null)
1257  {
1258  List *attnums;
1259  ListCell *cur;
1260 
1261  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1262 
1263  foreach(cur, attnums)
1264  {
1265  int attnum = lfirst_int(cur);
1266  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1267 
1268  if (!list_member_int(cstate->attnumlist, attnum))
1269  ereport(ERROR,
1270  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1271  errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1272  NameStr(attr->attname))));
1273  cstate->opts.force_null_flags[attnum - 1] = true;
1274  }
1275  }
1276 
1277  /* Convert convert_selectively name list to per-column flags */
1278  if (cstate->opts.convert_selectively)
1279  {
1280  List *attnums;
1281  ListCell *cur;
1282 
1283  cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1284 
1285  attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1286 
1287  foreach(cur, attnums)
1288  {
1289  int attnum = lfirst_int(cur);
1290  Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1291 
1292  if (!list_member_int(cstate->attnumlist, attnum))
1293  ereport(ERROR,
1294  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1295  errmsg_internal("selected column \"%s\" not referenced by COPY",
1296  NameStr(attr->attname))));
1297  cstate->convert_select_flags[attnum - 1] = true;
1298  }
1299  }
1300 
1301  /* Use client encoding when ENCODING option is not specified. */
1302  if (cstate->opts.file_encoding < 0)
1303  cstate->file_encoding = pg_get_client_encoding();
1304  else
1305  cstate->file_encoding = cstate->opts.file_encoding;
1306 
1307  /*
1308  * Set up encoding conversion info. Even if the file and server encodings
1309  * are the same, we must apply pg_any_to_server() to validate data in
1310  * multibyte encodings.
1311  */
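1312  cstate->need_transcoding =
1313  (cstate->file_encoding != GetDatabaseEncoding() ||
1314  pg_database_encoding_max_length() > 1);
1315  /* See Multibyte encoding comment above */
1316  cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);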
1317 
1318  cstate->copy_src = COPY_FILE; /* default */
1319 
1320  cstate->whereClause = whereClause;
1321 
1322  MemoryContextSwitchTo(oldcontext);
1323 
1324  oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1325 
1326  /* Initialize state variables */
1327  cstate->reached_eof = false;
1328  cstate->eol_type = EOL_UNKNOWN;
1329  cstate->cur_relname = RelationGetRelationName(cstate->rel);
1330  cstate->cur_lineno = 0;
1331  cstate->cur_attname = NULL;
1332  cstate->cur_attval = NULL;
1333 
1334  /*
1335  * Set up variables to avoid per-attribute overhead. attribute_buf and
1336  * raw_buf are used in both text and binary modes, but we use line_buf
1337  * only in text mode.
1338  */
1339  initStringInfo(&cstate->attribute_buf);
1340  cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
1341  cstate->raw_buf_index = cstate->raw_buf_len = 0;
1342  if (!cstate->opts.binary)
1343  {
1344  initStringInfo(&cstate->line_buf);
1345  cstate->line_buf_converted = false;
1346  }
1347 
1348  /* Assign range table, we'll need it in CopyFrom. */
1349  if (pstate)
1350  cstate->range_table = pstate->p_rtable;
1351 
1352  tupDesc = RelationGetDescr(cstate->rel);
1353  num_phys_attrs = tupDesc->natts;
1354  num_defaults = 0;
1355  volatile_defexprs = false;
1356 
1357  /*
1358  * Pick up the required catalog information for each attribute in the
1359  * relation, including the input function, the element type (to pass to
1360  * the input function), and info about defaults and constraints. (Which
1361  * input function we use depends on text/binary format choice.)
1362  */
1363  in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1364  typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1365  defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1366  defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1367 
1368  for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1369  {
1370  Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1371 
1372  /* We don't need info for dropped attributes */
1373  if (att->attisdropped)
1374  continue;
1375 
1376  /* Fetch the input function and typioparam info */
1377  if (cstate->opts.binary)
1378  getTypeBinaryInputInfo(att->atttypid,
1379  &in_func_oid, &typioparams[attnum - 1]);
1380  else
1381  getTypeInputInfo(att->atttypid,
1382  &in_func_oid, &typioparams[attnum - 1]);
1383  fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1384 
1385  /* Get default info if needed */
1386  if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated)
1387  {
1388  /* attribute is NOT to be copied from input */
1389  /* use default value if one exists */
1390  Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1391  attnum);
1392 
1393  if (defexpr != NULL)
1394  {
1395  /* Run the expression through planner */
1396  defexpr = expression_planner(defexpr);
1397 
1398  /* Initialize executable expression in copycontext */
1399  defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1400  defmap[num_defaults] = attnum - 1;
1401  num_defaults++;
1402 
1403  /*
1404  * If a default expression looks at the table being loaded,
1405  * then it could give the wrong answer when using
1406  * multi-insert. Since database access can be dynamic this is
1407  * hard to test for exactly, so we use the much wider test of
1408  * whether the default expression is volatile. We allow for
1409  * the special case of when the default expression is the
1410  * nextval() of a sequence which in this specific case is
1411  * known to be safe for use with the multi-insert
1412  * optimization. Hence we use this special case function
1413  * checker rather than the standard check for
1414  * contain_volatile_functions().
1415  */
1416  if (!volatile_defexprs)
1417  volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1418  }
1419  }
1420  }
1421 
1422 
1423  /* initialize progress */
1424  pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1425  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1426  cstate->bytes_processed = 0;
1427 
1428  /* We keep those variables in cstate. */
1429  cstate->in_functions = in_functions;
1430  cstate->typioparams = typioparams;
1431  cstate->defmap = defmap;
1432  cstate->defexprs = defexprs;
1433  cstate->volatile_defexprs = volatile_defexprs;
1434  cstate->num_defaults = num_defaults;
1435  cstate->is_program = is_program;
1436 
1437  if (data_source_cb)
1438  {
1439  cstate->copy_src = COPY_CALLBACK;
1440  cstate->data_source_cb = data_source_cb;
1441  }
1442  else if (pipe)
1443  {
1444  Assert(!is_program); /* the grammar does not allow this */
1445  if (whereToSendOutput == DestRemote)
1446  ReceiveCopyBegin(cstate);
1447  else
1448  cstate->copy_file = stdin;
1449  }
1450  else
1451  {
1452  cstate->filename = pstrdup(filename);
1453 
1454  if (cstate->is_program)
1455  {
1456  cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1457  if (cstate->copy_file == NULL)
1458  ereport(ERROR,
1459  (errcode_for_file_access(),
1460  errmsg("could not execute command \"%s\": %m",
1461  cstate->filename)));
1462  }
1463  else
1464  {
1465  struct stat st;
1466 
1467  cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1468  if (cstate->copy_file == NULL)
1469  {
1470  /* copy errno because ereport subfunctions might change it */
1471  int save_errno = errno;
1472 
1473  ereport(ERROR,
1474  (errcode_for_file_access(),
1475  errmsg("could not open file \"%s\" for reading: %m",
1476  cstate->filename),
1477  (save_errno == ENOENT || save_errno == EACCES) ?
1478  errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1479  "You may want a client-side facility such as psql's \\copy.") : 0));
1480  }
1481 
1482  if (fstat(fileno(cstate->copy_file), &st))
1483  ereport(ERROR,
1484  (errcode_for_file_access(),
1485  errmsg("could not stat file \"%s\": %m",
1486  cstate->filename)));
1487 
1488  if (S_ISDIR(st.st_mode))
1489  ereport(ERROR,
1490  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1491  errmsg("\"%s\" is a directory", cstate->filename)));
1492 
1493  pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
1494  }
1495  }
1496 
1497  if (cstate->opts.binary)
1498  {
1499  /* Read and verify binary header */
1500  ReceiveCopyBinaryHeader(cstate);
1501  }
1502 
1503  /* create workspace for CopyReadAttributes results */
1504  if (!cstate->opts.binary)
1505  {
1506  AttrNumber attr_count = list_length(cstate->attnumlist);
1507 
1508  cstate->max_fields = attr_count;
1509  cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1510  }
1511 
1512  MemoryContextSwitchTo(oldcontext);
1513 
1514  return cstate;
1515 }
copy_data_source_cb data_source_cb
void ReceiveCopyBegin(CopyFromState cstate)
Definition: fmgr.h:56
#define AllocSetContextCreate
Definition: memutils.h:170
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:487
const char * cur_attname
int errhint(const char *fmt,...)
Definition: elog.c:1162
StringInfoData attribute_buf
#define RelationGetDescr(relation)
Definition: rel.h:483
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
Definition: pgstat.c:3457
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void pgstat_progress_update_param(int index, int64 val)
Definition: pgstat.c:3478
char * pstrdup(const char *in)
Definition: mcxt.c:1187
Expr * expression_planner(Expr *expr)
Definition: planner.c:6166
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
StringInfoData line_buf
Definition: nodes.h:528
struct cursor * cur
Definition: ecpg.c:28
int errcode(int sqlerrcode)
Definition: elog.c:704
List * force_notnull
Definition: copy.h:45
const char * cur_attval
unsigned int Oid
Definition: postgres_ext.h:31
#define PG_BINARY_R
Definition: c.h:1261
MemoryContext copycontext
#define RAW_BUF_SIZE
#define fstat
Definition: win32_port.h:274
#define ERROR
Definition: elog.h:45
#define lfirst_int(lc)
Definition: pg_list.h:170
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:126
bool binary
Definition: copy.h:32
List * force_null
Definition: copy.h:47
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
bool list_member_int(const List *list, int datum)
Definition: list.c:669
#define PROGRESS_COPY_BYTES_TOTAL
Definition: progress.h:138
int errcode_for_file_access(void)
Definition: elog.c:727
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2354
#define RelationGetRelationName(relation)
Definition: rel.h:491
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
int file_encoding
Definition: copy.h:30
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2457
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:2860
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2794
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Node * build_column_default(Relation rel, int attrno)
void * palloc0(Size size)
Definition: mcxt.c:981
int GetDatabaseEncoding(void)
Definition: mbutils.c:1151
int pg_get_client_encoding(void)
Definition: mbutils.c:336
const char * cur_relname
List * convert_select
Definition: copy.h:50
#define InvalidOid
Definition: postgres_ext.h:36
int16 attnum
Definition: pg_attribute.h:79
#define ereport(elevel,...)
Definition: elog.h:155
bool convert_selectively
Definition: copy.h:49
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
#define Assert(condition)
Definition: c.h:792
bool * force_null_flags
Definition: copy.h:48
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:686
static int list_length(const List *l)
Definition: pg_list.h:149
void ReceiveCopyBinaryHeader(CopyFromState cstate)
int pg_database_encoding_max_length(void)
Definition: mbutils.c:1436
#define S_ISDIR(m)
Definition: win32_port.h:316
CopyFormatOptions opts
#define PG_ENCODING_IS_CLIENT_ONLY(_enc)
Definition: pg_wchar.h:298
static char * filename
Definition: pg_dumpall.c:91
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define NameStr(name)
Definition: c.h:669
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:123
CommandDest whereToSendOutput
Definition: postgres.c:92
Definition: pg_list.h:50
int16 AttrNumber
Definition: attnum.h:21
#define RelationGetRelid(relation)
Definition: rel.h:457
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:334
List * p_rtable
Definition: parse_node.h:180
bool * force_notnull_flags
Definition: copy.h:46

◆ ClosePipeFromProgram()

static void ClosePipeFromProgram ( CopyFromState  cstate)
static

Definition at line 1547 of file copyfrom.c.

References Assert, ClosePipeStream(), CopyFromStateData::copy_file, ereport, errcode(), errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, CopyFromStateData::filename, CopyFromStateData::is_program, CopyFromStateData::reached_eof, SIGPIPE, wait_result_is_signal(), and wait_result_to_str().

Referenced by EndCopyFrom().

1548 {
1549  int pclose_rc;
1550 
1551  Assert(cstate->is_program);
1552 
1553  pclose_rc = ClosePipeStream(cstate->copy_file);
1554  if (pclose_rc == -1)
1555  ereport(ERROR,
1556  (errcode_for_file_access(),
1557  errmsg("could not close pipe to external command: %m")));
1558  else if (pclose_rc != 0)
1559  {
1560  /*
1561  * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1562  * expectable for the called program to fail with SIGPIPE, and we
1563  * should not report that as an error. Otherwise, SIGPIPE indicates a
1564  * problem.
1565  */
1566  if (!cstate->reached_eof &&
1567  wait_result_is_signal(pclose_rc, SIGPIPE))
1568  return;
1569 
1570  ereport(ERROR,
1571  (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1572  errmsg("program \"%s\" failed",
1573  cstate->filename),
1574  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1575  }
1576 }
int errcode(int sqlerrcode)
Definition: elog.c:704
#define SIGPIPE
Definition: win32_port.h:164
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:32
int ClosePipeStream(FILE *file)
Definition: fd.c:2763
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1075
bool wait_result_is_signal(int exit_status, int signum)
Definition: wait_error.c:92
#define ERROR
Definition: elog.h:45
int errcode_for_file_access(void)
Definition: elog.c:727
#define ereport(elevel,...)
Definition: elog.h:155
#define Assert(condition)
Definition: c.h:792
int errmsg(const char *fmt,...)
Definition: elog.c:915

◆ CopyFrom()

uint64 CopyFrom ( CopyFromState  cstate)

Definition at line 524 of file copyfrom.c.

References AfterTriggerBeginQuery(), AfterTriggerEndQuery(), ErrorContextCallback::arg, Assert, TupleConversionMap::attrMap, FdwRoutine::BeginForeignInsert, CopyMultiInsertBuffer::bistate, ErrorContextCallback::callback, castNode, CHECK_FOR_INTERRUPTS, CheckValidResultRel(), CIM_MULTI, CIM_MULTI_CONDITIONAL, CIM_SINGLE, CMD_INSERT, TupleDescData::constr, contain_volatile_functions(), COPY_OLD_FE, CopyFromStateData::copy_src, CopyFromErrorCallback(), CopyMultiInsertInfoCleanup(), CopyMultiInsertInfoFlush(), CopyMultiInsertInfoInit(), CopyMultiInsertInfoIsEmpty(), CopyMultiInsertInfoIsFull(), CopyMultiInsertInfoNextFreeSlot(), CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfoStore(), CreateExecutorState(), CopyFromStateData::cur_lineno, CurrentMemoryContext, ExprContext::ecxt_scantuple, FdwRoutine::EndForeignInsert, ereport, errcode(), errhint(), errmsg(), ERROR, error_context_stack, EState::es_tupleTable, ExecARInsertTriggers(), ExecASInsertTriggers(), ExecBRInsertTriggers(), ExecBSInsertTriggers(), ExecCleanupTupleRouting(), ExecClearTuple(), ExecCloseRangeTableRelations(), ExecCloseResultRelations(), ExecComputeStoredGenerated(), ExecConstraints(), ExecCopySlot(), ExecFindPartition(), FdwRoutine::ExecForeignInsert, ExecInitQual(), ExecInitRangeTable(), ExecInitResultRelation(), ExecInsertIndexTuples(), ExecIRInsertTriggers(), ExecMaterializeSlot(), ExecOpenIndices(), ExecPartitionCheck(), ExecQual(), ExecResetTupleTable(), ExecSetupPartitionTupleRouting(), ExecStoreVirtualTuple(), execute_attr_map_slot(), FreeBulkInsertState(), FreeExecutorState(), CopyFormatOptions::freeze, GetBulkInsertState(), GetCurrentCommandId(), GetCurrentSubTransactionId(), GetPerTupleExprContext, GetPerTupleMemoryContext, TupleConstr::has_generated_stored, InvalidateCatalogSnapshot(), InvalidSubTransactionId, StringInfoData::len, CopyFromStateData::line_buf, list_free(), list_length(), makeNode, MakeTransitionCaptureState(), MemoryContextSwitchTo(), 
ModifyTableState::mt_transition_capture, NextCopyFrom(), NIL, ModifyTableState::operation, CopyFromStateData::opts, pgstat_progress_update_param(), PlanState::plan, pq_endmsgread(), ErrorContextCallback::previous, PROGRESS_COPY_LINES_PROCESSED, ModifyTableState::ps, CopyFromStateData::qualexpr, CopyFromStateData::range_table, RelationData::rd_att, RelationData::rd_createSubid, RelationData::rd_firstRelfilenodeSubid, RelationData::rd_newRelfilenodeSubid, RelationData::rd_rel, CopyFromStateData::rel, RelationGetRelationName, RelationGetRelid, ReleaseBulkInsertStatePin(), ResetPerTupleExprContext, CopyMultiInsertBuffer::resultRelInfo, ModifyTableState::resultRelInfo, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_RootToPartitionMap, ResultRelInfo::ri_TrigDesc, PlanState::state, TABLE_INSERT_FROZEN, TABLE_INSERT_SKIP_FSM, table_slot_create(), table_tuple_insert(), TransitionCaptureState::tcs_original_insert_tuple, ThereAreNoPriorRegisteredSnapshots(), ThereAreNoReadyPortals(), CopyFromStateData::transition_capture, TriggerDesc::trig_insert_before_row, TriggerDesc::trig_insert_instead_row, TriggerDesc::trig_insert_new_table, RelationData::trigdesc, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tableOid, TupleTableSlot::tts_values, CopyFromStateData::volatile_defexprs, and CopyFromStateData::whereClause.

Referenced by copy_table(), and DoCopy().

525 {
526  ResultRelInfo *resultRelInfo;
527  ResultRelInfo *target_resultRelInfo;
528  ResultRelInfo *prevResultRelInfo = NULL;
529  EState *estate = CreateExecutorState(); /* for ExecConstraints() */
530  ModifyTableState *mtstate;
531  ExprContext *econtext;
532  TupleTableSlot *singleslot = NULL;
533  MemoryContext oldcontext = CurrentMemoryContext;
534 
535  PartitionTupleRouting *proute = NULL;
536  ErrorContextCallback errcallback;
537  CommandId mycid = GetCurrentCommandId(true);
538  int ti_options = 0; /* start with default options for insert */
539  BulkInsertState bistate = NULL;
540  CopyInsertMethod insertMethod;
541  CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
542  uint64 processed = 0;
543  bool has_before_insert_row_trig;
544  bool has_instead_insert_row_trig;
545  bool leafpart_use_multi_insert = false;
546 
547  Assert(cstate->rel);
548  Assert(list_length(cstate->range_table) == 1);
549 
550  /*
551  * The target must be a plain, foreign, or partitioned relation, or have
552  * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
553  * allowed on views, so we only hint about them in the view case.)
554  */
555  if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
556  cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
557  cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
558  !(cstate->rel->trigdesc &&
559  cstate->rel->trigdesc->trig_insert_instead_row))
560  {
561  if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
562  ereport(ERROR,
563  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
564  errmsg("cannot copy to view \"%s\"",
565  RelationGetRelationName(cstate->rel)),
566  errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
567  else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
568  ereport(ERROR,
569  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
570  errmsg("cannot copy to materialized view \"%s\"",
571  RelationGetRelationName(cstate->rel))));
572  else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
573  ereport(ERROR,
574  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
575  errmsg("cannot copy to sequence \"%s\"",
576  RelationGetRelationName(cstate->rel))));
577  else
578  ereport(ERROR,
579  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
580  errmsg("cannot copy to non-table relation \"%s\"",
581  RelationGetRelationName(cstate->rel))));
582  }
583 
584  /*
585  * If the target file is new-in-transaction, we assume that checking FSM
586  * for free space is a waste of time. This could possibly be wrong, but
587  * it's unlikely.
588  */
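589  if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
590  (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
591  cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
592  ti_options |= TABLE_INSERT_SKIP_FSM;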
593 
594  /*
595  * Optimize if new relfilenode was created in this subxact or one of its
596  * committed children and we won't see those rows later as part of an
597  * earlier scan or command. The subxact test ensures that if this subxact
598  * aborts then the frozen rows won't be visible after xact cleanup. Note
599  * that the stronger test of exactly which subtransaction created it is
600  * crucial for correctness of this optimization. The test for an earlier
601  * scan or command tolerates false negatives. FREEZE causes other sessions
602  * to see rows they would not see under MVCC, and a false negative merely
603  * spreads that anomaly to the current session.
604  */
605  if (cstate->opts.freeze)
606  {
607  /*
608  * We currently disallow COPY FREEZE on partitioned tables. The
609  * reason for this is that we've simply not yet opened the partitions
610  * to determine if the optimization can be applied to them. We could
611  * go and open them all here, but doing so may be quite a costly
612  * overhead for small copies. In any case, we may just end up routing
613  * tuples to a small number of partitions. It seems better just to
614  * raise an ERROR for partitioned tables.
615  */
616  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
617  {
618  ereport(ERROR,
619  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
620  errmsg("cannot perform COPY FREEZE on a partitioned table")));
621  }
622 
623  /*
624  * Tolerate one registration for the benefit of FirstXactSnapshot.
625  * Scan-bearing queries generally create at least two registrations,
626  * though relying on that is fragile, as is ignoring ActiveSnapshot.
627  * Clear CatalogSnapshot to avoid counting its registration. We'll
628  * still detect ongoing catalog scans, each of which separately
629  * registers the snapshot it uses.
630  */
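631  InvalidateCatalogSnapshot();
632  if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
633  ereport(ERROR,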
634  (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
635  errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
636 
637  if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
638  cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
639  ereport(ERROR,
640  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
641  errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
642 
643  ti_options |= TABLE_INSERT_FROZEN;
644  }
645 
646  /*
647  * We need a ResultRelInfo so we can use the regular executor's
648  * index-entry-making machinery. (There used to be a huge amount of code
649  * here that basically duplicated execUtils.c ...)
650  */
651  ExecInitRangeTable(estate, cstate->range_table);
652  resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
653  ExecInitResultRelation(estate, resultRelInfo, 1);
654 
655  /* Verify the named relation is a valid target for INSERT */
656  CheckValidResultRel(resultRelInfo, CMD_INSERT);
657 
658  ExecOpenIndices(resultRelInfo, false);
659 
660  /*
661  * Set up a ModifyTableState so we can let FDW(s) init themselves for
662  * foreign-table result relation(s).
663  */
664  mtstate = makeNode(ModifyTableState);
665  mtstate->ps.plan = NULL;
666  mtstate->ps.state = estate;
667  mtstate->operation = CMD_INSERT;
668  mtstate->resultRelInfo = resultRelInfo;
669 
670  if (resultRelInfo->ri_FdwRoutine != NULL &&
671  resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
672  resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
673  resultRelInfo);
674 
675  /* Prepare to catch AFTER triggers. */
676  AfterTriggerBeginQuery();
677 
678  /*
679  * If there are any triggers with transition tables on the named relation,
680  * we need to be prepared to capture transition tuples.
681  *
682  * Because partition tuple routing would like to know about whether
683  * transition capture is active, we also set it in mtstate, which is
684  * passed to ExecFindPartition() below.
685  */
686  cstate->transition_capture = mtstate->mt_transition_capture =
687  MakeTransitionCaptureState(cstate->rel->trigdesc,
688  RelationGetRelid(cstate->rel),
689  CMD_INSERT);
690 
691  /*
692  * If the named relation is a partitioned table, initialize state for
693  * CopyFrom tuple routing.
694  */
695  if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
696  proute = ExecSetupPartitionTupleRouting(estate, NULL, cstate->rel);
697 
698  if (cstate->whereClause)
699  cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
700  &mtstate->ps);
701 
702  /*
703  * It's generally more efficient to prepare a bunch of tuples for
704  * insertion, and insert them in one table_multi_insert() call, than call
705  * table_tuple_insert() separately for every tuple. However, there are a
706  * number of reasons why we might not be able to do this. These are
707  * explained below.
708  */
709  if (resultRelInfo->ri_TrigDesc != NULL &&
710  (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
711  resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
712  {
713  /*
714  * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
715  * triggers on the table. Such triggers might query the table we're
716  * inserting into and act differently if the tuples that have already
717  * been processed and prepared for insertion are not there.
718  */
719  insertMethod = CIM_SINGLE;
720  }
721  else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
722  resultRelInfo->ri_TrigDesc->trig_insert_new_table)
723  {
724  /*
725  * For partitioned tables we can't support multi-inserts when there
726  * are any statement level insert triggers. It might be possible to
727  * allow partitioned tables with such triggers in the future, but for
728  * now, CopyMultiInsertInfoFlush expects that any before row insert
729  * and statement level insert triggers are on the same relation.
730  */
731  insertMethod = CIM_SINGLE;
732  }
733  else if (resultRelInfo->ri_FdwRoutine != NULL ||
734  cstate->volatile_defexprs)
735  {
736  /*
737  * Can't support multi-inserts to foreign tables or if there are any
738  * volatile default expressions in the table. Similarly to the
739  * trigger case above, such expressions may query the table we're
740  * inserting into.
741  *
742  * Note: It does not matter if any partitions have any volatile
743  * default expressions as we use the defaults from the target of the
744  * COPY command.
745  */
746  insertMethod = CIM_SINGLE;
747  }
748  else if (contain_volatile_functions(cstate->whereClause))
749  {
750  /*
751  * Can't support multi-inserts if there are any volatile function
752  * expressions in WHERE clause. Similarly to the trigger case above,
753  * such expressions may query the table we're inserting into.
754  */
755  insertMethod = CIM_SINGLE;
756  }
757  else
758  {
759  /*
760  * For partitioned tables, we may still be able to perform bulk
761  * inserts. However, the possibility of this depends on which types
762  * of triggers exist on the partition. We must disable bulk inserts
763  * if the partition is a foreign table or it has any before row insert
764  * or insert instead triggers (same as we checked above for the parent
765  * table). Since the partition's resultRelInfos are initialized only
766  * when we actually need to insert the first tuple into them, we must
767  * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
768  * flag that we must later determine if we can use bulk-inserts for
769  * the partition being inserted into.
770  */
771  if (proute)
772  insertMethod = CIM_MULTI_CONDITIONAL;
773  else
774  insertMethod = CIM_MULTI;
775 
776  CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
777  estate, mycid, ti_options);
778  }
779 
780  /*
781  * If not using batch mode (which allocates slots as needed) set up a
782  * tuple slot too. When inserting into a partitioned table, we also need
783  * one, even if we might batch insert, to read the tuple in the root
784  * partition's form.
785  */
786  if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
787  {
788  singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
789  &estate->es_tupleTable);
790  bistate = GetBulkInsertState();
791  }
792 
793  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
794  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
795 
796  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
797  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
798 
799  /*
800  * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
801  * should do this for COPY, since it's not really an "INSERT" statement as
802  * such. However, executing these triggers maintains consistency with the
803  * EACH ROW triggers that we already fire on COPY.
804  */
805  ExecBSInsertTriggers(estate, resultRelInfo);
806 
807  econtext = GetPerTupleExprContext(estate);
808 
809  /* Set up callback to identify error line number */
810  errcallback.callback = CopyFromErrorCallback;
811  errcallback.arg = (void *) cstate;
812  errcallback.previous = error_context_stack;
813  error_context_stack = &errcallback;
814 
815  for (;;)
816  {
817  TupleTableSlot *myslot;
818  bool skip_tuple;
819 
820  CHECK_FOR_INTERRUPTS();
821 
822  /*
823  * Reset the per-tuple exprcontext. We do this after every tuple, to
824  * clean-up after expression evaluations etc.
825  */
826  ResetPerTupleExprContext(estate);
827 
828  /* select slot to (initially) load row into */
829  if (insertMethod == CIM_SINGLE || proute)
830  {
831  myslot = singleslot;
832  Assert(myslot != NULL);
833  }
834  else
835  {
836  Assert(resultRelInfo == target_resultRelInfo);
837  Assert(insertMethod == CIM_MULTI);
838 
839  myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
840  resultRelInfo);
841  }
842 
843  /*
844  * Switch to per-tuple context before calling NextCopyFrom, which does
845  * evaluate default expressions etc. and requires per-tuple context.
846  */
847  MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
848 
849  ExecClearTuple(myslot);
850 
851  /* Directly store the values/nulls array in the slot */
852  if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
853  break;
854 
855  ExecStoreVirtualTuple(myslot);
856 
857  /*
858  * Constraints and where clause might reference the tableoid column,
859  * so (re-)initialize tts_tableOid before evaluating them.
860  */
861  myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
862 
863  /* Triggers and stuff need to be invoked in query context. */
864  MemoryContextSwitchTo(oldcontext);
865 
866  if (cstate->whereClause)
867  {
868  econtext->ecxt_scantuple = myslot;
869  /* Skip items that don't match COPY's WHERE clause */
870  if (!ExecQual(cstate->qualexpr, econtext))
871  continue;
872  }
873 
874  /* Determine the partition to insert the tuple into */
875  if (proute)
876  {
877  TupleConversionMap *map;
878 
879  /*
880  * Attempt to find a partition suitable for this tuple.
881  * ExecFindPartition() will raise an error if none can be found or
882  * if the found partition is not suitable for INSERTs.
883  */
884  resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
885  proute, myslot, estate);
886 
887  if (prevResultRelInfo != resultRelInfo)
888  {
889  /* Determine which triggers exist on this partition */
890  has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
891  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
892 
893  has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
894  resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
895 
896  /*
897  * Disable multi-inserts when the partition has BEFORE/INSTEAD
898  * OF triggers, or if the partition is a foreign partition.
899  */
900  leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
901  !has_before_insert_row_trig &&
902  !has_instead_insert_row_trig &&
903  resultRelInfo->ri_FdwRoutine == NULL;
904 
905  /* Set the multi-insert buffer to use for this partition. */
906  if (leafpart_use_multi_insert)
907  {
908  if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
909  CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
910  resultRelInfo);
911  }
912  else if (insertMethod == CIM_MULTI_CONDITIONAL &&
913  !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
914  {
915  /*
916  * Flush pending inserts if this partition can't use
917  * batching, so rows are visible to triggers etc.
918  */
919  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
920  }
921 
922  if (bistate != NULL)
923  ReleaseBulkInsertStatePin(bistate);
924  prevResultRelInfo = resultRelInfo;
925  }
926 
927  /*
928  * If we're capturing transition tuples, we might need to convert
929  * from the partition rowtype to root rowtype. But if there are no
930  * BEFORE triggers on the partition that could change the tuple,
931  * we can just remember the original unconverted tuple to avoid a
932  * needless round trip conversion.
933  */
934  if (cstate->transition_capture != NULL)
935  cstate->transition_capture->tcs_original_insert_tuple =
936  !has_before_insert_row_trig ? myslot : NULL;
937 
938  /*
939  * We might need to convert from the root rowtype to the partition
940  * rowtype.
941  */
942  map = resultRelInfo->ri_RootToPartitionMap;
943  if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
944  {
945  /* non batch insert */
946  if (map != NULL)
947  {
948  TupleTableSlot *new_slot;
949 
950  new_slot = resultRelInfo->ri_PartitionTupleSlot;
951  myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
952  }
953  }
954  else
955  {
956  /*
957  * Prepare to queue up tuple for later batch insert into
958  * current partition.
959  */
960  TupleTableSlot *batchslot;
961 
962  /* no other path available for partitioned table */
963  Assert(insertMethod == CIM_MULTI_CONDITIONAL);
964 
965  batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
966  resultRelInfo);
967 
968  if (map != NULL)
969  myslot = execute_attr_map_slot(map->attrMap, myslot,
970  batchslot);
971  else
972  {
973  /*
974  * This looks more expensive than it is (Believe me, I
975  * optimized it away. Twice.). The input is in virtual
976  * form, and we'll materialize the slot below - for most
977  * slot types the copy performs the work materialization
978  * would later require anyway.
979  */
980  ExecCopySlot(batchslot, myslot);
981  myslot = batchslot;
982  }
983  }
984 
985  /* ensure that triggers etc see the right relation */
986  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
987  }
988 
989  skip_tuple = false;
990 
991  /* BEFORE ROW INSERT Triggers */
992  if (has_before_insert_row_trig)
993  {
994  if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
995  skip_tuple = true; /* "do nothing" */
996  }
997 
998  if (!skip_tuple)
999  {
1000  /*
1001  * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1002  * tuple. Otherwise, proceed with inserting the tuple into the
1003  * table or foreign table.
1004  */
1005  if (has_instead_insert_row_trig)
1006  {
1007  ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1008  }
1009  else
1010  {
1011  /* Compute stored generated columns */
1012  if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1013  resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1014  ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1015  CMD_INSERT);
1016 
1017  /*
1018  * If the target is a plain table, check the constraints of
1019  * the tuple.
1020  */
1021  if (resultRelInfo->ri_FdwRoutine == NULL &&
1022  resultRelInfo->ri_RelationDesc->rd_att->constr)
1023  ExecConstraints(resultRelInfo, myslot, estate);
1024 
1025  /*
1026  * Also check the tuple against the partition constraint, if
1027  * there is one; except that if we got here via tuple-routing,
1028  * we don't need to if there's no BR trigger defined on the
1029  * partition.
1030  */
1031  if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1032  (proute == NULL || has_before_insert_row_trig))
1033  ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1034 
1035  /* Store the slot in the multi-insert buffer, when enabled. */
1036  if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1037  {
1038  /*
1039  * The slot previously might point into the per-tuple
1040  * context. For batching it needs to be longer lived.
1041  */
1042  ExecMaterializeSlot(myslot);
1043 
1044  /* Add this tuple to the tuple buffer */
1045  CopyMultiInsertInfoStore(&multiInsertInfo,
1046  resultRelInfo, myslot,
1047  cstate->line_buf.len,
1048  cstate->cur_lineno);
1049 
1050  /*
1051  * If enough inserts have queued up, then flush all
1052  * buffers out to their tables.
1053  */
1054  if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1055  CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
1056  }
1057  else
1058  {
1059  List *recheckIndexes = NIL;
1060 
1061  /* OK, store the tuple */
1062  if (resultRelInfo->ri_FdwRoutine != NULL)
1063  {
1064  myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1065  resultRelInfo,
1066  myslot,
1067  NULL);
1068 
1069  if (myslot == NULL) /* "do nothing" */
1070  continue; /* next tuple please */
1071 
1072  /*
1073  * AFTER ROW Triggers might reference the tableoid
1074  * column, so (re-)initialize tts_tableOid before
1075  * evaluating them.
1076  */
1077  myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1078  }
1079  else
1080  {
1081  /* OK, store the tuple and create index entries for it */
1082  table_tuple_insert(resultRelInfo->ri_RelationDesc,
1083  myslot, mycid, ti_options, bistate);
1084 
1085  if (resultRelInfo->ri_NumIndices > 0)
1086  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1087  myslot,
1088  estate,
1089  false,
1090  false,
1091  NULL,
1092  NIL);
1093  }
1094 
1095  /* AFTER ROW INSERT Triggers */
1096  ExecARInsertTriggers(estate, resultRelInfo, myslot,
1097  recheckIndexes, cstate->transition_capture);
1098 
1099  list_free(recheckIndexes);
1100  }
1101  }
1102 
1103  /*
1104  * We count only tuples not suppressed by a BEFORE INSERT trigger
1105  * or FDW; this is the same definition used by nodeModifyTable.c
1106  * for counting tuples inserted by an INSERT command. Update
1107  * progress of the COPY command as well.
1108  */
1109  pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
1110  }
1111  }
1112 
1113  /* Flush any remaining buffered tuples */
1114  if (insertMethod != CIM_SINGLE)
1115  {
1116  if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1117  CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
1118  }
1119 
1120  /* Done, clean up */
1121  error_context_stack = errcallback.previous;
1122 
1123  if (bistate != NULL)
1124  FreeBulkInsertState(bistate);
1125 
1126  MemoryContextSwitchTo(oldcontext);
1127 
1128  /*
1129  * In the old protocol, tell pqcomm that we can process normal protocol
1130  * messages again.
1131  */
1132  if (cstate->copy_src == COPY_OLD_FE)
1133  pq_endmsgread();
1134 
1135  /* Execute AFTER STATEMENT insertion triggers */
1136  ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1137 
1138  /* Handle queued AFTER triggers */
1139  AfterTriggerEndQuery(estate);
1140 
1141  ExecResetTupleTable(estate->es_tupleTable, false);
1142 
1143  /* Allow the FDW to shut down */
1144  if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1145  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1146  target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1147  target_resultRelInfo);
1148 
1149  /* Tear down the multi-insert buffer data */
1150  if (insertMethod != CIM_SINGLE)
1151  CopyMultiInsertInfoCleanup(&multiInsertInfo);
1152 
1153  /* Close all the partitioned tables, leaf partitions, and their indices */
1154  if (proute)
1155  ExecCleanupTupleRouting(mtstate, proute);
1156 
1157  /* Close the result relations, including any trigger target relations */
1158  ExecCloseResultRelations(estate);
1159  ExecCloseRangeTableRelations(estate);
1160 
1161  FreeExecutorState(estate);
1162 
1163  return processed;
1164 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
int ri_NumIndices
Definition: execnodes.h:415
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
#define NIL
Definition: pg_list.h:65
Oid tts_tableOid
Definition: tuptable.h:131
uint32 CommandId
Definition: c.h:589
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:750
Relation ri_RelationDesc
Definition: execnodes.h:412
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:482
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:463
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errhint(const char *fmt,...)
Definition: elog.c:1162
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2320
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:508
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define ResetPerTupleExprContext(estate)
Definition: executor.h:518
#define TABLE_INSERT_FROZEN
Definition: tableam.h:234
#define castNode(_type_, nodeptr)
Definition: nodes.h:597
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
Definition: execUtils.c:833
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:225
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1163
void ExecCloseResultRelations(EState *estate)
Definition: execMain.c:1430
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:219
void pgstat_progress_update_param(int index, int64 val)
Definition: pgstat.c:3478
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1801
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
StringInfoData line_buf
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1604
int errcode(int sqlerrcode)
Definition: elog.c:704
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:288
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
CmdType operation
Definition: execnodes.h:1155
SubTransactionId rd_newRelfilenodeSubid
Definition: rel.h:104
Datum * tts_values
Definition: tuptable.h:126
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:437
EState * state
Definition: execnodes.h:936
Form_pg_class rd_rel
Definition: rel.h:110
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:372
void(* callback)(void *arg)
Definition: elog.h:243
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:293
struct ErrorContextCallback * previous
Definition: elog.h:242
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
Definition: copyfrom.c:417
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1284
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:209
void ExecCloseRangeTableRelations(EState *estate)
Definition: execMain.c:1473
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:165
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:231
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copyfrom.c:251
ErrorContextCallback * error_context_stack
Definition: elog.c:93
bool trig_insert_instead_row
Definition: reltrigger.h:58
void FreeExecutorState(EState *estate)
Definition: execUtils.c:185
#define GetPerTupleExprContext(estate)
Definition: executor.h:509
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1824
bool trig_insert_new_table
Definition: reltrigger.h:75
bool has_generated_stored
Definition: tupdesc.h:45
void CopyFromErrorCallback(void *arg)
Definition: copyfrom.c:108
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1209
#define ERROR
Definition: elog.h:45
PlanState ps
Definition: execnodes.h:1154
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2244
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:498
TriggerDesc * trigdesc
Definition: rel.h:116
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
Definition: execMain.c:995
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2182
struct TransitionCaptureState * mt_transition_capture
Definition: execnodes.h:1185
bool * tts_isnull
Definition: tuptable.h:128
TupleConstr * constr
Definition: tupdesc.h:85
#define RelationGetRelationName(relation)
Definition: rel.h:491
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:441
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
AttrMap * attrMap
Definition: tupconvert.h:27
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:456
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:424
EState * CreateExecutorState(void)
Definition: execUtils.c:89
TransitionCaptureState * transition_capture
SubTransactionId rd_createSubid
Definition: rel.h:103
bool trig_insert_before_row
Definition: reltrigger.h:56
SubTransactionId rd_firstRelfilenodeSubid
Definition: rel.h:106
List * es_tupleTable
Definition: execnodes.h:567
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1161
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2233
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4378
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:233
TupleDesc rd_att
Definition: rel.h:111
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:443
Plan * plan
Definition: execnodes.h:934
void pq_endmsgread(void)
Definition: pqcomm.c:1233
#define ereport(elevel,...)
Definition: elog.h:155
void AfterTriggerBeginQuery(void)
Definition: trigger.c:4500
#define makeNode(_type_)
Definition: nodes.h:576
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177
CopyInsertMethod
#define Assert(condition)
Definition: c.h:792
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:1838
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:723
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2335
static int list_length(const List *l)
Definition: pg_list.h:149
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:225
#define InvalidSubTransactionId
Definition: c.h:581
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:1850
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
CopyFormatOptions opts
bool freeze
Definition: copy.h:33
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:4520
TupleConversionMap * ri_RootToPartitionMap
Definition: execnodes.h:497
int errmsg(const char *fmt,...)
Definition: elog.c:915
void list_free(List *list)
Definition: list.c:1391
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:276
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1679
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copyfrom.c:501
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:761
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:457
#define PROGRESS_COPY_LINES_PROCESSED
Definition: progress.h:139
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1522
EndForeignInsert_function EndForeignInsert
Definition: fdwapi.h:226
TupleTableSlot * tcs_original_insert_tuple
Definition: trigger.h:75

◆ CopyFromErrorCallback()

void CopyFromErrorCallback ( void *  arg)

Definition at line 108 of file copyfrom.c.

References CopyFormatOptions::binary, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, StringInfoData::data, errcontext, limit_printout_length(), CopyFromStateData::line_buf, CopyFromStateData::line_buf_converted, CopyFromStateData::line_buf_valid, CopyFromStateData::need_transcoding, CopyFromStateData::opts, pfree(), snprintf, and UINT64_FORMAT.

Referenced by CopyFrom(), file_acquire_sample_rows(), and fileIterateForeignScan().

109 {
110  CopyFromState cstate = (CopyFromState) arg;
111  char curlineno_str[32];
112 
113  snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
114  cstate->cur_lineno);
115 
116  if (cstate->opts.binary)
117  {
118  /* can't usefully display the data */
119  if (cstate->cur_attname)
120  errcontext("COPY %s, line %s, column %s",
121  cstate->cur_relname, curlineno_str,
122  cstate->cur_attname);
123  else
124  errcontext("COPY %s, line %s",
125  cstate->cur_relname, curlineno_str);
126  }
127  else
128  {
129  if (cstate->cur_attname && cstate->cur_attval)
130  {
131  /* error is relevant to a particular column */
132  char *attval;
133 
134  attval = limit_printout_length(cstate->cur_attval);
135  errcontext("COPY %s, line %s, column %s: \"%s\"",
136  cstate->cur_relname, curlineno_str,
137  cstate->cur_attname, attval);
138  pfree(attval);
139  }
140  else if (cstate->cur_attname)
141  {
142  /* error is relevant to a particular column, value is NULL */
143  errcontext("COPY %s, line %s, column %s: null input",
144  cstate->cur_relname, curlineno_str,
145  cstate->cur_attname);
146  }
147  else
148  {
149  /*
150  * Error is relevant to a particular line.
151  *
152  * If line_buf still contains the correct line, and it's already
153  * transcoded, print it. If it's still in a foreign encoding, it's
154  * quite likely that the error is precisely a failure to do
155  * encoding conversion (ie, bad data). We dare not try to convert
156  * it, and at present there's no way to regurgitate it without
157  * conversion. So we have to punt and just report the line number.
158  */
159  if (cstate->line_buf_valid &&
160  (cstate->line_buf_converted || !cstate->need_transcoding))
161  {
162  char *lineval;
163 
164  lineval = limit_printout_length(cstate->line_buf.data);
165  errcontext("COPY %s, line %s: \"%s\"",
166  cstate->cur_relname, curlineno_str, lineval);
167  pfree(lineval);
168  }
169  else
170  {
171  errcontext("COPY %s, line %s",
172  cstate->cur_relname, curlineno_str);
173  }
174  }
175  }
176 }
const char * cur_attname
StringInfoData line_buf
const char * cur_attval
static char * limit_printout_length(const char *str)
Definition: copyfrom.c:184
void pfree(void *pointer)
Definition: mcxt.c:1057
bool binary
Definition: copy.h:32
const char * cur_relname
struct CopyFromStateData * CopyFromState
Definition: copy.h:54
CopyFormatOptions opts
#define errcontext
Definition: elog.h:199
void * arg
#define snprintf
Definition: port.h:215
#define UINT64_FORMAT
Definition: c.h:472

◆ CopyMultiInsertBufferCleanup()

static void CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo * miinfo,
CopyMultiInsertBuffer * buffer 
)
inline static

Definition at line 383 of file copyfrom.c.

References Assert, CopyMultiInsertBuffer::bistate, ExecDropSingleTupleTableSlot(), FreeBulkInsertState(), i, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, pfree(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, table_finish_bulk_insert(), and CopyMultiInsertInfo::ti_options.

Referenced by CopyMultiInsertInfoCleanup(), and CopyMultiInsertInfoFlush().

385 {
386  int i;
387 
388  /* Ensure buffer was flushed */
389  Assert(buffer->nused == 0);
390 
391  /* Remove back-link to ourself */
392  buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
393 
394  FreeBulkInsertState(buffer->bistate);
395 
396  /* Since we only create slots on demand, just drop the non-null ones. */
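397  for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
398  ExecDropSingleTupleTableSlot(buffer->slots[i]);
399 
400  table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
401  miinfo->ti_options);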
402 
403  pfree(buffer);
404 }
Relation ri_RelationDesc
Definition: execnodes.h:412
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:508
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1477
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:73
void pfree(void *pointer)
Definition: mcxt.c:1057
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
BulkInsertState bistate
Definition: copyfrom.c:74
#define Assert(condition)
Definition: c.h:792
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:1838
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:72
int i
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:58

◆ CopyMultiInsertBufferFlush()

static void CopyMultiInsertBufferFlush ( CopyMultiInsertInfo * miinfo,
CopyMultiInsertBuffer * buffer 
)
inline static

Definition at line 297 of file copyfrom.c.

References CopyMultiInsertBuffer::bistate, CopyMultiInsertInfo::cstate, CopyFromStateData::cur_lineno, CopyMultiInsertInfo::estate, ExecARInsertTriggers(), ExecClearTuple(), ExecInsertIndexTuples(), GetPerTupleMemoryContext, i, CopyFromStateData::line_buf_valid, CopyMultiInsertBuffer::linenos, list_free(), MemoryContextSwitchTo(), CopyMultiInsertInfo::mycid, NIL, CopyMultiInsertBuffer::nused, CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, CopyMultiInsertBuffer::slots, table_multi_insert(), CopyMultiInsertInfo::ti_options, CopyFromStateData::transition_capture, TriggerDesc::trig_insert_after_row, and TriggerDesc::trig_insert_new_table.

Referenced by CopyMultiInsertInfoFlush().

299 {
300  MemoryContext oldcontext;
301  int i;
302  uint64 save_cur_lineno;
303  CopyFromState cstate = miinfo->cstate;
304  EState *estate = miinfo->estate;
305  CommandId mycid = miinfo->mycid;
306  int ti_options = miinfo->ti_options;
307  bool line_buf_valid = cstate->line_buf_valid;
308  int nused = buffer->nused;
309  ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
310  TupleTableSlot **slots = buffer->slots;
311 
312  /*
313  * Print error context information correctly, if one of the operations
314  * below fail.
315  */
316  cstate->line_buf_valid = false;
317  save_cur_lineno = cstate->cur_lineno;
318 
319  /*
320  * table_multi_insert may leak memory, so switch to short-lived memory
321  * context before calling it.
322  */
323  oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
324  table_multi_insert(resultRelInfo->ri_RelationDesc,
325  slots,
326  nused,
327  mycid,
328  ti_options,
329  buffer->bistate);
330  MemoryContextSwitchTo(oldcontext);
331 
332  for (i = 0; i < nused; i++)
333  {
334  /*
335  * If there are any indexes, update them for all the inserted tuples,
336  * and run AFTER ROW INSERT triggers.
337  */
338  if (resultRelInfo->ri_NumIndices > 0)
339  {
340  List *recheckIndexes;
341 
342  cstate->cur_lineno = buffer->linenos[i];
343  recheckIndexes =
344  ExecInsertIndexTuples(resultRelInfo,
345  buffer->slots[i], estate, false, false,
346  NULL, NIL);
347  ExecARInsertTriggers(estate, resultRelInfo,
348  slots[i], recheckIndexes,
349  cstate->transition_capture);
350  list_free(recheckIndexes);
351  }
352 
353  /*
354  * There's no indexes, but see if we need to run AFTER ROW INSERT
355  * triggers anyway.
356  */
357  else if (resultRelInfo->ri_TrigDesc != NULL &&
358  (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
359  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
360  {
361  cstate->cur_lineno = buffer->linenos[i];
362  ExecARInsertTriggers(estate, resultRelInfo,
363  slots[i], NIL, cstate->transition_capture);
364  }
365 
366  ExecClearTuple(slots[i]);
367  }
368 
369  /* Mark that all slots are free */
370  buffer->nused = 0;
371 
372  /* reset cur_lineno and line_buf_valid to what they were */
373  cstate->line_buf_valid = line_buf_valid;
374  cstate->cur_lineno = save_cur_lineno;
375 }
int ri_NumIndices
Definition: execnodes.h:415
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:589
Relation ri_RelationDesc
Definition: execnodes.h:412
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2320
EState * estate
Definition: copyfrom.c:91
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
CommandId mycid
Definition: copyfrom.c:92
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:76
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
CopyFromState cstate
Definition: copyfrom.c:90
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:293
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:73
bool trig_insert_new_table
Definition: reltrigger.h:75
bool trig_insert_after_row
Definition: reltrigger.h:57
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1339
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:424
TransitionCaptureState * transition_capture
BulkInsertState bistate
Definition: copyfrom.c:74
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:72
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:514
void list_free(List *list)
Definition: list.c:1391
int i
Definition: pg_list.h:50

◆ CopyMultiInsertBufferInit()

static CopyMultiInsertBuffer* CopyMultiInsertBufferInit ( ResultRelInfo * rri)
static

Definition at line 214 of file copyfrom.c.

References CopyMultiInsertBuffer::bistate, GetBulkInsertState(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, palloc(), CopyMultiInsertBuffer::resultRelInfo, and CopyMultiInsertBuffer::slots.

Referenced by CopyMultiInsertInfoSetupBuffer().

215 {
216  CopyMultiInsertBuffer *buffer;
217 
218  buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
219  memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
220  buffer->resultRelInfo = rri;
221  buffer->bistate = GetBulkInsertState();
222  buffer->nused = 0;
223 
224  return buffer;
225 }
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:73
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:1824
BulkInsertState bistate
Definition: copyfrom.c:74
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:72
void * palloc(Size size)
Definition: mcxt.c:950
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:58

◆ CopyMultiInsertInfoCleanup()

static void CopyMultiInsertInfoCleanup ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 463 of file copyfrom.c.

References CopyMultiInsertBufferCleanup(), lfirst, list_free(), and CopyMultiInsertInfo::multiInsertBuffers.

Referenced by CopyFrom().

464 {
465  ListCell *lc;
466 
467  foreach(lc, miinfo->multiInsertBuffers)
468  CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
469 
470  list_free(miinfo->multiInsertBuffers);
471 }
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:383
#define lfirst(lc)
Definition: pg_list.h:169
void list_free(List *list)
Definition: list.c:1391
List * multiInsertBuffers
Definition: copyfrom.c:87

◆ CopyMultiInsertInfoFlush()

static void CopyMultiInsertInfoFlush ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * curr_rri 
)
inline static

Definition at line 417 of file copyfrom.c.

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBufferCleanup(), CopyMultiInsertBufferFlush(), lappend(), lfirst, linitial, list_delete_first(), list_length(), MAX_PARTITION_BUFFERS, CopyMultiInsertInfo::multiInsertBuffers, and CopyMultiInsertBuffer::resultRelInfo.

Referenced by CopyFrom().

418 {
419  ListCell *lc;
420 
421  foreach(lc, miinfo->multiInsertBuffers)
422  {
423  CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
424 
425  CopyMultiInsertBufferFlush(miinfo, buffer);
426  }
427 
428  miinfo->bufferedTuples = 0;
429  miinfo->bufferedBytes = 0;
430 
431  /*
432  * Trim the list of tracked buffers down if it exceeds the limit. Here we
433  * remove buffers starting with the ones we created first. It seems less
434  * likely that these older ones will be needed than the ones that were
435  * just created.
436  */
437  while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
438  {
439  CopyMultiInsertBuffer *buffer;
440 
441  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
442 
443  /*
444  * We never want to remove the buffer that's currently being used, so
445  * if we happen to find that then move it to the end of the list.
446  */
447  if (buffer->resultRelInfo == curr_rri)
448  {
449  miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
450  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
451  buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
452  }
453 
454  CopyMultiInsertBufferCleanup(miinfo, buffer);
455  miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
456  }
457 }
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:383
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:297
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:73
#define linitial(l)
Definition: pg_list.h:174
#define MAX_PARTITION_BUFFERS
Definition: copyfrom.c:67
List * lappend(List *list, void *datum)
Definition: list.c:336
#define lfirst(lc)
Definition: pg_list.h:169
static int list_length(const List *l)
Definition: pg_list.h:149
List * multiInsertBuffers
Definition: copyfrom.c:87
List * list_delete_first(List *list)
Definition: list.c:875

◆ CopyMultiInsertInfoInit()

static void CopyMultiInsertInfoInit ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri,
CopyFromState  cstate,
EState * estate,
CommandId  mycid,
int  ti_options 
)
static

Definition at line 251 of file copyfrom.c.

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfo::cstate, CopyMultiInsertInfo::estate, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertInfo::mycid, NIL, RelationData::rd_rel, ResultRelInfo::ri_RelationDesc, and CopyMultiInsertInfo::ti_options.

Referenced by CopyFrom().

254 {
255  miinfo->multiInsertBuffers = NIL;
256  miinfo->bufferedTuples = 0;
257  miinfo->bufferedBytes = 0;
258  miinfo->cstate = cstate;
259  miinfo->estate = estate;
260  miinfo->mycid = mycid;
261  miinfo->ti_options = ti_options;
262 
263  /*
264  * Only setup the buffer when not dealing with a partitioned table.
265  * Buffers for partitioned tables will just be setup when we need to send
266  * tuples their way for the first time.
267  */
268  if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
269  CopyMultiInsertInfoSetupBuffer(miinfo, rri);
270 }
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:412
EState * estate
Definition: copyfrom.c:91
CommandId mycid
Definition: copyfrom.c:92
CopyFromState cstate
Definition: copyfrom.c:90
Form_pg_class rd_rel
Definition: rel.h:110
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:231
List * multiInsertBuffers
Definition: copyfrom.c:87

◆ CopyMultiInsertInfoIsEmpty()

static bool CopyMultiInsertInfoIsEmpty ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 288 of file copyfrom.c.

References CopyMultiInsertInfo::bufferedTuples.

Referenced by CopyFrom().

289 {
290  return miinfo->bufferedTuples == 0;
291 }

◆ CopyMultiInsertInfoIsFull()

static bool CopyMultiInsertInfoIsFull ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 276 of file copyfrom.c.

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, MAX_BUFFERED_BYTES, and MAX_BUFFERED_TUPLES.

Referenced by CopyFrom().

277 {
278  if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
279  miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
280  return true;
281  return false;
282 }
#define MAX_BUFFERED_BYTES
Definition: copyfrom.c:64
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:58

◆ CopyMultiInsertInfoNextFreeSlot()

static TupleTableSlot* CopyMultiInsertInfoNextFreeSlot ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri 
)
inline static

Definition at line 482 of file copyfrom.c.

References Assert, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, and table_slot_create().

Referenced by CopyFrom().

484 {
485  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
486  int nused = buffer->nused;
487 
488  Assert(buffer != NULL);
489  Assert(nused < MAX_BUFFERED_TUPLES);
490 
491  if (buffer->slots[nused] == NULL)
492  buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
493  return buffer->slots[nused];
494 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
Relation ri_RelationDesc
Definition: execnodes.h:412
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:508
#define Assert(condition)
Definition: c.h:792
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:72
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:58

◆ CopyMultiInsertInfoSetupBuffer()

static void CopyMultiInsertInfoSetupBuffer ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri 
)
inline static

Definition at line 231 of file copyfrom.c.

References CopyMultiInsertBufferInit(), lappend(), CopyMultiInsertInfo::multiInsertBuffers, and ResultRelInfo::ri_CopyMultiInsertBuffer.

Referenced by CopyFrom(), and CopyMultiInsertInfoInit().

233 {
234  CopyMultiInsertBuffer *buffer;
235 
236  buffer = CopyMultiInsertBufferInit(rri);
237 
238  /* Setup back-link so we can easily find this buffer again */
239  rri->ri_CopyMultiInsertBuffer = buffer;
240  /* Record that we're tracking this buffer */
241  miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
242 }
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:508
List * lappend(List *list, void *datum)
Definition: list.c:336
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
Definition: copyfrom.c:214
List * multiInsertBuffers
Definition: copyfrom.c:87

◆ CopyMultiInsertInfoStore()

static void CopyMultiInsertInfoStore ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri,
TupleTableSlot * slot,
int  tuplen,
uint64  lineno 
)
inline static

Definition at line 501 of file copyfrom.c.

References Assert, CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBuffer::linenos, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, and CopyMultiInsertBuffer::slots.

Referenced by CopyFrom().

503 {
504  CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
505 
506  Assert(buffer != NULL);
507  Assert(slot == buffer->slots[buffer->nused]);
508 
509  /* Store the line number so we can properly report any errors later */
510  buffer->linenos[buffer->nused] = lineno;
511 
512  /* Record this slot as being used */
513  buffer->nused++;
514 
515  /* Update how many tuples are stored and their size */
516  miinfo->bufferedTuples++;
517  miinfo->bufferedBytes += tuplen;
518 }
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:508
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:76
#define Assert(condition)
Definition: c.h:792
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:72

◆ EndCopyFrom()

void EndCopyFrom ( CopyFromState  cstate)

Definition at line 1521 of file copyfrom.c.

References ClosePipeFromProgram(), CopyFromStateData::copy_file, CopyFromStateData::copycontext, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyFromStateData::filename, FreeFile(), CopyFromStateData::is_program, MemoryContextDelete(), pfree(), and pgstat_progress_end_command().

Referenced by DoCopy(), file_acquire_sample_rows(), fileEndForeignScan(), and fileReScanForeignScan().

1522 {
1523  /* No COPY FROM related resources except memory. */
1524  if (cstate->is_program)
1525  {
1526  ClosePipeFromProgram(cstate);
1527  }
1528  else
1529  {
1530  if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1531  ereport(ERROR,
1532  (errcode_for_file_access(),
1533  errmsg("could not close file \"%s\": %m",
1534  cstate->filename)));
1535  }
1536 
1537  pgstat_progress_end_command();
1538 
1539  MemoryContextDelete(cstate->copycontext);
1540  pfree(cstate);
1541 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
MemoryContext copycontext
void pfree(void *pointer)
Definition: mcxt.c:1057
#define ERROR
Definition: elog.h:45
static void ClosePipeFromProgram(CopyFromState cstate)
Definition: copyfrom.c:1547
int errcode_for_file_access(void)
Definition: elog.c:727
void pgstat_progress_end_command(void)
Definition: pgstat.c:3529
#define ereport(elevel,...)
Definition: elog.h:155
int FreeFile(FILE *file)
Definition: fd.c:2553
int errmsg(const char *fmt,...)
Definition: elog.c:915

◆ limit_printout_length()

static char * limit_printout_length ( const char *  str)
static

Definition at line 184 of file copyfrom.c.

References MAX_COPY_DATA_DISPLAY, palloc(), pg_mbcliplen(), and pstrdup().

Referenced by CopyFromErrorCallback().

185 {
186 #define MAX_COPY_DATA_DISPLAY 100
187 
188  int slen = strlen(str);
189  int len;
190  char *res;
191 
192  /* Fast path if definitely okay */
193  if (slen <= MAX_COPY_DATA_DISPLAY)
194  return pstrdup(str);
195 
196  /* Apply encoding-dependent truncation */
197  len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
198 
199  /*
200  * Truncate, and add "..." to show we truncated the input.
201  */
202  res = (char *) palloc(len + 4);
203  memcpy(res, str, len);
204  strcpy(res + len, "...");
205 
206  return res;
207 }
#define MAX_COPY_DATA_DISPLAY
char * pstrdup(const char *in)
Definition: mcxt.c:1187
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:967
void * palloc(Size size)
Definition: mcxt.c:950