63#define MAX_BUFFERED_TUPLES 1000
69#define MAX_BUFFERED_BYTES 65535
75#define MAX_PARTITION_BUFFERS 32
160 else if (
opts->binary)
268 errcontext(
"COPY %s, line %" PRIu64
", column %s",
285 errcontext(
"COPY %s, line %" PRIu64
", column %s: \"%s\"",
295 errcontext(
"COPY %s, line %" PRIu64
", column %s: null input",
312 errcontext(
"COPY %s, line %" PRIu64
": \"%s\"",
335#define MAX_COPY_DATA_DISPLAY 100
337 int slen = strlen(
str);
353 strcpy(res +
len,
"...");
409 miinfo->
mycid = mycid;
452 int nused = buffer->
nused;
477 int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
505 for (
i = 0;
i < inserted;
i++)
522 *processed += inserted;
527 for (
i = 0;
i < nused;
i++)
562 for (
i = 0;
i < nused;
i++)
570 List *recheckIndexes;
575 buffer->
slots[
i], estate,
false,
576 false, NULL,
NIL,
false);
578 slots[
i], recheckIndexes,
700 "MAX_PARTITION_BUFFERS must be >= 2");
744 nused = buffer->
nused;
746 if (buffer->
slots[nused] == NULL)
748 return buffer->
slots[nused];
799 bool has_before_insert_row_trig;
800 bool has_instead_insert_row_trig;
801 bool leafpart_use_multi_insert =
false;
814 if (cstate->
rel->
rd_rel->relkind != RELKIND_RELATION &&
815 cstate->
rel->
rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
816 cstate->
rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
820 if (cstate->
rel->
rd_rel->relkind == RELKIND_VIEW)
822 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
823 errmsg(
"cannot copy to view \"%s\"",
825 errhint(
"To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
826 else if (cstate->
rel->
rd_rel->relkind == RELKIND_MATVIEW)
828 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
829 errmsg(
"cannot copy to materialized view \"%s\"",
831 else if (cstate->
rel->
rd_rel->relkind == RELKIND_SEQUENCE)
833 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
834 errmsg(
"cannot copy to sequence \"%s\"",
838 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
839 errmsg(
"cannot copy to non-table relation \"%s\"",
848 if (RELKIND_HAS_STORAGE(cstate->
rel->
rd_rel->relkind) &&
875 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
878 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
879 errmsg(
"cannot perform COPY FREEZE on a partitioned table")));
883 if (cstate->
rel->
rd_rel->relkind == RELKIND_FOREIGN_TABLE)
885 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886 errmsg(
"cannot perform COPY FREEZE on a foreign table")));
899 (
errcode(ERRCODE_INVALID_TRANSACTION_STATE),
900 errmsg(
"cannot perform COPY FREEZE because of prior transaction activity")));
905 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906 errmsg(
"cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
980 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1016 else if (proute != NULL && resultRelInfo->
ri_TrigDesc != NULL &&
1074 estate, mycid, ti_options);
1090 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1093 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1108 errcallback.
arg = cstate;
1128 myslot = singleslot;
1133 Assert(resultRelInfo == target_resultRelInfo);
1170 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1171 errmsg(
"skipped more than REJECT_LIMIT (%" PRId64
") rows due to data type incompatibility",
1216 proute, myslot, estate);
1218 if (prevResultRelInfo != resultRelInfo)
1221 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1224 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1233 !has_before_insert_row_trig &&
1234 !has_instead_insert_row_trig &&
1239 if (leafpart_use_multi_insert)
1257 if (bistate != NULL)
1259 prevResultRelInfo = resultRelInfo;
1271 !has_before_insert_row_trig ? myslot : NULL;
1278 if (insertMethod ==
CIM_SINGLE || !leafpart_use_multi_insert)
1327 if (has_before_insert_row_trig)
1340 if (has_instead_insert_row_trig)
1367 (proute == NULL || has_before_insert_row_trig))
1371 if (insertMethod ==
CIM_MULTI || leafpart_use_multi_insert)
1381 resultRelInfo, myslot,
1428 myslot, mycid, ti_options, bistate);
1474 errmsg_plural(
"%" PRIu64
" row was skipped due to data type incompatibility",
1475 "%" PRIu64
" rows were skipped due to data type incompatibility",
1479 if (bistate != NULL)
1496 target_resultRelInfo);
1548 bool volatile_defexprs;
1549 const int progress_cols[] = {
1554 int64 progress_vals[] = {
1589 num_phys_attrs = tupDesc->
natts;
1602 foreach(
cur, attnums)
1609 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1611 errmsg(
"%s column \"%s\" not referenced by COPY",
1612 "FORCE_NOT_NULL",
NameStr(attr->attname))));
1645 foreach(
cur, attnums)
1652 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1654 errmsg(
"%s column \"%s\" not referenced by COPY",
1655 "FORCE_NULL",
NameStr(attr->attname))));
1670 foreach(
cur, attnums)
1677 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1706 (
errcode(ERRCODE_UNDEFINED_FUNCTION),
1707 errmsg(
"default conversion function for encoding \"%s\" to \"%s\" does not exist",
1744 volatile_defexprs =
false;
1753 typioparams = (
Oid *)
palloc(num_phys_attrs *
sizeof(
Oid));
1754 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
1762 if (att->attisdropped)
1767 &in_functions[
attnum - 1],
1768 &typioparams[
attnum - 1]);
1771 defexprs[
attnum - 1] = NULL;
1785 if (defexpr != NULL)
1797 defmap[num_defaults] =
attnum - 1;
1815 if (!volatile_defexprs)
1863 errmsg(
"could not execute command \"%s\": %m",
1875 int save_errno = errno;
1879 errmsg(
"could not open file \"%s\" for reading: %m",
1881 (save_errno == ENOENT || save_errno == EACCES) ?
1882 errhint(
"COPY FROM instructs the PostgreSQL server process to read a file. "
1883 "You may want a client-side facility such as psql's \\copy.") : 0));
1889 errmsg(
"could not stat file \"%s\": %m",
1894 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
1897 progress_vals[2] = st.
st_size;
1929 errmsg(
"could not close file \"%s\": %m",
1950 if (pclose_rc == -1)
1953 errmsg(
"could not close pipe to external command: %m")));
1954 else if (pclose_rc != 0)
1967 (
errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1968 errmsg(
"program \"%s\" failed",
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
void pgstat_progress_end_command(void)
Bitmapset * bms_make_singleton(int x)
#define InvalidSubTransactionId
#define MemSet(start, val, len)
#define StaticAssertDecl(condition, errmessage)
#define OidIsValid(objectId)
bool contain_volatile_functions_not_nextval(Node *clause)
bool contain_volatile_functions(Node *clause)
static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
char * CopyLimitPrintoutLength(const char *str)
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
static const CopyFromRoutine * CopyFromGetRoutine(const CopyFormatOptions *opts)
static const CopyFromRoutine CopyFromRoutineBinary
static const CopyFromRoutine CopyFromRoutineText
static void CopyFromBinaryEnd(CopyFromState cstate)
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
#define MAX_PARTITION_BUFFERS
struct CopyMultiInsertInfo CopyMultiInsertInfo
static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
#define MAX_BUFFERED_TUPLES
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
static void ClosePipeFromProgram(CopyFromState cstate)
#define MAX_BUFFERED_BYTES
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
static void CopyFromTextLikeEnd(CopyFromState cstate)
uint64 CopyFrom(CopyFromState cstate)
void EndCopyFrom(CopyFromState cstate)
static const CopyFromRoutine CopyFromRoutineCSV
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
struct CopyMultiInsertBuffer CopyMultiInsertBuffer
#define MAX_COPY_DATA_DISPLAY
void CopyFromErrorCallback(void *arg)
bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
void ReceiveCopyBinaryHeader(CopyFromState cstate)
void ReceiveCopyBegin(CopyFromState cstate)
bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
ExprState * ExecInitQual(List *qual, PlanState *parent)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, List *mergeActions)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void ExecCloseResultRelations(EState *estate)
void ExecCloseRangeTableRelations(EState *estate)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
void FreeExecutorState(EState *estate)
EState * CreateExecutorState(void)
#define ResetPerTupleExprContext(estate)
#define GetPerTupleExprContext(estate)
#define GetPerTupleMemoryContext(estate)
static bool ExecQual(ExprState *state, ExprContext *econtext)
FILE * OpenPipeStream(const char *command, const char *mode)
int ClosePipeStream(FILE *file)
FILE * AllocateFile(const char *name, const char *mode)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Assert(PointerIsAligned(start, uint64))
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
BulkInsertState GetBulkInsertState(void)
void FreeBulkInsertState(BulkInsertState bistate)
@ COPY_LOG_VERBOSITY_DEFAULT
struct CopyFromStateData * CopyFromState
int(* copy_data_source_cb)(void *outbuf, int minread, int maxread)
List * lappend(List *list, void *datum)
List * list_delete_first(List *list)
void list_free(List *list)
bool list_member_int(const List *list, int datum)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
int GetDatabaseEncoding(void)
int pg_mbcliplen(const char *mbstr, int len, int limit)
int pg_get_client_encoding(void)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding)
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
#define castNode(_type_, nodeptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static AmcheckOptions opts
FormData_pg_attribute * Form_pg_attribute
static int list_length(const List *l)
#define pg_encoding_to_char
Expr * expression_planner(Expr *expr)
bool ThereAreNoReadyPortals(void)
CommandDest whereToSendOutput
#define PROGRESS_COPY_COMMAND
#define PROGRESS_COPY_TYPE_FILE
#define PROGRESS_COPY_COMMAND_FROM
#define PROGRESS_COPY_TUPLES_PROCESSED
#define PROGRESS_COPY_TUPLES_EXCLUDED
#define PROGRESS_COPY_TUPLES_SKIPPED
#define PROGRESS_COPY_TYPE
#define PROGRESS_COPY_TYPE_PROGRAM
#define PROGRESS_COPY_BYTES_TOTAL
#define PROGRESS_COPY_TYPE_CALLBACK
#define PROGRESS_COPY_TYPE_PIPE
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
Node * build_column_default(Relation rel, int attrno)
bool ThereAreNoPriorRegisteredSnapshots(void)
void InvalidateCatalogSnapshot(void)
void initStringInfo(StringInfo str)
void(* CopyFromEnd)(CopyFromState cstate)
void(* CopyFromInFunc)(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, Oid *typioparam)
void(* CopyFromStart)(CopyFromState cstate, TupleDesc tupDesc)
copy_data_source_cb data_source_cb
const struct CopyFromRoutine * routine
StringInfoData attribute_buf
bool * convert_select_flags
TransitionCaptureState * transition_capture
MemoryContext copycontext
ErrorSaveContext * escontext
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
ResultRelInfo * resultRelInfo
uint64 linenos[MAX_BUFFERED_TUPLES]
List * multiInsertBuffers
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TupleTableSlot * ecxt_scantuple
EndForeignInsert_function EndForeignInsert
BeginForeignInsert_function BeginForeignInsert
ExecForeignInsert_function ExecForeignInsert
ExecForeignBatchInsert_function ExecForeignBatchInsert
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
ResultRelInfo * resultRelInfo
ResultRelInfo * rootResultRelInfo
struct TransitionCaptureState * mt_transition_capture
SubTransactionId rd_firstRelfilelocatorSubid
SubTransactionId rd_newRelfilelocatorSubid
SubTransactionId rd_createSubid
TupleTableSlot * ri_PartitionTupleSlot
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
TriggerDesc * ri_TrigDesc
struct FdwRoutine * ri_FdwRoutine
TupleTableSlot * tcs_original_insert_tuple
bool trig_insert_instead_row
bool trig_insert_after_row
bool trig_insert_new_table
bool trig_insert_before_row
bool has_generated_stored
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
#define TABLE_INSERT_FROZEN
#define TABLE_INSERT_SKIP_FSM
static void table_finish_bulk_insert(Relation rel, int options)
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
void AfterTriggerEndQuery(EState *estate)
void AfterTriggerBeginQuery(void)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static void ExecMaterializeSlot(TupleTableSlot *slot)
char * wait_result_to_str(int exitstatus)
bool wait_result_is_signal(int exit_status, int signum)
SubTransactionId GetCurrentSubTransactionId(void)
CommandId GetCurrentCommandId(bool used)