58 #define MAX_BUFFERED_TUPLES 1000 64 #define MAX_BUFFERED_BYTES 65535 67 #define MAX_PARTITION_BUFFERS 32 111 char curlineno_str[32];
135 errcontext(
"COPY %s, line %s, column %s: \"%s\"",
143 errcontext(
"COPY %s, line %s, column %s: null input",
186 #define MAX_COPY_DATA_DISPLAY 100 188 int slen = strlen(str);
202 res = (
char *)
palloc(len + 4);
203 memcpy(res, str, len);
204 strcpy(res + len,
"...");
260 miinfo->
mycid = mycid;
302 uint64 save_cur_lineno;
332 for (i = 0; i <
nused; i++)
340 List *recheckIndexes;
345 buffer->
slots[i], estate,
false,
false,
348 slots[i], recheckIndexes,
491 if (buffer->
slots[nused] == NULL)
542 uint64 processed = 0;
543 bool has_before_insert_row_trig;
544 bool has_instead_insert_row_trig;
545 bool leafpart_use_multi_insert =
false;
555 if (cstate->
rel->
rd_rel->relkind != RELKIND_RELATION &&
556 cstate->
rel->
rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
557 cstate->
rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
561 if (cstate->
rel->
rd_rel->relkind == RELKIND_VIEW)
563 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
564 errmsg(
"cannot copy to view \"%s\"",
566 errhint(
"To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
567 else if (cstate->
rel->
rd_rel->relkind == RELKIND_MATVIEW)
569 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
570 errmsg(
"cannot copy to materialized view \"%s\"",
572 else if (cstate->
rel->
rd_rel->relkind == RELKIND_SEQUENCE)
574 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
575 errmsg(
"cannot copy to sequence \"%s\"",
579 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
580 errmsg(
"cannot copy to non-table relation \"%s\"",
589 if (RELKIND_HAS_STORAGE(cstate->
rel->
rd_rel->relkind) &&
616 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
619 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
620 errmsg(
"cannot perform COPY FREEZE on a partitioned table")));
634 (
errcode(ERRCODE_INVALID_TRANSACTION_STATE),
635 errmsg(
"cannot perform COPY FREEZE because of prior transaction activity")));
640 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
641 errmsg(
"cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
695 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
721 else if (proute != NULL && resultRelInfo->
ri_TrigDesc != NULL &&
777 estate, mycid, ti_options);
793 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
796 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
811 errcallback.
arg = (
void *) cstate;
836 Assert(resultRelInfo == target_resultRelInfo);
885 proute, myslot, estate);
887 if (prevResultRelInfo != resultRelInfo)
890 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
893 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
901 !has_before_insert_row_trig &&
902 !has_instead_insert_row_trig &&
906 if (leafpart_use_multi_insert)
936 !has_before_insert_row_trig ? myslot : NULL;
943 if (insertMethod ==
CIM_SINGLE || !leafpart_use_multi_insert)
992 if (has_before_insert_row_trig)
1005 if (has_instead_insert_row_trig)
1032 (proute == NULL || has_before_insert_row_trig))
1036 if (insertMethod ==
CIM_MULTI || leafpart_use_multi_insert)
1046 resultRelInfo, myslot,
1083 myslot, mycid, ti_options, bistate);
1123 if (bistate != NULL)
1147 target_resultRelInfo);
1190 bool pipe = (filename == NULL);
1201 bool volatile_defexprs;
1229 num_phys_attrs = tupDesc->
natts;
1240 foreach(cur, attnums)
1247 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1248 errmsg(
"FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1263 foreach(cur, attnums)
1270 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1271 errmsg(
"FORCE_NULL column \"%s\" not referenced by COPY",
1287 foreach(cur, attnums)
1294 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1353 num_phys_attrs = tupDesc->
natts;
1355 volatile_defexprs =
false;
1364 typioparams = (
Oid *)
palloc(num_phys_attrs *
sizeof(
Oid));
1365 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
1368 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1373 if (att->attisdropped)
1379 &in_func_oid, &typioparams[attnum - 1]);
1382 &in_func_oid, &typioparams[attnum - 1]);
1383 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1393 if (defexpr != NULL)
1400 defmap[num_defaults] = attnum - 1;
1416 if (!volatile_defexprs)
1460 errmsg(
"could not execute command \"%s\": %m",
1471 int save_errno = errno;
1475 errmsg(
"could not open file \"%s\" for reading: %m",
1477 (save_errno == ENOENT || save_errno == EACCES) ?
1478 errhint(
"COPY FROM instructs the PostgreSQL server process to read a file. " 1479 "You may want a client-side facility such as psql's \\copy.") : 0));
1485 errmsg(
"could not stat file \"%s\": %m",
1490 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
1533 errmsg(
"could not close file \"%s\": %m",
1554 if (pclose_rc == -1)
1557 errmsg(
"could not close pipe to external command: %m")));
1558 else if (pclose_rc != 0)
1571 (
errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1572 errmsg(
"program \"%s\" failed",
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
copy_data_source_cb data_source_cb
void ExecInitRangeTable(EState *estate, List *rangeTable)
void ReceiveCopyBegin(CopyFromState cstate)
#define MAX_COPY_DATA_DISPLAY
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
bool contain_volatile_functions_not_nextval(Node *clause)
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
int errhint(const char *fmt,...)
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
struct CopyMultiInsertBuffer CopyMultiInsertBuffer
StringInfoData attribute_buf
#define ResetPerTupleExprContext(estate)
#define RelationGetDescr(relation)
#define TABLE_INSERT_FROZEN
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
#define castNode(_type_, nodeptr)
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
BeginForeignInsert_function BeginForeignInsert
ResultRelInfo * resultRelInfo
#define TupleDescAttr(tupdesc, i)
void ExecCloseResultRelations(EState *estate)
uint64 CopyFrom(CopyFromState cstate)
ExecForeignInsert_function ExecForeignInsert
void pgstat_progress_update_param(int index, int64 val)
char * pstrdup(const char *in)
Expr * expression_planner(Expr *expr)
uint64 linenos[MAX_BUFFERED_TUPLES]
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
struct CopyMultiInsertInfo CopyMultiInsertInfo
bool ThereAreNoPriorRegisteredSnapshots(void)
int errcode(int sqlerrcode)
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
SubTransactionId rd_newRelfilenodeSubid
static void table_finish_bulk_insert(Relation rel, int options)
bool contain_volatile_functions(Node *clause)
static bool ExecQual(ExprState *state, ExprContext *econtext)
void(* callback)(void *arg)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes)
struct ErrorContextCallback * previous
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
ResultRelInfo * resultRelInfo
char * wait_result_to_str(int exitstatus)
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
MemoryContext copycontext
ExprState * ExecInitQual(List *qual, PlanState *parent)
void ExecCloseRangeTableRelations(EState *estate)
int ClosePipeStream(FILE *file)
int errdetail_internal(const char *fmt,...)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
static char * limit_printout_length(const char *str)
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
ErrorContextCallback * error_context_stack
bool trig_insert_instead_row
void FreeExecutorState(EState *estate)
#define GetPerTupleExprContext(estate)
BulkInsertState GetBulkInsertState(void)
bool trig_insert_new_table
bool has_generated_stored
void CopyFromErrorCallback(void *arg)
void pfree(void *pointer)
bool wait_result_is_signal(int exit_status, int signum)
bool ThereAreNoReadyPortals(void)
#define MAX_PARTITION_BUFFERS
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
TupleTableSlot * ri_PartitionTupleSlot
int pg_mbcliplen(const char *mbstr, int len, int limit)
#define ALLOCSET_DEFAULT_SIZES
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation)
bool list_member_int(const List *list, int datum)
static void ClosePipeFromProgram(CopyFromState cstate)
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
struct TransitionCaptureState * mt_transition_capture
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
#define PROGRESS_COPY_BYTES_TOTAL
int errcode_for_file_access(void)
FILE * AllocateFile(const char *name, const char *mode)
#define RelationGetRelationName(relation)
FormData_pg_attribute * Form_pg_attribute
struct FdwRoutine * ri_FdwRoutine
bool trig_insert_after_row
MemoryContext CurrentMemoryContext
#define MAX_BUFFERED_BYTES
FILE * OpenPipeStream(const char *command, const char *mode)
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, ModifyTableState *mtstate, Relation rel)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
void InvalidateCatalogSnapshot(void)
TriggerDesc * ri_TrigDesc
EState * CreateExecutorState(void)
List * lappend(List *list, void *datum)
TransitionCaptureState * transition_capture
void initStringInfo(StringInfo str)
SubTransactionId rd_createSubid
Node * build_column_default(Relation rel, int attrno)
bool trig_insert_before_row
SubTransactionId rd_firstRelfilenodeSubid
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
void * palloc0(Size size)
void pgstat_progress_end_command(void)
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
int GetDatabaseEncoding(void)
int pg_get_client_encoding(void)
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
#define TABLE_INSERT_SKIP_FSM
static void ExecMaterializeSlot(TupleTableSlot *slot)
bool * convert_select_flags
#define ereport(elevel,...)
void AfterTriggerBeginQuery(void)
int errmsg_internal(const char *fmt,...)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
#define Assert(condition)
void FreeBulkInsertState(BulkInsertState bistate)
SubTransactionId GetCurrentSubTransactionId(void)
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
static int list_length(const List *l)
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
TupleTableSlot * ecxt_scantuple
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
#define InvalidSubTransactionId
void ReceiveCopyBinaryHeader(CopyFromState cstate)
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
struct CopyFromStateData * CopyFromState
void EndCopyFrom(CopyFromState cstate)
int pg_database_encoding_max_length(void)
#define GetPerTupleMemoryContext(estate)
int(* copy_data_source_cb)(void *outbuf, int minread, int maxread)
#define PG_ENCODING_IS_CLIENT_ONLY(_enc)
void AfterTriggerEndQuery(EState *estate)
TupleConversionMap * ri_RootToPartitionMap
int errmsg(const char *fmt,...)
void list_free(List *list)
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
List * multiInsertBuffers
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
#define CHECK_FOR_INTERRUPTS()
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
CommandId GetCurrentCommandId(bool used)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
CommandDest whereToSendOutput
#define RelationGetRelid(relation)
bool encoding_embeds_ascii
#define PROGRESS_COPY_LINES_PROCESSED
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
List * list_delete_first(List *list)
EndForeignInsert_function EndForeignInsert
#define MAX_BUFFERED_TUPLES
TupleTableSlot * tcs_original_insert_tuple