63 #define MAX_BUFFERED_TUPLES 1000
69 #define MAX_BUFFERED_BYTES 65535
75 #define MAX_PARTITION_BUFFERS 32
146 errcontext(
"COPY %s, line %llu, column %s: \"%s\"",
156 errcontext(
"COPY %s, line %llu, column %s: null input",
175 (
unsigned long long) cstate->
cur_lineno, lineval);
196 #define MAX_COPY_DATA_DISPLAY 100
198 int slen = strlen(
str);
270 miinfo->
mycid = mycid;
313 int nused = buffer->
nused;
338 int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
366 for (
i = 0;
i < inserted;
i++)
383 *processed += inserted;
388 for (
i = 0;
i < nused;
i++)
423 for (
i = 0;
i < nused;
i++)
431 List *recheckIndexes;
436 buffer->
slots[
i], estate,
false,
437 false, NULL,
NIL,
false);
439 slots[
i], recheckIndexes,
561 "MAX_PARTITION_BUFFERS must be >= 2");
605 nused = buffer->
nused;
607 if (buffer->
slots[nused] == NULL)
609 return buffer->
slots[nused];
660 bool has_before_insert_row_trig;
661 bool has_instead_insert_row_trig;
662 bool leafpart_use_multi_insert =
false;
675 if (cstate->
rel->
rd_rel->relkind != RELKIND_RELATION &&
676 cstate->
rel->
rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
677 cstate->
rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
681 if (cstate->
rel->
rd_rel->relkind == RELKIND_VIEW)
683 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 errmsg(
"cannot copy to view \"%s\"",
686 errhint(
"To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
687 else if (cstate->
rel->
rd_rel->relkind == RELKIND_MATVIEW)
689 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
690 errmsg(
"cannot copy to materialized view \"%s\"",
692 else if (cstate->
rel->
rd_rel->relkind == RELKIND_SEQUENCE)
694 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
695 errmsg(
"cannot copy to sequence \"%s\"",
699 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
700 errmsg(
"cannot copy to non-table relation \"%s\"",
709 if (RELKIND_HAS_STORAGE(cstate->
rel->
rd_rel->relkind) &&
736 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
739 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
740 errmsg(
"cannot perform COPY FREEZE on a partitioned table")));
754 (
errcode(ERRCODE_INVALID_TRANSACTION_STATE),
755 errmsg(
"cannot perform COPY FREEZE because of prior transaction activity")));
760 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
761 errmsg(
"cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
834 if (cstate->
rel->
rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
870 else if (proute != NULL && resultRelInfo->
ri_TrigDesc != NULL &&
928 estate, mycid, ti_options);
944 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
947 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
962 errcallback.
arg = (
void *) cstate;
987 Assert(resultRelInfo == target_resultRelInfo);
1024 (
errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
1025 errmsg(
"skipped more than REJECT_LIMIT (%lld) rows due to data type incompatibility",
1070 proute, myslot, estate);
1072 if (prevResultRelInfo != resultRelInfo)
1075 has_before_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1078 has_instead_insert_row_trig = (resultRelInfo->
ri_TrigDesc &&
1087 !has_before_insert_row_trig &&
1088 !has_instead_insert_row_trig &&
1093 if (leafpart_use_multi_insert)
1111 if (bistate != NULL)
1113 prevResultRelInfo = resultRelInfo;
1125 !has_before_insert_row_trig ? myslot : NULL;
1132 if (insertMethod ==
CIM_SINGLE || !leafpart_use_multi_insert)
1181 if (has_before_insert_row_trig)
1194 if (has_instead_insert_row_trig)
1221 (proute == NULL || has_before_insert_row_trig))
1225 if (insertMethod ==
CIM_MULTI || leafpart_use_multi_insert)
1235 resultRelInfo, myslot,
1282 myslot, mycid, ti_options, bistate);
1328 errmsg_plural(
"%llu row was skipped due to data type incompatibility",
1329 "%llu rows were skipped due to data type incompatibility",
1333 if (bistate != NULL)
1350 target_resultRelInfo);
1403 bool volatile_defexprs;
1404 const int progress_cols[] = {
1409 int64 progress_vals[] = {
1441 num_phys_attrs = tupDesc->
natts;
1454 foreach(
cur, attnums)
1461 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1463 errmsg(
"%s column \"%s\" not referenced by COPY",
1464 "FORCE_NOT_NULL",
NameStr(attr->attname))));
1497 foreach(
cur, attnums)
1504 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1506 errmsg(
"%s column \"%s\" not referenced by COPY",
1507 "FORCE_NULL",
NameStr(attr->attname))));
1522 foreach(
cur, attnums)
1529 (
errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1558 (
errcode(ERRCODE_UNDEFINED_FUNCTION),
1559 errmsg(
"default conversion function for encoding \"%s\" to \"%s\" does not exist",
1615 volatile_defexprs =
false;
1624 typioparams = (
Oid *)
palloc(num_phys_attrs *
sizeof(
Oid));
1625 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
1633 if (att->attisdropped)
1639 &in_func_oid, &typioparams[
attnum - 1]);
1642 &in_func_oid, &typioparams[
attnum - 1]);
1646 defexprs[
attnum - 1] = NULL;
1660 if (defexpr != NULL)
1672 defmap[num_defaults] =
attnum - 1;
1690 if (!volatile_defexprs)
1738 errmsg(
"could not execute command \"%s\": %m",
1750 int save_errno = errno;
1754 errmsg(
"could not open file \"%s\" for reading: %m",
1756 (save_errno == ENOENT || save_errno == EACCES) ?
1757 errhint(
"COPY FROM instructs the PostgreSQL server process to read a file. "
1758 "You may want a client-side facility such as psql's \\copy.") : 0));
1764 errmsg(
"could not stat file \"%s\": %m",
1769 (
errcode(ERRCODE_WRONG_OBJECT_TYPE),
1772 progress_vals[2] = st.
st_size;
1814 errmsg(
"could not close file \"%s\": %m",
1835 if (pclose_rc == -1)
1838 errmsg(
"could not close pipe to external command: %m")));
1839 else if (pclose_rc != 0)
1852 (
errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1853 errmsg(
"program \"%s\" failed",
List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)
void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
void pgstat_progress_update_param(int index, int64 val)
void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)
void pgstat_progress_end_command(void)
#define InvalidSubTransactionId
#define Assert(condition)
#define MemSet(start, val, len)
#define StaticAssertDecl(condition, errmessage)
#define OidIsValid(objectId)
bool contain_volatile_functions_not_nextval(Node *clause)
bool contain_volatile_functions(Node *clause)
static void CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
static CopyMultiInsertBuffer * CopyMultiInsertBufferInit(ResultRelInfo *rri)
char * CopyLimitPrintoutLength(const char *str)
static void CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, CopyFromState cstate, EState *estate, CommandId mycid, int ti_options)
static void CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, int64 *processed)
static void CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, TupleTableSlot *slot, int tuplen, uint64 lineno)
static void CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
#define MAX_PARTITION_BUFFERS
struct CopyMultiInsertInfo CopyMultiInsertInfo
#define MAX_BUFFERED_TUPLES
static bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
static void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer, int64 *processed)
static TupleTableSlot * CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri)
static void ClosePipeFromProgram(CopyFromState cstate)
#define MAX_BUFFERED_BYTES
static void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer)
uint64 CopyFrom(CopyFromState cstate)
void EndCopyFrom(CopyFromState cstate)
static bool CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
struct CopyMultiInsertBuffer CopyMultiInsertBuffer
#define MAX_COPY_DATA_DISPLAY
void CopyFromErrorCallback(void *arg)
void ReceiveCopyBinaryHeader(CopyFromState cstate)
void ReceiveCopyBegin(CopyFromState cstate)
bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
static void PGresult * res
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
ExprState * ExecInitQual(List *qual, PlanState *parent)
ExprState * ExecInitExpr(Expr *node, PlanState *parent)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
void ExecOpenIndices(ResultRelInfo *resultRelInfo, bool speculative)
void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, List *mergeActions)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void ExecCloseResultRelations(EState *estate)
void ExecCloseRangeTableRelations(EState *estate)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
ResultRelInfo * ExecFindPartition(ModifyTableState *mtstate, ResultRelInfo *rootResultRelInfo, PartitionTupleRouting *proute, TupleTableSlot *slot, EState *estate)
PartitionTupleRouting * ExecSetupPartitionTupleRouting(EState *estate, Relation rel)
void ExecCleanupTupleRouting(ModifyTableState *mtstate, PartitionTupleRouting *proute)
void ExecResetTupleTable(List *tupleTable, bool shouldFree)
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
void ExecInitResultRelation(EState *estate, ResultRelInfo *resultRelInfo, Index rti)
EState * CreateExecutorState(void)
TupleConversionMap * ExecGetRootToChildMap(ResultRelInfo *resultRelInfo, EState *estate)
void FreeExecutorState(EState *estate)
#define ResetPerTupleExprContext(estate)
#define GetPerTupleExprContext(estate)
#define GetPerTupleMemoryContext(estate)
static bool ExecQual(ExprState *state, ExprContext *econtext)
FILE * AllocateFile(const char *name, const char *mode)
int ClosePipeStream(FILE *file)
FILE * OpenPipeStream(const char *command, const char *mode)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
void ReleaseBulkInsertStatePin(BulkInsertState bistate)
BulkInsertState GetBulkInsertState(void)
void FreeBulkInsertState(BulkInsertState bistate)
@ COPY_LOG_VERBOSITY_DEFAULT
struct CopyFromStateData * CopyFromState
int(* copy_data_source_cb)(void *outbuf, int minread, int maxread)
List * lappend(List *list, void *datum)
List * list_delete_first(List *list)
void list_free(List *list)
bool list_member_int(const List *list, int datum)
void getTypeInputInfo(Oid type, Oid *typInput, Oid *typIOParam)
void getTypeBinaryInputInfo(Oid type, Oid *typReceive, Oid *typIOParam)
int GetDatabaseEncoding(void)
int pg_mbcliplen(const char *mbstr, int len, int limit)
int pg_get_client_encoding(void)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
Oid FindDefaultConversionProc(int32 for_encoding, int32 to_encoding)
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
#define castNode(_type_, nodeptr)
FormData_pg_attribute * Form_pg_attribute
static int list_length(const List *l)
#define pg_encoding_to_char
Expr * expression_planner(Expr *expr)
bool ThereAreNoReadyPortals(void)
CommandDest whereToSendOutput
#define PROGRESS_COPY_COMMAND
#define PROGRESS_COPY_TYPE_FILE
#define PROGRESS_COPY_COMMAND_FROM
#define PROGRESS_COPY_TUPLES_PROCESSED
#define PROGRESS_COPY_TUPLES_EXCLUDED
#define PROGRESS_COPY_TUPLES_SKIPPED
#define PROGRESS_COPY_TYPE
#define PROGRESS_COPY_TYPE_PROGRAM
#define PROGRESS_COPY_BYTES_TOTAL
#define PROGRESS_COPY_TYPE_CALLBACK
#define PROGRESS_COPY_TYPE_PIPE
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
Node * build_column_default(Relation rel, int attrno)
static pg_noinline void Size size
bool ThereAreNoPriorRegisteredSnapshots(void)
void InvalidateCatalogSnapshot(void)
void initStringInfo(StringInfo str)
copy_data_source_cb data_source_cb
StringInfoData attribute_buf
bool * convert_select_flags
TransitionCaptureState * transition_capture
MemoryContext copycontext
ErrorSaveContext * escontext
TupleTableSlot * slots[MAX_BUFFERED_TUPLES]
ResultRelInfo * resultRelInfo
uint64 linenos[MAX_BUFFERED_TUPLES]
List * multiInsertBuffers
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TupleTableSlot * ecxt_scantuple
EndForeignInsert_function EndForeignInsert
BeginForeignInsert_function BeginForeignInsert
ExecForeignInsert_function ExecForeignInsert
ExecForeignBatchInsert_function ExecForeignBatchInsert
GetForeignModifyBatchSize_function GetForeignModifyBatchSize
ResultRelInfo * resultRelInfo
ResultRelInfo * rootResultRelInfo
struct TransitionCaptureState * mt_transition_capture
SubTransactionId rd_firstRelfilelocatorSubid
SubTransactionId rd_newRelfilelocatorSubid
SubTransactionId rd_createSubid
TupleTableSlot * ri_PartitionTupleSlot
struct CopyMultiInsertBuffer * ri_CopyMultiInsertBuffer
TriggerDesc * ri_TrigDesc
struct FdwRoutine * ri_FdwRoutine
TupleTableSlot * tcs_original_insert_tuple
bool trig_insert_instead_row
bool trig_insert_after_row
bool trig_insert_new_table
bool trig_insert_before_row
bool has_generated_stored
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
#define TABLE_INSERT_FROZEN
#define TABLE_INSERT_SKIP_FSM
static void table_finish_bulk_insert(Relation rel, int options)
static void table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, CommandId cid, int options, struct BulkInsertStateData *bistate)
static void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, int options, struct BulkInsertStateData *bistate)
void ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
bool ExecIRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType)
void ExecASInsertTriggers(EState *estate, ResultRelInfo *relinfo, TransitionCaptureState *transition_capture)
void AfterTriggerEndQuery(EState *estate)
void AfterTriggerBeginQuery(void)
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
#define TupleDescAttr(tupdesc, i)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static void ExecMaterializeSlot(TupleTableSlot *slot)
char * wait_result_to_str(int exitstatus)
bool wait_result_is_signal(int exit_status, int signum)
SubTransactionId GetCurrentSubTransactionId(void)
CommandId GetCurrentCommandId(bool used)