PostgreSQL Source Code git master
copyfrom.c File Reference
#include "postgres.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/heapam.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/copyapi.h"
#include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "executor/tuptable.h"
#include "foreign/fdwapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/miscnodes.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/portal.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
Include dependency graph for copyfrom.c:

Go to the source code of this file.

Data Structures

struct  CopyMultiInsertBuffer
 
struct  CopyMultiInsertInfo
 

Macros

#define MAX_BUFFERED_TUPLES   1000
 
#define MAX_BUFFERED_BYTES   65535
 
#define MAX_PARTITION_BUFFERS   32
 
#define MAX_COPY_DATA_DISPLAY   100
 

Typedefs

typedef struct CopyMultiInsertBuffer CopyMultiInsertBuffer
 
typedef struct CopyMultiInsertInfo CopyMultiInsertInfo
 

Functions

static void ClosePipeFromProgram (CopyFromState cstate)
 
static void CopyFromTextLikeInFunc (CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
 
static void CopyFromTextLikeStart (CopyFromState cstate, TupleDesc tupDesc)
 
static void CopyFromTextLikeEnd (CopyFromState cstate)
 
static void CopyFromBinaryInFunc (CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
 
static void CopyFromBinaryStart (CopyFromState cstate, TupleDesc tupDesc)
 
static void CopyFromBinaryEnd (CopyFromState cstate)
 
static const CopyFromRoutine * CopyFromGetRoutine (const CopyFormatOptions *opts)
 
void CopyFromErrorCallback (void *arg)
 
char * CopyLimitPrintoutLength (const char *str)
 
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit (ResultRelInfo *rri)
 
static void CopyMultiInsertInfoSetupBuffer (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoInit (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
 
static bool CopyMultiInsertInfoIsFull (CopyMultiInsertInfo *miinfo)
 
static bool CopyMultiInsertInfoIsEmpty (CopyMultiInsertInfo *miinfo)
 
static void CopyMultiInsertBufferFlush (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
 
static void CopyMultiInsertBufferCleanup (CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
 
static void CopyMultiInsertInfoFlush (CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
 
static void CopyMultiInsertInfoCleanup (CopyMultiInsertInfo *miinfo)
 
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
 
static void CopyMultiInsertInfoStore (CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
 
uint64 CopyFrom (CopyFromState cstate)
 
CopyFromState BeginCopyFrom (ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
 
void EndCopyFrom (CopyFromState cstate)
 

Variables

static const CopyFromRoutine CopyFromRoutineText
 
static const CopyFromRoutine CopyFromRoutineCSV
 
static const CopyFromRoutine CopyFromRoutineBinary
 

Macro Definition Documentation

◆ MAX_BUFFERED_BYTES

#define MAX_BUFFERED_BYTES   65535

Definition at line 69 of file copyfrom.c.

◆ MAX_BUFFERED_TUPLES

#define MAX_BUFFERED_TUPLES   1000

Definition at line 63 of file copyfrom.c.

◆ MAX_COPY_DATA_DISPLAY

#define MAX_COPY_DATA_DISPLAY   100

◆ MAX_PARTITION_BUFFERS

#define MAX_PARTITION_BUFFERS   32

Definition at line 75 of file copyfrom.c.

Typedef Documentation

◆ CopyMultiInsertBuffer

◆ CopyMultiInsertInfo

Function Documentation

◆ BeginCopyFrom()

CopyFromState BeginCopyFrom ( ParseState *  pstate,
Relation  rel,
Node *  whereClause,
const char *  filename,
bool  is_program,
copy_data_source_cb  data_source_cb,
List *  attnamelist,
List *  options 
)

Definition at line 1529 of file copyfrom.c.

1537{
1538 CopyFromState cstate;
1539 bool pipe = (filename == NULL);
1540 TupleDesc tupDesc;
1541 AttrNumber num_phys_attrs,
1542 num_defaults;
1543 FmgrInfo *in_functions;
1544 Oid *typioparams;
1545 int *defmap;
1546 ExprState **defexprs;
1547 MemoryContext oldcontext;
1548 bool volatile_defexprs;
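1549 const int progress_cols[] = {
1550 PROGRESS_COPY_COMMAND,
1551 PROGRESS_COPY_TYPE,
1552 PROGRESS_COPY_BYTES_TOTAL
1553 };
1554 int64 progress_vals[] = {
1555 PROGRESS_COPY_COMMAND_FROM,
1556 0,
1557 0
1558 };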
1559
1560 /* Allocate workspace and zero all fields */
1561 cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1562
1563 /*
1564 * We allocate everything used by a cstate in a new memory context. This
1565 * avoids memory leaks during repeated use of COPY in a query.
1566 */
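1567 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1568 "COPY",
1569 ALLOCSET_DEFAULT_SIZES);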
1570
1571 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1572
1573 /* Extract options from the statement node tree */
1574 ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1575
1576 /* Set the format routine */
1577 cstate->routine = CopyFromGetRoutine(&cstate->opts);
1578
1579 /* Process the target relation */
1580 cstate->rel = rel;
1581
1582 tupDesc = RelationGetDescr(cstate->rel);
1583
1584 /* process common options or initialization */
1585
1586 /* Generate or convert list of attributes to process */
1587 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1588
1589 num_phys_attrs = tupDesc->natts;
1590
1591 /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1592 cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1593 if (cstate->opts.force_notnull_all)
1594 MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
1595 else if (cstate->opts.force_notnull)
1596 {
1597 List *attnums;
1598 ListCell *cur;
1599
1600 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1601
1602 foreach(cur, attnums)
1603 {
1604 int attnum = lfirst_int(cur);
1605 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1606
1607 if (!list_member_int(cstate->attnumlist, attnum))
1608 ereport(ERROR,
1609 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1610 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1611 errmsg("%s column \"%s\" not referenced by COPY",
1612 "FORCE_NOT_NULL", NameStr(attr->attname))));
1613 cstate->opts.force_notnull_flags[attnum - 1] = true;
1614 }
1615 }
1616
1617 /* Set up soft error handler for ON_ERROR */
1618 if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1619 {
1620 cstate->escontext = makeNode(ErrorSaveContext);
1621 cstate->escontext->type = T_ErrorSaveContext;
1622 cstate->escontext->error_occurred = false;
1623
1624 /*
1625 * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1626 * options later
1627 */
1628 if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1629 cstate->escontext->details_wanted = false;
1630 }
1631 else
1632 cstate->escontext = NULL;
1633
1634 /* Convert FORCE_NULL name list to per-column flags, check validity */
1635 cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1636 if (cstate->opts.force_null_all)
1637 MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
1638 else if (cstate->opts.force_null)
1639 {
1640 List *attnums;
1641 ListCell *cur;
1642
1643 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1644
1645 foreach(cur, attnums)
1646 {
1647 int attnum = lfirst_int(cur);
1648 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1649
1650 if (!list_member_int(cstate->attnumlist, attnum))
1651 ereport(ERROR,
1652 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1653 /*- translator: first %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
1654 errmsg("%s column \"%s\" not referenced by COPY",
1655 "FORCE_NULL", NameStr(attr->attname))));
1656 cstate->opts.force_null_flags[attnum - 1] = true;
1657 }
1658 }
1659
1660 /* Convert convert_selectively name list to per-column flags */
1661 if (cstate->opts.convert_selectively)
1662 {
1663 List *attnums;
1664 ListCell *cur;
1665
1666 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1667
1668 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1669
1670 foreach(cur, attnums)
1671 {
1672 int attnum = lfirst_int(cur);
1673 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1674
1675 if (!list_member_int(cstate->attnumlist, attnum))
1676 ereport(ERROR,
1677 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1678 errmsg_internal("selected column \"%s\" not referenced by COPY",
1679 NameStr(attr->attname))));
1680 cstate->convert_select_flags[attnum - 1] = true;
1681 }
1682 }
1683
1684 /* Use client encoding when ENCODING option is not specified. */
1685 if (cstate->opts.file_encoding < 0)
1686 cstate->file_encoding = pg_get_client_encoding();
1687 else
1688 cstate->file_encoding = cstate->opts.file_encoding;
1689
1690 /*
1691 * Look up encoding conversion function.
1692 */
1693 if (cstate->file_encoding == GetDatabaseEncoding() ||
1694 cstate->file_encoding == PG_SQL_ASCII ||
1695 GetDatabaseEncoding() == PG_SQL_ASCII)
1696 {
1697 cstate->need_transcoding = false;
1698 }
1699 else
1700 {
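1701 cstate->need_transcoding = true;
1702 cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1703 GetDatabaseEncoding());
1704 if (!OidIsValid(cstate->conversion_proc))
1705 ereport(ERROR,
1706 (errcode(ERRCODE_UNDEFINED_FUNCTION),
1707 errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1708 pg_encoding_to_char(cstate->file_encoding),
1709 pg_encoding_to_char(GetDatabaseEncoding()))));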
1710 }
1711
1712 cstate->copy_src = COPY_FILE; /* default */
1713
1714 cstate->whereClause = whereClause;
1715
1716 /* Initialize state variables */
1717 cstate->eol_type = EOL_UNKNOWN;
1718 cstate->cur_relname = RelationGetRelationName(cstate->rel);
1719 cstate->cur_lineno = 0;
1720 cstate->cur_attname = NULL;
1721 cstate->cur_attval = NULL;
1722 cstate->relname_only = false;
1723
1724 /*
1725 * Allocate buffers for the input pipeline.
1726 *
1727 * attribute_buf and raw_buf are used in both text and binary modes, but
1728 * input_buf and line_buf only in text mode.
1729 */
1730 cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1731 cstate->raw_buf_index = cstate->raw_buf_len = 0;
1732 cstate->raw_reached_eof = false;
1733
1734 initStringInfo(&cstate->attribute_buf);
1735
1736 /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1737 if (pstate)
1738 {
1739 cstate->range_table = pstate->p_rtable;
1740 cstate->rteperminfos = pstate->p_rteperminfos;
1741 }
1742
1743 num_defaults = 0;
1744 volatile_defexprs = false;
1745
1746 /*
1747 * Pick up the required catalog information for each attribute in the
1748 * relation, including the input function, the element type (to pass to
1749 * the input function), and info about defaults and constraints. (Which
1750 * input function we use depends on text/binary format choice.)
1751 */
1752 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1753 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1754 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1755 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1756
1757 for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1758 {
1759 Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1760
1761 /* We don't need info for dropped attributes */
1762 if (att->attisdropped)
1763 continue;
1764
1765 /* Fetch the input function and typioparam info */
1766 cstate->routine->CopyFromInFunc(cstate, att->atttypid,
1767 &in_functions[attnum - 1],
1768 &typioparams[attnum - 1]);
1769
1770 /* Get default info if available */
1771 defexprs[attnum - 1] = NULL;
1772
1773 /*
1774 * We only need the default values for columns that do not appear in
1775 * the column list, unless the DEFAULT option was given. We never need
1776 * default values for generated columns.
1777 */
1778 if ((cstate->opts.default_print != NULL ||
1779 !list_member_int(cstate->attnumlist, attnum)) &&
1780 !att->attgenerated)
1781 {
1782 Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1783 attnum);
1784
1785 if (defexpr != NULL)
1786 {
1787 /* Run the expression through planner */
1788 defexpr = expression_planner(defexpr);
1789
1790 /* Initialize executable expression in copycontext */
1791 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1792
1793 /* if NOT copied from input */
1794 /* use default value if one exists */
1795 if (!list_member_int(cstate->attnumlist, attnum))
1796 {
1797 defmap[num_defaults] = attnum - 1;
1798 num_defaults++;
1799 }
1800
1801 /*
1802 * If a default expression looks at the table being loaded,
1803 * then it could give the wrong answer when using
1804 * multi-insert. Since database access can be dynamic this is
1805 * hard to test for exactly, so we use the much wider test of
1806 * whether the default expression is volatile. We allow for
1807 * the special case of when the default expression is the
1808 * nextval() of a sequence which in this specific case is
1809 * known to be safe for use with the multi-insert
1810 * optimization. Hence we use this special case function
1811 * checker rather than the standard check for
1812 * contain_volatile_functions(). Note also that we already
1813 * ran the expression through expression_planner().
1814 */
1815 if (!volatile_defexprs)
1816 volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1817 }
1818 }
1819 }
1820
1821 cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1822
1823 /* initialize progress */
1824 pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1825 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1826 cstate->bytes_processed = 0;
1827
1828 /* We keep those variables in cstate. */
1829 cstate->in_functions = in_functions;
1830 cstate->typioparams = typioparams;
1831 cstate->defmap = defmap;
1832 cstate->defexprs = defexprs;
1833 cstate->volatile_defexprs = volatile_defexprs;
1834 cstate->num_defaults = num_defaults;
1835 cstate->is_program = is_program;
1836
1837 if (data_source_cb)
1838 {
1839 progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1840 cstate->copy_src = COPY_CALLBACK;
1841 cstate->data_source_cb = data_source_cb;
1842 }
1843 else if (pipe)
1844 {
1845 progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1846 Assert(!is_program); /* the grammar does not allow this */
1847 if (whereToSendOutput == DestRemote)
1848 ReceiveCopyBegin(cstate);
1849 else
1850 cstate->copy_file = stdin;
1851 }
1852 else
1853 {
1854 cstate->filename = pstrdup(filename);
1855
1856 if (cstate->is_program)
1857 {
1858 progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1859 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1860 if (cstate->copy_file == NULL)
1861 ereport(ERROR,
1862 (errcode_for_file_access(),
1863 errmsg("could not execute command \"%s\": %m",
1864 cstate->filename)));
1865 }
1866 else
1867 {
1868 struct stat st;
1869
1870 progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1871 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1872 if (cstate->copy_file == NULL)
1873 {
1874 /* copy errno because ereport subfunctions might change it */
1875 int save_errno = errno;
1876
1877 ereport(ERROR,
1878 (errcode_for_file_access(),
1879 errmsg("could not open file \"%s\" for reading: %m",
1880 cstate->filename),
1881 (save_errno == ENOENT || save_errno == EACCES) ?
1882 errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1883 "You may want a client-side facility such as psql's \\copy.") : 0));
1884 }
1885
1886 if (fstat(fileno(cstate->copy_file), &st))
1887 ereport(ERROR,
1888 (errcode_for_file_access(),
1889 errmsg("could not stat file \"%s\": %m",
1890 cstate->filename)));
1891
1892 if (S_ISDIR(st.st_mode))
1893 ereport(ERROR,
1894 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1895 errmsg("\"%s\" is a directory", cstate->filename)));
1896
1897 progress_vals[2] = st.st_size;
1898 }
1899 }
1900
1901 pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1902
1903 cstate->routine->CopyFromStart(cstate, tupDesc);
1904
1905 MemoryContextSwitchTo(oldcontext);
1906
1907 return cstate;
1908}
int16 AttrNumber
Definition: attnum.h:21
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
Definition: copy.c:945
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
Definition: copy.c:496
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
@ PROGRESS_COMMAND_COPY
#define NameStr(name)
Definition: c.h:717
#define PG_BINARY_R
Definition: c.h:1246
int64_t int64
Definition: c.h:499
#define MemSet(start, val, len)
Definition: c.h:991
#define OidIsValid(objectId)
Definition: c.h:746
bool contain_volatile_functions_not_nextval(Node *clause)
Definition: clauses.c:672
static const CopyFromRoutine * CopyFromGetRoutine(const CopyFormatOptions *opts)
Definition: copyfrom.c:156
@ EOL_UNKNOWN
#define RAW_BUF_SIZE
void ReceiveCopyBegin(CopyFromState cstate)
@ COPY_FILE
Definition: copyto.c:45
@ COPY_CALLBACK
Definition: copyto.c:47
@ DestRemote
Definition: dest.h:89
struct cursor * cur
Definition: ecpg.c:29
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode_for_file_access(void)
Definition: elog.c:876
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
Definition: execExpr.c:143
FILE * OpenPipeStream(const char *command, const char *mode)
Definition: fd.c:2708
FILE * AllocateFile(const char *name, const char *mode)
Definition: fd.c:2605
Assert(PointerIsAligned(start, uint64))
@ COPY_ON_ERROR_IGNORE
Definition: copy.h:40
@ COPY_ON_ERROR_STOP
Definition: copy.h:39
bool list_member_int(const List *list, int datum)
Definition: list.c:702
int GetDatabaseEncoding(void)
Definition: mbutils.c:1261
int pg_get_client_encoding(void)
Definition: mbutils.c:336
char * pstrdup(const char *in)
Definition: mcxt.c:1699
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding)
Definition: namespace.c:4080
#define makeNode(_type_)
Definition: nodes.h:157
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
static char * filename
Definition: pg_dumpall.c:124
#define lfirst_int(lc)
Definition: pg_list.h:173
@ PG_SQL_ASCII
Definition: pg_wchar.h:226
#define pg_encoding_to_char
Definition: pg_wchar.h:630
Expr * expression_planner(Expr *expr)
Definition: planner.c:6640
CommandDest whereToSendOutput
Definition: postgres.c:91
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
#define PROGRESS_COPY_COMMAND
Definition: progress.h:146
#define PROGRESS_COPY_TYPE_FILE
Definition: progress.h:155
#define PROGRESS_COPY_COMMAND_FROM
Definition: progress.h:151
#define PROGRESS_COPY_TYPE
Definition: progress.h:147
#define PROGRESS_COPY_TYPE_PROGRAM
Definition: progress.h:156
#define PROGRESS_COPY_BYTES_TOTAL
Definition: progress.h:143
#define PROGRESS_COPY_TYPE_CALLBACK
Definition: progress.h:158
#define PROGRESS_COPY_TYPE_PIPE
Definition: progress.h:157
#define RelationGetRelid(relation)
Definition: rel.h:512
#define RelationGetDescr(relation)
Definition: rel.h:538
#define RelationGetRelationName(relation)
Definition: rel.h:546
Node * build_column_default(Relation rel, int attrno)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
bool force_notnull_all
Definition: copy.h:80
bool convert_selectively
Definition: copy.h:85
CopyOnErrorChoice on_error
Definition: copy.h:86
List * force_null
Definition: copy.h:82
List * convert_select
Definition: copy.h:89
bool force_null_all
Definition: copy.h:83
bool * force_notnull_flags
Definition: copy.h:81
int file_encoding
Definition: copy.h:62
bool * force_null_flags
Definition: copy.h:84
char * default_print
Definition: copy.h:71
List * force_notnull
Definition: copy.h:79
void(* CopyFromInFunc)(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
Definition: copyapi.h:74
void(* CopyFromStart)(CopyFromState cstate, TupleDesc tupDesc)
Definition: copyapi.h:85
copy_data_source_cb data_source_cb
const struct CopyFromRoutine * routine
CopyFormatOptions opts
StringInfoData attribute_buf
MemoryContext copycontext
const char * cur_attval
const char * cur_attname
const char * cur_relname
ErrorSaveContext * escontext
bool details_wanted
Definition: miscnodes.h:48
bool error_occurred
Definition: miscnodes.h:47
NodeTag type
Definition: miscnodes.h:46
Definition: fmgr.h:57
Definition: pg_list.h:54
Definition: nodes.h:131
List * p_rteperminfos
Definition: parse_node.h:213
List * p_rtable
Definition: parse_node.h:212
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:154
#define S_ISDIR(m)
Definition: win32_port.h:315
#define fstat
Definition: win32_port.h:273

References AllocateFile(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), attnum, CopyFromStateData::attnumlist, CopyFromStateData::attribute_buf, build_column_default(), CopyFromStateData::bytes_processed, contain_volatile_functions_not_nextval(), CopyFromStateData::conversion_proc, CopyFormatOptions::convert_select, CopyFromStateData::convert_select_flags, CopyFormatOptions::convert_selectively, COPY_CALLBACK, COPY_FILE, CopyFromStateData::copy_file, COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromStateData::copy_src, CopyFromStateData::copycontext, CopyFromGetRoutine(), CopyFromRoutine::CopyFromInFunc, CopyFromRoutine::CopyFromStart, CopyGetAttnums(), cur, CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, CurrentMemoryContext, CopyFromStateData::data_source_cb, CopyFormatOptions::default_print, CopyFromStateData::defaults, CopyFromStateData::defexprs, CopyFromStateData::defmap, DestRemote, ErrorSaveContext::details_wanted, CopyFromStateData::eol_type, EOL_UNKNOWN, ereport, errcode(), errcode_for_file_access(), errhint(), errmsg(), errmsg_internal(), ERROR, ErrorSaveContext::error_occurred, CopyFromStateData::escontext, ExecInitExpr(), expression_planner(), CopyFormatOptions::file_encoding, CopyFromStateData::file_encoding, filename, CopyFromStateData::filename, FindDefaultConversionProc(), CopyFormatOptions::force_notnull, CopyFormatOptions::force_notnull_all, CopyFormatOptions::force_notnull_flags, CopyFormatOptions::force_null, CopyFormatOptions::force_null_all, CopyFormatOptions::force_null_flags, fstat, GetDatabaseEncoding(), CopyFromStateData::in_functions, initStringInfo(), InvalidOid, CopyFromStateData::is_program, lfirst_int, list_member_int(), makeNode, MemoryContextSwitchTo(), MemSet, NameStr, TupleDescData::natts, CopyFromStateData::need_transcoding, CopyFromStateData::num_defaults, OidIsValid, CopyFormatOptions::on_error, OpenPipeStream(), CopyFromStateData::opts, 
ParseState::p_rtable, ParseState::p_rteperminfos, palloc(), palloc0(), PG_BINARY_R, pg_encoding_to_char, pg_get_client_encoding(), PG_SQL_ASCII, pgstat_progress_start_command(), pgstat_progress_update_multi_param(), ProcessCopyOptions(), PROGRESS_COMMAND_COPY, PROGRESS_COPY_BYTES_TOTAL, PROGRESS_COPY_COMMAND, PROGRESS_COPY_COMMAND_FROM, PROGRESS_COPY_TYPE, PROGRESS_COPY_TYPE_CALLBACK, PROGRESS_COPY_TYPE_FILE, PROGRESS_COPY_TYPE_PIPE, PROGRESS_COPY_TYPE_PROGRAM, pstrdup(), CopyFromStateData::range_table, CopyFromStateData::raw_buf, CopyFromStateData::raw_buf_index, CopyFromStateData::raw_buf_len, RAW_BUF_SIZE, CopyFromStateData::raw_reached_eof, ReceiveCopyBegin(), CopyFromStateData::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, CopyFromStateData::relname_only, CopyFromStateData::routine, CopyFromStateData::rteperminfos, S_ISDIR, stat::st_mode, stat::st_size, TupleDescAttr(), ErrorSaveContext::type, CopyFromStateData::typioparams, CopyFromStateData::volatile_defexprs, CopyFromStateData::whereClause, and whereToSendOutput.

Referenced by copy_table(), DoCopy(), file_acquire_sample_rows(), fileBeginForeignScan(), and fileReScanForeignScan().

◆ ClosePipeFromProgram()

static void ClosePipeFromProgram ( CopyFromState  cstate)
static

Definition at line 1943 of file copyfrom.c.

1944{
1945 int pclose_rc;
1946
1947 Assert(cstate->is_program);
1948
1949 pclose_rc = ClosePipeStream(cstate->copy_file);
1950 if (pclose_rc == -1)
1951 ereport(ERROR,
1952 (errcode_for_file_access(),
1953 errmsg("could not close pipe to external command: %m")));
1954 else if (pclose_rc != 0)
1955 {
1956 /*
1957 * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1958 * expectable for the called program to fail with SIGPIPE, and we
1959 * should not report that as an error. Otherwise, SIGPIPE indicates a
1960 * problem.
1961 */
1962 if (!cstate->raw_reached_eof &&
1963 wait_result_is_signal(pclose_rc, SIGPIPE))
1964 return;
1965
1966 ereport(ERROR,
1967 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1968 errmsg("program \"%s\" failed",
1969 cstate->filename),
1970 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1971 }
1972}
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int ClosePipeStream(FILE *file)
Definition: fd.c:3013
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
bool wait_result_is_signal(int exit_status, int signum)
Definition: wait_error.c:102
#define SIGPIPE
Definition: win32_port.h:163

References Assert(), ClosePipeStream(), CopyFromStateData::copy_file, ereport, errcode(), errcode_for_file_access(), errdetail_internal(), errmsg(), ERROR, CopyFromStateData::filename, CopyFromStateData::is_program, CopyFromStateData::raw_reached_eof, SIGPIPE, wait_result_is_signal(), and wait_result_to_str().

Referenced by EndCopyFrom().

◆ CopyFrom()

uint64 CopyFrom ( CopyFromState  cstate)

Definition at line 779 of file copyfrom.c.

780{
781 ResultRelInfo *resultRelInfo;
782 ResultRelInfo *target_resultRelInfo;
783 ResultRelInfo *prevResultRelInfo = NULL;
784 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
785 ModifyTableState *mtstate;
786 ExprContext *econtext;
787 TupleTableSlot *singleslot = NULL;
789
790 PartitionTupleRouting *proute = NULL;
791 ErrorContextCallback errcallback;
792 CommandId mycid = GetCurrentCommandId(true);
793 int ti_options = 0; /* start with default options for insert */
794 BulkInsertState bistate = NULL;
795 CopyInsertMethod insertMethod;
796 CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
797 int64 processed = 0;
798 int64 excluded = 0;
799 bool has_before_insert_row_trig;
800 bool has_instead_insert_row_trig;
801 bool leafpart_use_multi_insert = false;
802
803 Assert(cstate->rel);
804 Assert(list_length(cstate->range_table) == 1);
805
806 if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
807 Assert(cstate->escontext);
808
809 /*
810 * The target must be a plain, foreign, or partitioned relation, or have
811 * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
812 * allowed on views, so we only hint about them in the view case.)
813 */
814 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
815 cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
816 cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
817 !(cstate->rel->trigdesc &&
819 {
820 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
822 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
823 errmsg("cannot copy to view \"%s\"",
825 errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
826 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
828 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
829 errmsg("cannot copy to materialized view \"%s\"",
830 RelationGetRelationName(cstate->rel))));
831 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
833 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
834 errmsg("cannot copy to sequence \"%s\"",
835 RelationGetRelationName(cstate->rel))));
836 else
838 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
839 errmsg("cannot copy to non-table relation \"%s\"",
840 RelationGetRelationName(cstate->rel))));
841 }
842
843 /*
844 * If the target file is new-in-transaction, we assume that checking FSM
845 * for free space is a waste of time. This could possibly be wrong, but
846 * it's unlikely.
847 */
848 if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
851 ti_options |= TABLE_INSERT_SKIP_FSM;
852
853 /*
854 * Optimize if new relation storage was created in this subxact or one of
855 * its committed children and we won't see those rows later as part of an
856 * earlier scan or command. The subxact test ensures that if this subxact
857 * aborts then the frozen rows won't be visible after xact cleanup. Note
858 * that the stronger test of exactly which subtransaction created it is
859 * crucial for correctness of this optimization. The test for an earlier
860 * scan or command tolerates false negatives. FREEZE causes other sessions
861 * to see rows they would not see under MVCC, and a false negative merely
862 * spreads that anomaly to the current session.
863 */
864 if (cstate->opts.freeze)
865 {
866 /*
867 * We currently disallow COPY FREEZE on partitioned tables. The
868 * reason for this is that we've simply not yet opened the partitions
869 * to determine if the optimization can be applied to them. We could
870 * go and open them all here, but doing so may be quite a costly
871 * overhead for small copies. In any case, we may just end up routing
872 * tuples to a small number of partitions. It seems better just to
873 * raise an ERROR for partitioned tables.
874 */
875 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
876 {
878 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 errmsg("cannot perform COPY FREEZE on a partitioned table")));
880 }
881
882 /* There's currently no support for COPY FREEZE on foreign tables. */
883 if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
885 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886 errmsg("cannot perform COPY FREEZE on a foreign table")));
887
888 /*
889 * Tolerate one registration for the benefit of FirstXactSnapshot.
890 * Scan-bearing queries generally create at least two registrations,
891 * though relying on that is fragile, as is ignoring ActiveSnapshot.
892 * Clear CatalogSnapshot to avoid counting its registration. We'll
893 * still detect ongoing catalog scans, each of which separately
894 * registers the snapshot it uses.
895 */
899 (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
900 errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
901
905 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906 errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
907
908 ti_options |= TABLE_INSERT_FROZEN;
909 }
910
911 /*
912 * We need a ResultRelInfo so we can use the regular executor's
913 * index-entry-making machinery. (There used to be a huge amount of code
914 * here that basically duplicated execUtils.c ...)
915 */
916 ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos,
918 resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
919 ExecInitResultRelation(estate, resultRelInfo, 1);
920
921 /* Verify the named relation is a valid target for INSERT */
922 CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
923
924 ExecOpenIndices(resultRelInfo, false);
925
926 /*
927 * Set up a ModifyTableState so we can let FDW(s) init themselves for
928 * foreign-table result relation(s).
929 */
930 mtstate = makeNode(ModifyTableState);
931 mtstate->ps.plan = NULL;
932 mtstate->ps.state = estate;
933 mtstate->operation = CMD_INSERT;
934 mtstate->mt_nrels = 1;
935 mtstate->resultRelInfo = resultRelInfo;
936 mtstate->rootResultRelInfo = resultRelInfo;
937
938 if (resultRelInfo->ri_FdwRoutine != NULL &&
939 resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
940 resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
941 resultRelInfo);
942
943 /*
944 * Also, if the named relation is a foreign table, determine if the FDW
945 * supports batch insert and determine the batch size (a FDW may support
946 * batching, but it may be disabled for the server/table).
947 *
948 * If the FDW does not support batching, we set the batch size to 1.
949 */
950 if (resultRelInfo->ri_FdwRoutine != NULL &&
953 resultRelInfo->ri_BatchSize =
954 resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
955 else
956 resultRelInfo->ri_BatchSize = 1;
957
958 Assert(resultRelInfo->ri_BatchSize >= 1);
959
960 /* Prepare to catch AFTER triggers. */
962
963 /*
964 * If there are any triggers with transition tables on the named relation,
965 * we need to be prepared to capture transition tuples.
966 *
967 * Because partition tuple routing would like to know about whether
968 * transition capture is active, we also set it in mtstate, which is
969 * passed to ExecFindPartition() below.
970 */
971 cstate->transition_capture = mtstate->mt_transition_capture =
973 RelationGetRelid(cstate->rel),
974 CMD_INSERT);
975
976 /*
977 * If the named relation is a partitioned table, initialize state for
978 * CopyFrom tuple routing.
979 */
980 if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
981 proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
982
983 if (cstate->whereClause)
984 cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
985 &mtstate->ps);
986
987 /*
988 * It's generally more efficient to prepare a bunch of tuples for
989 * insertion, and insert them in one
990 * table_multi_insert()/ExecForeignBatchInsert() call, than call
991 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
992 * However, there are a number of reasons why we might not be able to do
993 * this. These are explained below.
994 */
995 if (resultRelInfo->ri_TrigDesc != NULL &&
996 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
997 resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
998 {
999 /*
1000 * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
1001 * triggers on the table. Such triggers might query the table we're
1002 * inserting into and act differently if the tuples that have already
1003 * been processed and prepared for insertion are not there.
1004 */
1005 insertMethod = CIM_SINGLE;
1006 }
1007 else if (resultRelInfo->ri_FdwRoutine != NULL &&
1008 resultRelInfo->ri_BatchSize == 1)
1009 {
1010 /*
1011 * Can't support multi-inserts to a foreign table if the FDW does not
1012 * support batching, or it's disabled for the server or foreign table.
1013 */
1014 insertMethod = CIM_SINGLE;
1015 }
1016 else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
1017 resultRelInfo->ri_TrigDesc->trig_insert_new_table)
1018 {
1019 /*
1020 * For partitioned tables we can't support multi-inserts when there
1021 * are any statement level insert triggers. It might be possible to
1022 * allow partitioned tables with such triggers in the future, but for
1023 * now, CopyMultiInsertInfoFlush expects that any after row insert and
1024 * statement level insert triggers are on the same relation.
1025 */
1026 insertMethod = CIM_SINGLE;
1027 }
1028 else if (cstate->volatile_defexprs)
1029 {
1030 /*
1031 * Can't support multi-inserts if there are any volatile default
1032 * expressions in the table. Similarly to the trigger case above,
1033 * such expressions may query the table we're inserting into.
1034 *
1035 * Note: It does not matter if any partitions have any volatile
1036 * default expressions as we use the defaults from the target of the
1037 * COPY command.
1038 */
1039 insertMethod = CIM_SINGLE;
1040 }
1041 else if (contain_volatile_functions(cstate->whereClause))
1042 {
1043 /*
1044 * Can't support multi-inserts if there are any volatile function
1045 * expressions in WHERE clause. Similarly to the trigger case above,
1046 * such expressions may query the table we're inserting into.
1047 *
1048 * Note: the whereClause was already preprocessed in DoCopy(), so it's
1049 * okay to use contain_volatile_functions() directly.
1050 */
1051 insertMethod = CIM_SINGLE;
1052 }
1053 else
1054 {
1055 /*
1056 * For partitioned tables, we may still be able to perform bulk
1057 * inserts. However, the possibility of this depends on which types
1058 * of triggers exist on the partition. We must disable bulk inserts
1059 * if the partition is a foreign table that can't use batching or it
1060 * has any before row insert or insert instead triggers (same as we
1061 * checked above for the parent table). Since the partition's
1062 * resultRelInfos are initialized only when we actually need to insert
1063 * the first tuple into them, we must have the intermediate insert
1064 * method of CIM_MULTI_CONDITIONAL to flag that we must later
1065 * determine if we can use bulk-inserts for the partition being
1066 * inserted into.
1067 */
1068 if (proute)
1069 insertMethod = CIM_MULTI_CONDITIONAL;
1070 else
1071 insertMethod = CIM_MULTI;
1072
1073 CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
1074 estate, mycid, ti_options);
1075 }
1076
1077 /*
1078 * If not using batch mode (which allocates slots as needed) set up a
1079 * tuple slot too. When inserting into a partitioned table, we also need
1080 * one, even if we might batch insert, to read the tuple in the root
1081 * partition's form.
1082 */
1083 if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
1084 {
1085 singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
1086 &estate->es_tupleTable);
1087 bistate = GetBulkInsertState();
1088 }
1089
1090 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1091 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1092
1093 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1094 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1095
1096 /*
1097 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
1098 * should do this for COPY, since it's not really an "INSERT" statement as
1099 * such. However, executing these triggers maintains consistency with the
1100 * EACH ROW triggers that we already fire on COPY.
1101 */
1102 ExecBSInsertTriggers(estate, resultRelInfo);
1103
1104 econtext = GetPerTupleExprContext(estate);
1105
1106 /* Set up callback to identify error line number */
1107 errcallback.callback = CopyFromErrorCallback;
1108 errcallback.arg = cstate;
1109 errcallback.previous = error_context_stack;
1110 error_context_stack = &errcallback;
1111
1112 for (;;)
1113 {
1114 TupleTableSlot *myslot;
1115 bool skip_tuple;
1116
1118
1119 /*
1120 * Reset the per-tuple exprcontext. We do this after every tuple, to
1121 * clean-up after expression evaluations etc.
1122 */
1124
1125 /* select slot to (initially) load row into */
1126 if (insertMethod == CIM_SINGLE || proute)
1127 {
1128 myslot = singleslot;
1129 Assert(myslot != NULL);
1130 }
1131 else
1132 {
1133 Assert(resultRelInfo == target_resultRelInfo);
1134 Assert(insertMethod == CIM_MULTI);
1135
1136 myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1137 resultRelInfo);
1138 }
1139
1140 /*
1141 * Switch to per-tuple context before calling NextCopyFrom, which does
1142 * evaluate default expressions etc. and requires per-tuple context.
1143 */
1145
1146 ExecClearTuple(myslot);
1147
1148 /* Directly store the values/nulls array in the slot */
1149 if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
1150 break;
1151
1152 if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1153 cstate->escontext->error_occurred)
1154 {
1155 /*
1156 * Soft error occurred, skip this tuple and just make
1157 * ErrorSaveContext ready for the next NextCopyFrom. Since we
1158 * don't set details_wanted and error_data is not to be filled,
1159 * just resetting error_occurred is enough.
1160 */
1161 cstate->escontext->error_occurred = false;
1162
1163 /* Report that this tuple was skipped by the ON_ERROR clause */
1165 cstate->num_errors);
1166
1167 if (cstate->opts.reject_limit > 0 &&
1168 cstate->num_errors > cstate->opts.reject_limit)
1169 ereport(ERROR,
1170 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1171 errmsg("skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
1172 (long long) cstate->opts.reject_limit)));
1173
1174 /* Repeat NextCopyFrom() until no soft error occurs */
1175 continue;
1176 }
1177
1178 ExecStoreVirtualTuple(myslot);
1179
1180 /*
1181 * Constraints and where clause might reference the tableoid column,
1182 * so (re-)initialize tts_tableOid before evaluating them.
1183 */
1184 myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1185
1186 /* Triggers and stuff need to be invoked in query context. */
1187 MemoryContextSwitchTo(oldcontext);
1188
1189 if (cstate->whereClause)
1190 {
1191 econtext->ecxt_scantuple = myslot;
1192 /* Skip items that don't match COPY's WHERE clause */
1193 if (!ExecQual(cstate->qualexpr, econtext))
1194 {
1195 /*
1196 * Report that this tuple was filtered out by the WHERE
1197 * clause.
1198 */
1200 ++excluded);
1201 continue;
1202 }
1203 }
1204
1205 /* Determine the partition to insert the tuple into */
1206 if (proute)
1207 {
1208 TupleConversionMap *map;
1209
1210 /*
1211 * Attempt to find a partition suitable for this tuple.
1212 * ExecFindPartition() will raise an error if none can be found or
1213 * if the found partition is not suitable for INSERTs.
1214 */
1215 resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1216 proute, myslot, estate);
1217
1218 if (prevResultRelInfo != resultRelInfo)
1219 {
1220 /* Determine which triggers exist on this partition */
1221 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1222 resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1223
1224 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1225 resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1226
1227 /*
1228 * Disable multi-inserts when the partition has BEFORE/INSTEAD
1229 * OF triggers, or if the partition is a foreign table that
1230 * can't use batching.
1231 */
1232 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1233 !has_before_insert_row_trig &&
1234 !has_instead_insert_row_trig &&
1235 (resultRelInfo->ri_FdwRoutine == NULL ||
1236 resultRelInfo->ri_BatchSize > 1);
1237
1238 /* Set the multi-insert buffer to use for this partition. */
1239 if (leafpart_use_multi_insert)
1240 {
1241 if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1242 CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1243 resultRelInfo);
1244 }
1245 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1246 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1247 {
1248 /*
1249 * Flush pending inserts if this partition can't use
1250 * batching, so rows are visible to triggers etc.
1251 */
1252 CopyMultiInsertInfoFlush(&multiInsertInfo,
1253 resultRelInfo,
1254 &processed);
1255 }
1256
1257 if (bistate != NULL)
1259 prevResultRelInfo = resultRelInfo;
1260 }
1261
1262 /*
1263 * If we're capturing transition tuples, we might need to convert
1264 * from the partition rowtype to root rowtype. But if there are no
1265 * BEFORE triggers on the partition that could change the tuple,
1266 * we can just remember the original unconverted tuple to avoid a
1267 * needless round trip conversion.
1268 */
1269 if (cstate->transition_capture != NULL)
1271 !has_before_insert_row_trig ? myslot : NULL;
1272
1273 /*
1274 * We might need to convert from the root rowtype to the partition
1275 * rowtype.
1276 */
1277 map = ExecGetRootToChildMap(resultRelInfo, estate);
1278 if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1279 {
1280 /* non batch insert */
1281 if (map != NULL)
1282 {
1283 TupleTableSlot *new_slot;
1284
1285 new_slot = resultRelInfo->ri_PartitionTupleSlot;
1286 myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1287 }
1288 }
1289 else
1290 {
1291 /*
1292 * Prepare to queue up tuple for later batch insert into
1293 * current partition.
1294 */
1295 TupleTableSlot *batchslot;
1296
1297 /* no other path available for partitioned table */
1298 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1299
1300 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1301 resultRelInfo);
1302
1303 if (map != NULL)
1304 myslot = execute_attr_map_slot(map->attrMap, myslot,
1305 batchslot);
1306 else
1307 {
1308 /*
1309 * This looks more expensive than it is (Believe me, I
1310 * optimized it away. Twice.). The input is in virtual
1311 * form, and we'll materialize the slot below - for most
1312 * slot types the copy performs the work materialization
1313 * would later require anyway.
1314 */
1315 ExecCopySlot(batchslot, myslot);
1316 myslot = batchslot;
1317 }
1318 }
1319
1320 /* ensure that triggers etc see the right relation */
1321 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1322 }
1323
1324 skip_tuple = false;
1325
1326 /* BEFORE ROW INSERT Triggers */
1327 if (has_before_insert_row_trig)
1328 {
1329 if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1330 skip_tuple = true; /* "do nothing" */
1331 }
1332
1333 if (!skip_tuple)
1334 {
1335 /*
1336 * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1337 * tuple. Otherwise, proceed with inserting the tuple into the
1338 * table or foreign table.
1339 */
1340 if (has_instead_insert_row_trig)
1341 {
1342 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1343 }
1344 else
1345 {
1346 /* Compute stored generated columns */
1347 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1349 ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1350 CMD_INSERT);
1351
1352 /*
1353 * If the target is a plain table, check the constraints of
1354 * the tuple.
1355 */
1356 if (resultRelInfo->ri_FdwRoutine == NULL &&
1357 resultRelInfo->ri_RelationDesc->rd_att->constr)
1358 ExecConstraints(resultRelInfo, myslot, estate);
1359
1360 /*
1361 * Also check the tuple against the partition constraint, if
1362 * there is one; except that if we got here via tuple-routing,
1363 * we don't need to if there's no BR trigger defined on the
1364 * partition.
1365 */
1366 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1367 (proute == NULL || has_before_insert_row_trig))
1368 ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1369
1370 /* Store the slot in the multi-insert buffer, when enabled. */
1371 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1372 {
1373 /*
1374 * The slot previously might point into the per-tuple
1375 * context. For batching it needs to be longer lived.
1376 */
1377 ExecMaterializeSlot(myslot);
1378
1379 /* Add this tuple to the tuple buffer */
1380 CopyMultiInsertInfoStore(&multiInsertInfo,
1381 resultRelInfo, myslot,
1382 cstate->line_buf.len,
1383 cstate->cur_lineno);
1384
1385 /*
1386 * If enough inserts have queued up, then flush all
1387 * buffers out to their tables.
1388 */
1389 if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
1390 CopyMultiInsertInfoFlush(&multiInsertInfo,
1391 resultRelInfo,
1392 &processed);
1393
1394 /*
1395 * We delay updating the row counter and progress of the
1396 * COPY command until after writing the tuples stored in
1397 * the buffer out to the table, as in single insert mode.
1398 * See CopyMultiInsertBufferFlush().
1399 */
1400 continue; /* next tuple please */
1401 }
1402 else
1403 {
1404 List *recheckIndexes = NIL;
1405
1406 /* OK, store the tuple */
1407 if (resultRelInfo->ri_FdwRoutine != NULL)
1408 {
1409 myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1410 resultRelInfo,
1411 myslot,
1412 NULL);
1413
1414 if (myslot == NULL) /* "do nothing" */
1415 continue; /* next tuple please */
1416
1417 /*
1418 * AFTER ROW Triggers might reference the tableoid
1419 * column, so (re-)initialize tts_tableOid before
1420 * evaluating them.
1421 */
1422 myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1423 }
1424 else
1425 {
1426 /* OK, store the tuple and create index entries for it */
1427 table_tuple_insert(resultRelInfo->ri_RelationDesc,
1428 myslot, mycid, ti_options, bistate);
1429
1430 if (resultRelInfo->ri_NumIndices > 0)
1431 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1432 myslot,
1433 estate,
1434 false,
1435 false,
1436 NULL,
1437 NIL,
1438 false);
1439 }
1440
1441 /* AFTER ROW INSERT Triggers */
1442 ExecARInsertTriggers(estate, resultRelInfo, myslot,
1443 recheckIndexes, cstate->transition_capture);
1444
1445 list_free(recheckIndexes);
1446 }
1447 }
1448
1449 /*
1450 * We count only tuples not suppressed by a BEFORE INSERT trigger
1451 * or FDW; this is the same definition used by nodeModifyTable.c
1452 * for counting tuples inserted by an INSERT command. Update
1453 * progress of the COPY command as well.
1454 */
1456 ++processed);
1457 }
1458 }
1459
1460 /* Flush any remaining buffered tuples */
1461 if (insertMethod != CIM_SINGLE)
1462 {
1463 if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1464 CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1465 }
1466
1467 /* Done, clean up */
1468 error_context_stack = errcallback.previous;
1469
1470 if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
1471 cstate->num_errors > 0 &&
1474 errmsg_plural("%llu row was skipped due to data type incompatibility",
1475 "%llu rows were skipped due to data type incompatibility",
1476 (unsigned long long) cstate->num_errors,
1477 (unsigned long long) cstate->num_errors));
1478
1479 if (bistate != NULL)
1480 FreeBulkInsertState(bistate);
1481
1482 MemoryContextSwitchTo(oldcontext);
1483
1484 /* Execute AFTER STATEMENT insertion triggers */
1485 ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1486
1487 /* Handle queued AFTER triggers */
1488 AfterTriggerEndQuery(estate);
1489
1490 ExecResetTupleTable(estate->es_tupleTable, false);
1491
1492 /* Allow the FDW to shut down */
1493 if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1494 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1495 target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1496 target_resultRelInfo);
1497
1498 /* Tear down the multi-insert buffer data */
1499 if (insertMethod != CIM_SINGLE)
1500 CopyMultiInsertInfoCleanup(&multiInsertInfo);
1501
1502 /* Close all the partitioned tables, leaf partitions, and their indices */
1503 if (proute)
1504 ExecCleanupTupleRouting(mtstate, proute);
1505
1506 /* Close the result relations, including any trigger target relations */
1509
1510 FreeExecutorState(estate);
1511
1512 return processed;
1513}
void pgstat_progress_update_param(int index, int64 val)
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
#define InvalidSubTransactionId
Definition: c.h:629
uint32 CommandId
Definition: c.h:637
bool contain_volatile_functions(Node *clause)
Definition: clauses.c:537
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:380
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
Definition: copyfrom.c:735
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
Definition: copyfrom.c:400
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
Definition: copyfrom.c:662
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
Definition: copyfrom.c:756
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:716
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:425
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
Definition: copyfrom.c:437
void CopyFromErrorCallback(void *arg)
Definition: copyfrom.c:254
CopyInsertMethod
@ CIM_SINGLE
@ CIM_MULTI_CONDITIONAL
@ CIM_MULTI
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1180
ErrorContextCallback * error_context_stack
Definition: elog.c:94
#define NOTICE
Definition: elog.h:35
ExprState * ExecInitQual(List *qual, PlanState *parent)
Definition: execExpr.c:229
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
Definition: execIndexing.c:160
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:310
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, List *mergeActions)
Definition: execMain.c:1149
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1930
void ExecCloseResultRelations(EState *estate)
Definition: execMain.c:1647
void ExecCloseRangeTableRelations(EState *estate)
Definition: execMain.c:1707
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:2054
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
Definition: execTuples.c:1378
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1739
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
Definition: execUtils.c:1324
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:775
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
Definition: execUtils.c:878
void FreeExecutorState(EState *estate)
Definition: execUtils.c:193
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define ResetPerTupleExprContext(estate)
Definition: executor.h:646
#define GetPerTupleExprContext(estate)
Definition: executor.h:637
#define GetPerTupleMemoryContext(estate)
Definition: executor.h:642
static bool ExecQual(ExprState *state, ExprContext *econtext)
Definition: executor.h:500
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
Definition: heapam.c:2051
BulkInsertState GetBulkInsertState(void)
Definition: heapam.c:2022
void FreeBulkInsertState(BulkInsertState bistate)
Definition: heapam.c:2039
@ COPY_LOG_VERBOSITY_DEFAULT
Definition: copy.h:49
void list_free(List *list)
Definition: list.c:1546
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:269
#define castNode(_type_, nodeptr)
Definition: nodes.h:178
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
bool ThereAreNoReadyPortals(void)
Definition: portalmem.c:1173
#define PROGRESS_COPY_TUPLES_PROCESSED
Definition: progress.h:144
#define PROGRESS_COPY_TUPLES_EXCLUDED
Definition: progress.h:145
#define PROGRESS_COPY_TUPLES_SKIPPED
Definition: progress.h:148
bool ThereAreNoPriorRegisteredSnapshots(void)
Definition: snapmgr.c:1613
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:443
bool freeze
Definition: copy.h:65
CopyLogVerbosityChoice log_verbosity
Definition: copy.h:87
int64 reject_limit
Definition: copy.h:88
StringInfoData line_buf
TransitionCaptureState * transition_capture
List * es_tupleTable
Definition: execnodes.h:704
struct ErrorContextCallback * previous
Definition: elog.h:296
void(* callback)(void *arg)
Definition: elog.h:297
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:268
EndForeignInsert_function EndForeignInsert
Definition: fdwapi.h:239
BeginForeignInsert_function BeginForeignInsert
Definition: fdwapi.h:238
ExecForeignInsert_function ExecForeignInsert
Definition: fdwapi.h:232
ExecForeignBatchInsert_function ExecForeignBatchInsert
Definition: fdwapi.h:233
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
Definition: fdwapi.h:234
CmdType operation
Definition: execnodes.h:1392
ResultRelInfo * resultRelInfo
Definition: execnodes.h:1396
PlanState ps
Definition: execnodes.h:1391
ResultRelInfo * rootResultRelInfo
Definition: execnodes.h:1404
struct TransitionCaptureState * mt_transition_capture
Definition: execnodes.h:1430
Plan * plan
Definition: execnodes.h:1153
EState * state
Definition: execnodes.h:1155
SubTransactionId rd_firstRelfilelocatorSubid
Definition: rel.h:106
TriggerDesc * trigdesc
Definition: rel.h:117
TupleDesc rd_att
Definition: rel.h:112
SubTransactionId rd_newRelfilelocatorSubid
Definition: rel.h:104
SubTransactionId rd_createSubid
Definition: rel.h:103
Form_pg_class rd_rel
Definition: rel.h:111
TupleTableSlot * ri_PartitionTupleSlot
Definition: execnodes.h:610
int ri_NumIndices
Definition: execnodes.h:478
Relation ri_RelationDesc
Definition: execnodes.h:475
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
Definition: execnodes.h:613
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:510
struct FdwRoutine * ri_FdwRoutine
Definition: execnodes.h:528
int ri_BatchSize
Definition: execnodes.h:539
TupleTableSlot * tcs_original_insert_tuple
Definition: trigger.h:76
bool trig_insert_instead_row
Definition: reltrigger.h:58
bool trig_insert_new_table
Definition: reltrigger.h:75
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:46
AttrMap * attrMap
Definition: tupconvert.h:28
TupleConstr * constr
Definition: tupdesc.h:135
Oid tts_tableOid
Definition: tuptable.h:130
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
#define TABLE_INSERT_FROZEN
Definition: tableam.h:261
#define TABLE_INSERT_SKIP_FSM
Definition: tableam.h:260
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1372
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
Definition: trigger.c:4917
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
Definition: trigger.c:2399
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2463
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2558
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2541
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
Definition: trigger.c:2450
void AfterTriggerEndQuery(EState *estate)
Definition: trigger.c:5088
void AfterTriggerBeginQuery(void)
Definition: trigger.c:5053
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:472
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:791
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:829

References AfterTriggerBeginQuery(), AfterTriggerEndQuery(), ErrorContextCallback::arg, Assert(), TupleConversionMap::attrMap, FdwRoutine::BeginForeignInsert, bms_make_singleton(), ErrorContextCallback::callback, castNode, CHECK_FOR_INTERRUPTS, CheckValidResultRel(), CIM_MULTI, CIM_MULTI_CONDITIONAL, CIM_SINGLE, CMD_INSERT, TupleDescData::constr, contain_volatile_functions(), COPY_LOG_VERBOSITY_DEFAULT, COPY_ON_ERROR_IGNORE, COPY_ON_ERROR_STOP, CopyFromErrorCallback(), CopyMultiInsertInfoCleanup(), CopyMultiInsertInfoFlush(), CopyMultiInsertInfoInit(), CopyMultiInsertInfoIsEmpty(), CopyMultiInsertInfoIsFull(), CopyMultiInsertInfoNextFreeSlot(), CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfoStore(), CreateExecutorState(), CopyFromStateData::cur_lineno, CurrentMemoryContext, ExprContext::ecxt_scantuple, FdwRoutine::EndForeignInsert, ereport, errcode(), errhint(), errmsg(), errmsg_plural(), ERROR, error_context_stack, ErrorSaveContext::error_occurred, EState::es_tupleTable, CopyFromStateData::escontext, ExecARInsertTriggers(), ExecASInsertTriggers(), ExecBRInsertTriggers(), ExecBSInsertTriggers(), ExecCleanupTupleRouting(), ExecClearTuple(), ExecCloseRangeTableRelations(), ExecCloseResultRelations(), ExecComputeStoredGenerated(), ExecConstraints(), ExecCopySlot(), ExecFindPartition(), FdwRoutine::ExecForeignBatchInsert, FdwRoutine::ExecForeignInsert, ExecGetRootToChildMap(), ExecInitQual(), ExecInitRangeTable(), ExecInitResultRelation(), ExecInsertIndexTuples(), ExecIRInsertTriggers(), ExecMaterializeSlot(), ExecOpenIndices(), ExecPartitionCheck(), ExecQual(), ExecResetTupleTable(), ExecSetupPartitionTupleRouting(), ExecStoreVirtualTuple(), execute_attr_map_slot(), FreeBulkInsertState(), FreeExecutorState(), CopyFormatOptions::freeze, GetBulkInsertState(), GetCurrentCommandId(), GetCurrentSubTransactionId(), FdwRoutine::GetForeignModifyBatchSize, GetPerTupleExprContext, GetPerTupleMemoryContext, TupleConstr::has_generated_stored, InvalidateCatalogSnapshot(), 
InvalidSubTransactionId, StringInfoData::len, CopyFromStateData::line_buf, list_free(), list_length(), CopyFormatOptions::log_verbosity, makeNode, MakeTransitionCaptureState(), MemoryContextSwitchTo(), ModifyTableState::mt_nrels, ModifyTableState::mt_transition_capture, NextCopyFrom(), NIL, NOTICE, CopyFromStateData::num_errors, CopyFormatOptions::on_error, ModifyTableState::operation, CopyFromStateData::opts, pgstat_progress_update_param(), PlanState::plan, ErrorContextCallback::previous, PROGRESS_COPY_TUPLES_EXCLUDED, PROGRESS_COPY_TUPLES_PROCESSED, PROGRESS_COPY_TUPLES_SKIPPED, ModifyTableState::ps, CopyFromStateData::qualexpr, CopyFromStateData::range_table, RelationData::rd_att, RelationData::rd_createSubid, RelationData::rd_firstRelfilelocatorSubid, RelationData::rd_newRelfilelocatorSubid, RelationData::rd_rel, CopyFormatOptions::reject_limit, CopyFromStateData::rel, RelationGetRelationName, RelationGetRelid, ReleaseBulkInsertStatePin(), ResetPerTupleExprContext, ModifyTableState::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionTupleSlot, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, ModifyTableState::rootResultRelInfo, CopyFromStateData::rteperminfos, PlanState::state, TABLE_INSERT_FROZEN, TABLE_INSERT_SKIP_FSM, table_slot_create(), table_tuple_insert(), TransitionCaptureState::tcs_original_insert_tuple, ThereAreNoPriorRegisteredSnapshots(), ThereAreNoReadyPortals(), CopyFromStateData::transition_capture, TriggerDesc::trig_insert_before_row, TriggerDesc::trig_insert_instead_row, TriggerDesc::trig_insert_new_table, RelationData::trigdesc, TupleTableSlot::tts_isnull, TupleTableSlot::tts_tableOid, TupleTableSlot::tts_values, CopyFromStateData::volatile_defexprs, and CopyFromStateData::whereClause.

Referenced by copy_table(), and DoCopy().

◆ CopyFromBinaryEnd()

static void CopyFromBinaryEnd ( CopyFromState  cstate)
static

Definition at line 243 of file copyfrom.c.

244{
245 /* nothing to do */
246}

◆ CopyFromBinaryInFunc()

static void CopyFromBinaryInFunc ( CopyFromState  cstate,
Oid  atttypid,
FmgrInfo * finfo,
Oid * typioparam 
)
static

Definition at line 232 of file copyfrom.c.

234{
235 Oid func_oid;
236
237 getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
238 fmgr_info(func_oid, finfo);
239}
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Definition: fmgr.c:127
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
Definition: lsyscache.c:3023

References fmgr_info(), and getTypeBinaryInputInfo().

◆ CopyFromBinaryStart()

static void CopyFromBinaryStart ( CopyFromState  cstate,
TupleDesc  tupDesc 
)
static

Definition at line 221 of file copyfrom.c.

222{
223 /* Read and verify binary header */
224 ReceiveCopyBinaryHeader(cstate);
225}
void ReceiveCopyBinaryHeader(CopyFromState cstate)

References ReceiveCopyBinaryHeader().

◆ CopyFromErrorCallback()

void CopyFromErrorCallback ( void *  arg)

Definition at line 254 of file copyfrom.c.

255{
256 CopyFromState cstate = (CopyFromState) arg;
257
258 if (cstate->relname_only)
259 {
260 errcontext("COPY %s",
261 cstate->cur_relname);
262 return;
263 }
264 if (cstate->opts.binary)
265 {
266 /* can't usefully display the data */
267 if (cstate->cur_attname)
268 errcontext("COPY %s, line %llu, column %s",
269 cstate->cur_relname,
270 (unsigned long long) cstate->cur_lineno,
271 cstate->cur_attname);
272 else
273 errcontext("COPY %s, line %llu",
274 cstate->cur_relname,
275 (unsigned long long) cstate->cur_lineno);
276 }
277 else
278 {
279 if (cstate->cur_attname && cstate->cur_attval)
280 {
281 /* error is relevant to a particular column */
282 char *attval;
283
284 attval = CopyLimitPrintoutLength(cstate->cur_attval);
285 errcontext("COPY %s, line %llu, column %s: \"%s\"",
286 cstate->cur_relname,
287 (unsigned long long) cstate->cur_lineno,
288 cstate->cur_attname,
289 attval);
290 pfree(attval);
291 }
292 else if (cstate->cur_attname)
293 {
294 /* error is relevant to a particular column, value is NULL */
295 errcontext("COPY %s, line %llu, column %s: null input",
296 cstate->cur_relname,
297 (unsigned long long) cstate->cur_lineno,
298 cstate->cur_attname);
299 }
300 else
301 {
302 /*
303 * Error is relevant to a particular line.
304 *
305 * If line_buf still contains the correct line, print it.
306 */
307 if (cstate->line_buf_valid)
308 {
309 char *lineval;
310
311 lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
312 errcontext("COPY %s, line %llu: \"%s\"",
313 cstate->cur_relname,
314 (unsigned long long) cstate->cur_lineno, lineval);
315 pfree(lineval);
316 }
317 else
318 {
319 errcontext("COPY %s, line %llu",
320 cstate->cur_relname,
321 (unsigned long long) cstate->cur_lineno);
322 }
323 }
324 }
325}
char * CopyLimitPrintoutLength(const char *str)
Definition: copyfrom.c:333
#define errcontext
Definition: elog.h:196
struct CopyFromStateData * CopyFromState
Definition: copy.h:93
void pfree(void *pointer)
Definition: mcxt.c:1524
void * arg
bool binary
Definition: copy.h:64

References arg, CopyFormatOptions::binary, CopyLimitPrintoutLength(), CopyFromStateData::cur_attname, CopyFromStateData::cur_attval, CopyFromStateData::cur_lineno, CopyFromStateData::cur_relname, StringInfoData::data, errcontext, CopyFromStateData::line_buf, CopyFromStateData::line_buf_valid, CopyFromStateData::opts, pfree(), and CopyFromStateData::relname_only.

Referenced by CopyFrom(), file_acquire_sample_rows(), and fileIterateForeignScan().

◆ CopyFromGetRoutine()

static const CopyFromRoutine * CopyFromGetRoutine ( const CopyFormatOptions * opts)
static

Definition at line 156 of file copyfrom.c.

157{
158 if (opts->csv_mode)
159 return &CopyFromRoutineCSV;
160 else if (opts->binary)
161 return &CopyFromRoutineBinary;
162
163 /* default is text */
164 return &CopyFromRoutineText;
165}
static const CopyFromRoutine CopyFromRoutineBinary
Definition: copyfrom.c:147
static const CopyFromRoutine CopyFromRoutineText
Definition: copyfrom.c:131
static const CopyFromRoutine CopyFromRoutineCSV
Definition: copyfrom.c:139
static AmcheckOptions opts
Definition: pg_amcheck.c:112

References CopyFromRoutineBinary, CopyFromRoutineCSV, CopyFromRoutineText, and opts.

Referenced by BeginCopyFrom().

◆ CopyFromTextLikeEnd()

static void CopyFromTextLikeEnd ( CopyFromState  cstate)
static

Definition at line 214 of file copyfrom.c.

215{
216 /* nothing to do */
217}

◆ CopyFromTextLikeInFunc()

static void CopyFromTextLikeInFunc ( CopyFromState  cstate,
Oid  atttypid,
FmgrInfo * finfo,
Oid * typioparam 
)
static

Definition at line 203 of file copyfrom.c.

205{
206 Oid func_oid;
207
208 getTypeInputInfo(atttypid, &func_oid, typioparam);
209 fmgr_info(func_oid, finfo);
210}
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
Definition: lsyscache.c:2957

References fmgr_info(), and getTypeInputInfo().

◆ CopyFromTextLikeStart()

static void CopyFromTextLikeStart ( CopyFromState  cstate,
TupleDesc  tupDesc 
)
static

Definition at line 169 of file copyfrom.c.

170{
171 AttrNumber attr_count;
172
173 /*
174 * If encoding conversion is needed, we need another buffer to hold the
175 * converted input data. Otherwise, we can just point input_buf to the
176 * same buffer as raw_buf.
177 */
178 if (cstate->need_transcoding)
179 {
180 cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
181 cstate->input_buf_index = cstate->input_buf_len = 0;
182 }
183 else
184 cstate->input_buf = cstate->raw_buf;
185 cstate->input_reached_eof = false;
186
187 initStringInfo(&cstate->line_buf);
188
189 /*
190 * Create workspace for CopyReadAttributes results; used by CSV and text
191 * format.
192 */
193 attr_count = list_length(cstate->attnumlist);
194 cstate->max_fields = attr_count;
195 cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
196}
#define INPUT_BUF_SIZE

References CopyFromStateData::attnumlist, initStringInfo(), CopyFromStateData::input_buf, CopyFromStateData::input_buf_index, CopyFromStateData::input_buf_len, INPUT_BUF_SIZE, CopyFromStateData::input_reached_eof, CopyFromStateData::line_buf, list_length(), CopyFromStateData::max_fields, CopyFromStateData::need_transcoding, palloc(), CopyFromStateData::raw_buf, and CopyFromStateData::raw_fields.

◆ CopyLimitPrintoutLength()

char * CopyLimitPrintoutLength ( const char *  str)

Definition at line 333 of file copyfrom.c.

334{
335#define MAX_COPY_DATA_DISPLAY 100
336
337 int slen = strlen(str);
338 int len;
339 char *res;
340
341 /* Fast path if definitely okay */
342 if (slen <= MAX_COPY_DATA_DISPLAY)
343 return pstrdup(str);
344
345 /* Apply encoding-dependent truncation */
346 len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
347
348 /*
349 * Truncate, and add "..." to show we truncated the input.
350 */
351 res = (char *) palloc(len + 4);
352 memcpy(res, str, len);
353 strcpy(res + len, "...");
354
355 return res;
356}
#define MAX_COPY_DATA_DISPLAY
const char * str
int pg_mbcliplen(const char *mbstr, int len, int limit)
Definition: mbutils.c:1083
const void size_t len

References len, MAX_COPY_DATA_DISPLAY, palloc(), pg_mbcliplen(), pstrdup(), and str.

Referenced by CopyFromErrorCallback(), and CopyFromTextLikeOneRow().

◆ CopyMultiInsertBufferCleanup()

static void CopyMultiInsertBufferCleanup ( CopyMultiInsertInfo * miinfo,
CopyMultiInsertBuffer * buffer 
)
inline static

Definition at line 620 of file copyfrom.c.

622{
623 ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
624 int i;
625
626 /* Ensure buffer was flushed */
627 Assert(buffer->nused == 0);
628
629 /* Remove back-link to ourself */
630 resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
631
632 if (resultRelInfo->ri_FdwRoutine == NULL)
633 {
634 Assert(buffer->bistate != NULL);
635 FreeBulkInsertState(buffer->bistate);
636 }
637 else
638 Assert(buffer->bistate == NULL);
639
640 /* Since we only create slots on demand, just drop the non-null ones. */
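641 for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
642 ExecDropSingleTupleTableSlot(buffer->slots[i]);
643
644 if (resultRelInfo->ri_FdwRoutine == NULL)
645 table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
646 miinfo->ti_options);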
647
648 pfree(buffer);
649}
#define MAX_BUFFERED_TUPLES
Definition: copyfrom.c:63
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
int i
Definition: isn.c:74
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:80
ResultRelInfo * resultRelInfo
Definition: copyfrom.c:81
BulkInsertState bistate
Definition: copyfrom.c:82
static void table_finish_bulk_insert(Relation rel, int options)
Definition: tableam.h:1565

References Assert(), CopyMultiInsertBuffer::bistate, ExecDropSingleTupleTableSlot(), FreeBulkInsertState(), i, MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, pfree(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, table_finish_bulk_insert(), and CopyMultiInsertInfo::ti_options.

Referenced by CopyMultiInsertInfoCleanup(), and CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferFlush()

static void CopyMultiInsertBufferFlush ( CopyMultiInsertInfo * miinfo,
CopyMultiInsertBuffer * buffer,
int64 * processed 
)
inline static

Definition at line 446 of file copyfrom.c.

449{
450 CopyFromState cstate = miinfo->cstate;
451 EState *estate = miinfo->estate;
452 int nused = buffer->nused;
453 ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
454 TupleTableSlot **slots = buffer->slots;
455 int i;
456
457 if (resultRelInfo->ri_FdwRoutine)
458 {
459 int batch_size = resultRelInfo->ri_BatchSize;
460 int sent = 0;
461
462 Assert(buffer->bistate == NULL);
463
464 /* Ensure that the FDW supports batching and it's enabled */
465 Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
466 Assert(batch_size > 1);
467
468 /*
469 * We suppress error context information other than the relation name,
470 * if one of the operations below fails.
471 */
472 Assert(!cstate->relname_only);
473 cstate->relname_only = true;
474
475 while (sent < nused)
476 {
477 int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
478 int inserted = size;
479 TupleTableSlot **rslots;
480
481 /* insert into foreign table: let the FDW do it */
482 rslots =
483 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
484 resultRelInfo,
485 &slots[sent],
486 NULL,
487 &inserted);
488
489 sent += size;
490
491 /* No need to do anything if there are no inserted rows */
492 if (inserted <= 0)
493 continue;
494
495 /* Triggers on foreign tables should not have transition tables */
496 Assert(resultRelInfo->ri_TrigDesc == NULL ||
497 resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
498
499 /* Run AFTER ROW INSERT triggers */
500 if (resultRelInfo->ri_TrigDesc != NULL &&
501 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
502 {
503 Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
504
505 for (i = 0; i < inserted; i++)
506 {
507 TupleTableSlot *slot = rslots[i];
508
509 /*
510 * AFTER ROW Triggers might reference the tableoid column,
511 * so (re-)initialize tts_tableOid before evaluating them.
512 */
513 slot->tts_tableOid = relid;
514
515 ExecARInsertTriggers(estate, resultRelInfo,
516 slot, NIL,
517 cstate->transition_capture);
518 }
519 }
520
521 /* Update the row counter and progress of the COPY command */
522 *processed += inserted;
523 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
524 *processed);
525 }
526
527 for (i = 0; i < nused; i++)
528 ExecClearTuple(slots[i]);
529
530 /* reset relname_only */
531 cstate->relname_only = false;
532 }
533 else
534 {
535 CommandId mycid = miinfo->mycid;
536 int ti_options = miinfo->ti_options;
537 bool line_buf_valid = cstate->line_buf_valid;
538 uint64 save_cur_lineno = cstate->cur_lineno;
539 MemoryContext oldcontext;
540
541 Assert(buffer->bistate != NULL);
542
543 /*
544 * Print error context information correctly, if one of the operations
545 * below fails.
546 */
547 cstate->line_buf_valid = false;
548
549 /*
550 * table_multi_insert may leak memory, so switch to short-lived memory
551 * context before calling it.
552 */
553 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
554 table_multi_insert(resultRelInfo->ri_RelationDesc,
555 slots,
556 nused,
557 mycid,
558 ti_options,
559 buffer->bistate);
560 MemoryContextSwitchTo(oldcontext);
561
562 for (i = 0; i < nused; i++)
563 {
564 /*
565 * If there are any indexes, update them for all the inserted
566 * tuples, and run AFTER ROW INSERT triggers.
567 */
568 if (resultRelInfo->ri_NumIndices > 0)
569 {
570 List *recheckIndexes;
571
572 cstate->cur_lineno = buffer->linenos[i];
573 recheckIndexes =
574 ExecInsertIndexTuples(resultRelInfo,
575 buffer->slots[i], estate, false,
576 false, NULL, NIL, false);
577 ExecARInsertTriggers(estate, resultRelInfo,
578 slots[i], recheckIndexes,
579 cstate->transition_capture);
580 list_free(recheckIndexes);
581 }
582
583 /*
584 * There's no indexes, but see if we need to run AFTER ROW INSERT
585 * triggers anyway.
586 */
587 else if (resultRelInfo->ri_TrigDesc != NULL &&
588 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
589 resultRelInfo->ri_TrigDesc->trig_insert_new_table))
590 {
591 cstate->cur_lineno = buffer->linenos[i];
592 ExecARInsertTriggers(estate, resultRelInfo,
593 slots[i], NIL,
594 cstate->transition_capture);
595 }
596
597 ExecClearTuple(slots[i]);
598 }
599
600 /* Update the row counter and progress of the COPY command */
601 *processed += nused;
602 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
603 *processed);
604
605 /* reset cur_lineno and line_buf_valid to what they were */
606 cstate->line_buf_valid = line_buf_valid;
607 cstate->cur_lineno = save_cur_lineno;
608 }
609
610 /* Mark that all slots are free */
611 buffer->nused = 0;
612}
uint64_t uint64
Definition: c.h:503
uint64 linenos[MAX_BUFFERED_TUPLES]
Definition: copyfrom.c:85
CommandId mycid
Definition: copyfrom.c:101
CopyFromState cstate
Definition: copyfrom.c:99
bool trig_insert_after_row
Definition: reltrigger.h:57
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
Definition: tableam.h:1427

References Assert(), CopyMultiInsertBuffer::bistate, CopyMultiInsertInfo::cstate, CopyFromStateData::cur_lineno, CopyMultiInsertInfo::estate, ExecARInsertTriggers(), ExecClearTuple(), FdwRoutine::ExecForeignBatchInsert, ExecInsertIndexTuples(), GetPerTupleMemoryContext, i, CopyFromStateData::line_buf_valid, CopyMultiInsertBuffer::linenos, list_free(), MemoryContextSwitchTo(), CopyMultiInsertInfo::mycid, NIL, CopyMultiInsertBuffer::nused, pgstat_progress_update_param(), PROGRESS_COPY_TUPLES_PROCESSED, RelationGetRelid, CopyFromStateData::relname_only, CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_BatchSize, ResultRelInfo::ri_FdwRoutine, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, CopyMultiInsertBuffer::slots, table_multi_insert(), CopyMultiInsertInfo::ti_options, CopyFromStateData::transition_capture, TriggerDesc::trig_insert_after_row, TriggerDesc::trig_insert_new_table, and TupleTableSlot::tts_tableOid.

Referenced by CopyMultiInsertInfoFlush().

◆ CopyMultiInsertBufferInit()

static CopyMultiInsertBuffer * CopyMultiInsertBufferInit ( ResultRelInfo * rri)
static

Definition at line 363 of file copyfrom.c.

364{
365 CopyMultiInsertBuffer *buffer;
366
367 buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
368 memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
369 buffer->resultRelInfo = rri;
370 buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
371 buffer->nused = 0;
372
373 return buffer;
374}

References CopyMultiInsertBuffer::bistate, GetBulkInsertState(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, palloc(), CopyMultiInsertBuffer::resultRelInfo, ResultRelInfo::ri_FdwRoutine, and CopyMultiInsertBuffer::slots.

Referenced by CopyMultiInsertInfoSetupBuffer().

◆ CopyMultiInsertInfoCleanup()

static void CopyMultiInsertInfoCleanup ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 716 of file copyfrom.c.

717{
718 ListCell *lc;
719
720 foreach(lc, miinfo->multiInsertBuffers)
721 CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
722
723 list_free(miinfo->multiInsertBuffers);
724}
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
Definition: copyfrom.c:620
#define lfirst(lc)
Definition: pg_list.h:172
List * multiInsertBuffers
Definition: copyfrom.c:96

References CopyMultiInsertBufferCleanup(), lfirst, list_free(), and CopyMultiInsertInfo::multiInsertBuffers.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoFlush()

static void CopyMultiInsertInfoFlush ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * curr_rri,
int64 * processed 
)
inline static

Definition at line 662 of file copyfrom.c.

664{
665 ListCell *lc;
666
667 foreach(lc, miinfo->multiInsertBuffers)
668 {
669 CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
670
671 CopyMultiInsertBufferFlush(miinfo, buffer, processed);
672 }
673
674 miinfo->bufferedTuples = 0;
675 miinfo->bufferedBytes = 0;
676
677 /*
678 * Trim the list of tracked buffers down if it exceeds the limit. Here we
679 * remove buffers starting with the ones we created first. It seems less
680 * likely that these older ones will be needed than the ones that were
681 * just created.
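682 */
683 while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
684 {
685 CopyMultiInsertBuffer *buffer;
686
687 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
688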
689 /*
690 * We never want to remove the buffer that's currently being used, so
691 * if we happen to find that then move it to the end of the list.
692 */
693 if (buffer->resultRelInfo == curr_rri)
694 {
695 /*
696 * The code below would misbehave if we were trying to reduce the
697 * list to less than two items.
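698 */
699 StaticAssertDecl(MAX_PARTITION_BUFFERS >= 2,
700 "MAX_PARTITION_BUFFERS must be >= 2");
701
702 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
703 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
704 buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
705 }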
706
707 CopyMultiInsertBufferCleanup(miinfo, buffer);
708 miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
709 }
710}
#define StaticAssertDecl(condition, errmessage)
Definition: c.h:907
#define MAX_PARTITION_BUFFERS
Definition: copyfrom.c:75
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
Definition: copyfrom.c:446
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_delete_first(List *list)
Definition: list.c:943
#define linitial(l)
Definition: pg_list.h:178

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBufferCleanup(), CopyMultiInsertBufferFlush(), lappend(), lfirst, linitial, list_delete_first(), list_length(), MAX_PARTITION_BUFFERS, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertBuffer::resultRelInfo, and StaticAssertDecl.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoInit()

static void CopyMultiInsertInfoInit ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri,
CopyFromState  cstate,
EState * estate,
CommandId  mycid,
int  ti_options 
)
static

Definition at line 400 of file copyfrom.c.

403{
404 miinfo->multiInsertBuffers = NIL;
405 miinfo->bufferedTuples = 0;
406 miinfo->bufferedBytes = 0;
407 miinfo->cstate = cstate;
408 miinfo->estate = estate;
409 miinfo->mycid = mycid;
410 miinfo->ti_options = ti_options;
411
412 /*
413 * Only setup the buffer when not dealing with a partitioned table.
414 * Buffers for partitioned tables will just be setup when we need to send
415 * tuples their way for the first time.
416 */
417 if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
418 CopyMultiInsertInfoSetupBuffer(miinfo, rri);
419}

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertInfoSetupBuffer(), CopyMultiInsertInfo::cstate, CopyMultiInsertInfo::estate, CopyMultiInsertInfo::multiInsertBuffers, CopyMultiInsertInfo::mycid, NIL, RelationData::rd_rel, ResultRelInfo::ri_RelationDesc, and CopyMultiInsertInfo::ti_options.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsEmpty()

static bool CopyMultiInsertInfoIsEmpty ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 437 of file copyfrom.c.

438{
439 return miinfo->bufferedTuples == 0;
440}

References CopyMultiInsertInfo::bufferedTuples.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoIsFull()

static bool CopyMultiInsertInfoIsFull ( CopyMultiInsertInfo * miinfo)
inline static

Definition at line 425 of file copyfrom.c.

426{
427 if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
428 miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
429 return true;
430 return false;
431}
#define MAX_BUFFERED_BYTES
Definition: copyfrom.c:69

References CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, MAX_BUFFERED_BYTES, and MAX_BUFFERED_TUPLES.

Referenced by CopyFrom().

◆ CopyMultiInsertInfoNextFreeSlot()

static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri 
)
inline static

Definition at line 735 of file copyfrom.c.

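737{
738 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
739 int nused;
740
741 Assert(buffer != NULL);
742 Assert(buffer->nused < MAX_BUFFERED_TUPLES);
743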
744 nused = buffer->nused;
745
746 if (buffer->slots[nused] == NULL)
747 buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
748 return buffer->slots[nused];
749}

References Assert(), MAX_BUFFERED_TUPLES, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, ResultRelInfo::ri_RelationDesc, CopyMultiInsertBuffer::slots, and table_slot_create().

Referenced by CopyFrom().

◆ CopyMultiInsertInfoSetupBuffer()

static void CopyMultiInsertInfoSetupBuffer ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri 
)
inline static

Definition at line 380 of file copyfrom.c.

382{
383 CopyMultiInsertBuffer *buffer;
384
385 buffer = CopyMultiInsertBufferInit(rri);
386
387 /* Setup back-link so we can easily find this buffer again */
388 rri->ri_CopyMultiInsertBuffer = buffer;
389 /* Record that we're tracking this buffer */
390 miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
391}
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
Definition: copyfrom.c:363

References CopyMultiInsertBufferInit(), lappend(), CopyMultiInsertInfo::multiInsertBuffers, and ResultRelInfo::ri_CopyMultiInsertBuffer.

Referenced by CopyFrom(), and CopyMultiInsertInfoInit().

◆ CopyMultiInsertInfoStore()

static void CopyMultiInsertInfoStore ( CopyMultiInsertInfo * miinfo,
ResultRelInfo * rri,
TupleTableSlot * slot,
int  tuplen,
uint64  lineno 
)
inline static

Definition at line 756 of file copyfrom.c.

758{
759 CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
760
761 Assert(buffer != NULL);
762 Assert(slot == buffer->slots[buffer->nused]);
763
764 /* Store the line number so we can properly report any errors later */
765 buffer->linenos[buffer->nused] = lineno;
766
767 /* Record this slot as being used */
768 buffer->nused++;
769
770 /* Update how many tuples are stored and their size */
771 miinfo->bufferedTuples++;
772 miinfo->bufferedBytes += tuplen;
773}

References Assert(), CopyMultiInsertInfo::bufferedBytes, CopyMultiInsertInfo::bufferedTuples, CopyMultiInsertBuffer::linenos, CopyMultiInsertBuffer::nused, ResultRelInfo::ri_CopyMultiInsertBuffer, and CopyMultiInsertBuffer::slots.

Referenced by CopyFrom().

◆ EndCopyFrom()

void EndCopyFrom ( CopyFromState  cstate)

Definition at line 1914 of file copyfrom.c.

1915{
1916 /* Invoke the end callback */
1917 cstate->routine->CopyFromEnd(cstate);
1918
1919 /* No COPY FROM related resources except memory. */
1920 if (cstate->is_program)
1921 {
1922 ClosePipeFromProgram(cstate);
1923 }
1924 else
1925 {
1926 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1927 ereport(ERROR,
1928 (errcode_for_file_access(),
1929 errmsg("could not close file \"%s\": %m",
1930 cstate->filename)));
1931 }
1932
1933 pgstat_progress_end_command();
1934
1935 MemoryContextDelete(cstate->copycontext);
1936 pfree(cstate);
1937}
void pgstat_progress_end_command(void)
static void ClosePipeFromProgram(CopyFromState cstate)
Definition: copyfrom.c:1943
int FreeFile(FILE *file)
Definition: fd.c:2803
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void(* CopyFromEnd)(CopyFromState cstate)
Definition: copyapi.h:102

References ClosePipeFromProgram(), CopyFromStateData::copy_file, CopyFromStateData::copycontext, CopyFromRoutine::CopyFromEnd, ereport, errcode_for_file_access(), errmsg(), ERROR, CopyFromStateData::filename, FreeFile(), CopyFromStateData::is_program, MemoryContextDelete(), pfree(), pgstat_progress_end_command(), and CopyFromStateData::routine.

Referenced by DoCopy(), file_acquire_sample_rows(), fileEndForeignScan(), and fileReScanForeignScan().

Variable Documentation

◆ CopyFromRoutineBinary

const CopyFromRoutine CopyFromRoutineBinary
static
Initial value:
= {
.CopyFromInFunc = CopyFromBinaryInFunc,
.CopyFromStart = CopyFromBinaryStart,
.CopyFromOneRow = CopyFromBinaryOneRow,
.CopyFromEnd = CopyFromBinaryEnd,
}
static void CopyFromBinaryEnd(CopyFromState cstate)
Definition: copyfrom.c:243
static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
Definition: copyfrom.c:221
static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
Definition: copyfrom.c:232
bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)

Definition at line 147 of file copyfrom.c.

Referenced by CopyFromGetRoutine().

◆ CopyFromRoutineCSV

const CopyFromRoutine CopyFromRoutineCSV
static
Initial value:
= {
.CopyFromInFunc = CopyFromTextLikeInFunc,
.CopyFromStart = CopyFromTextLikeStart,
.CopyFromOneRow = CopyFromCSVOneRow,
.CopyFromEnd = CopyFromTextLikeEnd,
}
static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
Definition: copyfrom.c:169
static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
Definition: copyfrom.c:203
static void CopyFromTextLikeEnd(CopyFromState cstate)
Definition: copyfrom.c:214
bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)

Definition at line 139 of file copyfrom.c.

Referenced by CopyFromGetRoutine().

◆ CopyFromRoutineText

const CopyFromRoutine CopyFromRoutineText
static
Initial value:
= {
.CopyFromInFunc = CopyFromTextLikeInFunc,
.CopyFromStart = CopyFromTextLikeStart,
.CopyFromOneRow = CopyFromTextOneRow,
.CopyFromEnd = CopyFromTextLikeEnd,
}
bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)

Definition at line 131 of file copyfrom.c.

Referenced by CopyFromGetRoutine().