PostgreSQL Source Code git master
plpy_cursorobject.c
Go to the documentation of this file.
1/*
2 * the PLyCursor class
3 *
4 * src/pl/plpython/plpy_cursorobject.c
5 */
6
7#include "postgres.h"
8
9#include <limits.h>
10
11#include "catalog/pg_type.h"
12#include "mb/pg_wchar.h"
13#include "plpy_cursorobject.h"
14#include "plpy_elog.h"
15#include "plpy_main.h"
16#include "plpy_planobject.h"
17#include "plpy_resultobject.h"
18#include "plpy_spi.h"
19#include "plpython.h"
20#include "utils/memutils.h"
21
/*
 * Forward declarations of the file-local functions.  Four of them
 * (dealloc, iternext, fetch, close) are referenced from the method and
 * slot tables that follow; PLy_cursor_query is called from PLy_cursor.
 */
22static PyObject *PLy_cursor_query(const char *query);
23static void PLy_cursor_dealloc(PLyCursorObject *self);
24static PyObject *PLy_cursor_iternext(PyObject *self);
25static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
27
/* Doc string installed in the Py_tp_doc slot of the cursor type. */
28static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
29
30static PyMethodDef PLy_cursor_methods[] = {
31 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32 {"close", PLy_cursor_close, METH_NOARGS, NULL},
33 {NULL, NULL, 0, NULL}
34};
35
36static PyType_Slot PLyCursor_slots[] =
37{
38 {
39 Py_tp_dealloc, PLy_cursor_dealloc
40 },
41 {
42 Py_tp_doc, (char *) PLy_cursor_doc
43 },
44 {
45 Py_tp_iter, PyObject_SelfIter
46 },
47 {
48 Py_tp_iternext, PLy_cursor_iternext
49 },
50 {
51 Py_tp_methods, PLy_cursor_methods
52 },
53 {
54 0, NULL
55 }
56};
57
58static PyType_Spec PLyCursor_spec =
59{
60 .name = "PLyCursor",
61 .basicsize = sizeof(PLyCursorObject),
62 .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
63 .slots = PLyCursor_slots,
64};
65
/*
 * The cursor type object.  It is assigned from PyType_FromSpec() in the
 * initialization function below and passed to PyObject_New() when cursor
 * objects are created.
 */
66static PyTypeObject *PLy_CursorType;
67
/*
 * Create the PLyCursor type from PLyCursor_spec and store it in
 * PLy_CursorType; reports an ERROR via elog if creation fails.
 *
 * NOTE(review): the line carrying the function name (listing line 69) is
 * missing from this listing; the cross-reference index names the function
 * PLy_cursor_init_type(void) -- confirm against the real file.
 */
68void
70{
71 PLy_CursorType = (PyTypeObject *) PyType_FromSpec(&PLyCursor_spec);
72 if (!PLy_CursorType)
73 elog(ERROR, "could not initialize PLy_CursorType");
74}
75
76PyObject *
77PLy_cursor(PyObject *self, PyObject *args)
78{
79 char *query;
80 PyObject *plan;
81 PyObject *planargs = NULL;
82
83 if (PyArg_ParseTuple(args, "s", &query))
84 return PLy_cursor_query(query);
85
86 PyErr_Clear();
87
88 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
89 return PLy_cursor_plan(plan, planargs);
90
91 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
92 return NULL;
93}
94
95
/*
 * Create a cursor object for a query given as a string.
 *
 * Allocates a PLyCursorObject, sets up conversion of result tuples to
 * Python, then, inside a subtransaction, verifies the string encoding,
 * prepares the query with SPI_prepare, opens a portal with
 * SPI_cursor_open, remembers the portal name in the cursor and pins the
 * portal.
 *
 * Returns the new cursor, or NULL if allocation fails or the PG_CATCH
 * branch is taken.
 *
 * NOTE(review): several lines are missing from this listing (listing
 * lines 99-100, 112, 114, 128, 136, 140, 144).  The declarations of
 * cursor, exec_ctx and plan, the statement that sets cursor->mcxt, and
 * the trailing arguments of both elog calls are therefore not visible.
 *
 * NOTE(review): the PG_CATCH branch here does not release cursor, whereas
 * PLy_cursor_plan does Py_DECREF(cursor) in the same place; this looks
 * like a leak of the cursor object on error -- confirm.
 */
96static PyObject *
97PLy_cursor_query(const char *query)
98{
101 volatile MemoryContext oldcontext;
102 volatile ResourceOwner oldowner;
103
104 if ((cursor = PyObject_New(PLyCursorObject, PLy_CursorType)) == NULL)
105 return NULL;
106#if PY_VERSION_HEX < 0x03080000
107 /* Workaround for Python issue 35810; no longer necessary in Python 3.8 */
108 Py_INCREF(PLy_CursorType);
109#endif
 /* Start out open and without a portal; the name is filled in below. */
110 cursor->portalname = NULL;
111 cursor->closed = false;
113 "PL/Python cursor context",
115
116 /* Initialize for converting result tuples to Python */
117 PLy_input_setup_func(&cursor->result, cursor->mcxt,
118 RECORDOID, -1,
119 exec_ctx->curr_proc);
120
 /* Saved so the subtransaction helpers can be handed the outer state. */
121 oldcontext = CurrentMemoryContext;
122 oldowner = CurrentResourceOwner;
123
124 PLy_spi_subtransaction_begin(oldcontext, oldowner);
125
126 PG_TRY();
127 {
129 Portal portal;
130
 /* noError is false here. */
131 pg_verifymbstr(query, strlen(query), false);
132
133 plan = SPI_prepare(query, 0, NULL);
134 if (plan == NULL)
135 elog(ERROR, "SPI_prepare failed: %s",
137
138 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
139 exec_ctx->curr_proc->fn_readonly);
141
142 if (portal == NULL)
143 elog(ERROR, "SPI_cursor_open() failed: %s",
145
 /* Copy the name into the cursor context so later calls can look it up. */
146 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
147
148 PinPortal(portal);
149
150 PLy_spi_subtransaction_commit(oldcontext, oldowner);
151 }
152 PG_CATCH();
153 {
154 PLy_spi_subtransaction_abort(oldcontext, oldowner);
155 return NULL;
156 }
157 PG_END_TRY();
158
159 Assert(cursor->portalname != NULL);
160 return (PyObject *) cursor;
161}
162
/*
 * Create a cursor object for a prepared plan and its arguments.
 *
 * ob:   the plan object; it is cast to PLyPlanObject without a type check.
 * args: NULL, or a Python sequence (but not a str) holding one value per
 *       plan parameter.
 *
 * Sets TypeError and returns NULL if args is not an acceptable sequence or
 * its length differs from plan->nargs.  Otherwise allocates the cursor
 * and, inside a subtransaction, converts each argument to a Datum, opens
 * a portal with SPI_cursor_open, stores the portal name in the cursor and
 * pins the portal.  In the PG_CATCH branch the cursor is released, the
 * subtransaction is aborted and NULL is returned.
 *
 * NOTE(review): lines are missing from this listing (listing lines 166,
 * 168-169, 213, 215, 239, 241, 281).  The declarations of cursor, plan and
 * exec_ctx, the statements that set cursor->mcxt and tmpcontext, and the
 * trailing argument of the elog call are therefore not visible.
 *
 * NOTE(review): nothing visible in this function verifies that ob really
 * is a plan object -- callers presumably must guarantee it; confirm.
 */
163PyObject *
164PLy_cursor_plan(PyObject *ob, PyObject *args)
165{
167 volatile int nargs;
170 volatile MemoryContext oldcontext;
171 volatile ResourceOwner oldowner;
172
173 if (args)
174 {
 /* A str passes PySequence_Check, so it is rejected explicitly. */
175 if (!PySequence_Check(args) || PyUnicode_Check(args))
176 {
177 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
178 return NULL;
179 }
180 nargs = PySequence_Length(args);
181 }
182 else
183 nargs = 0;
184
185 plan = (PLyPlanObject *) ob;
186
187 if (nargs != plan->nargs)
188 {
 /* Include the string form of args in the error message. */
189 char *sv;
190 PyObject *so = PyObject_Str(args);
191
192 if (!so)
193 PLy_elog(ERROR, "could not execute plan");
194 sv = PLyUnicode_AsString(so);
195 PLy_exception_set_plural(PyExc_TypeError,
196 "Expected sequence of %d argument, got %d: %s",
197 "Expected sequence of %d arguments, got %d: %s",
198 plan->nargs,
199 plan->nargs, nargs, sv);
200 Py_DECREF(so);
201
202 return NULL;
203 }
204
205 if ((cursor = PyObject_New(PLyCursorObject, PLy_CursorType)) == NULL)
206 return NULL;
207#if PY_VERSION_HEX < 0x03080000
208 /* Workaround for Python issue 35810; no longer necessary in Python 3.8 */
209 Py_INCREF(PLy_CursorType);
210#endif
211 cursor->portalname = NULL;
212 cursor->closed = false;
214 "PL/Python cursor context",
216
217 /* Initialize for converting result tuples to Python */
218 PLy_input_setup_func(&cursor->result, cursor->mcxt,
219 RECORDOID, -1,
220 exec_ctx->curr_proc);
221
222 oldcontext = CurrentMemoryContext;
223 oldowner = CurrentResourceOwner;
224
225 PLy_spi_subtransaction_begin(oldcontext, oldowner);
226
227 PG_TRY();
228 {
229 Portal portal;
230 MemoryContext tmpcontext;
231 Datum *volatile values;
232 char *volatile nulls;
233 volatile int j;
234
235 /*
236 * Converted arguments and associated cruft will be in this context,
237 * which is local to our subtransaction.
238 */
240 "PL/Python temporary context",
242 MemoryContextSwitchTo(tmpcontext);
243
 /* With no arguments, both arrays are passed to SPI as NULL. */
244 if (nargs > 0)
245 {
246 values = (Datum *) palloc(nargs * sizeof(Datum));
247 nulls = (char *) palloc(nargs * sizeof(char));
248 }
249 else
250 {
251 values = NULL;
252 nulls = NULL;
253 }
254
255 for (j = 0; j < nargs; j++)
256 {
257 PLyObToDatum *arg = &plan->args[j];
258 PyObject *elem;
259
 /* NOTE(review): the result of PySequence_GetItem is not checked for NULL -- confirm. */
260 elem = PySequence_GetItem(args, j);
 /* The nested try/finally drops the reference to elem on every exit path. */
261 PG_TRY(2);
262 {
263 bool isnull;
264
265 values[j] = PLy_output_convert(arg, elem, &isnull);
 /* The nulls array uses n for a null argument and a space otherwise. */
266 nulls[j] = isnull ? 'n' : ' ';
267 }
268 PG_FINALLY(2);
269 {
270 Py_DECREF(elem);
271 }
272 PG_END_TRY(2);
273 }
274
275 MemoryContextSwitchTo(oldcontext);
276
277 portal = SPI_cursor_open(NULL, plan->plan, values, nulls,
278 exec_ctx->curr_proc->fn_readonly);
279 if (portal == NULL)
280 elog(ERROR, "SPI_cursor_open() failed: %s",
282
283 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
284
285 PinPortal(portal);
286
 /* Success path: the temporary context is deleted explicitly. */
287 MemoryContextDelete(tmpcontext);
288 PLy_spi_subtransaction_commit(oldcontext, oldowner);
289 }
290 PG_CATCH();
291 {
292 Py_DECREF(cursor);
293 /* Subtransaction abort will remove the tmpcontext */
294 PLy_spi_subtransaction_abort(oldcontext, oldowner);
295 return NULL;
296 }
297 PG_END_TRY();
298
299 Assert(cursor->portalname != NULL);
300 return (PyObject *) cursor;
301}
302
/*
 * Deallocator for cursor objects (installed in the Py_tp_dealloc slot).
 *
 * If the cursor has not been closed, looks up its portal by name and, if
 * the portal is still valid, unpins and closes it; the cursor is then
 * marked closed.  Afterwards the mcxt field is reset, the object memory is
 * freed, and on Python 3.8 and later the reference to the type is dropped.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 304) and the statement inside the self->mcxt branch
 * (listing line 324) are missing from this listing; the cross-reference
 * index gives the signature as PLy_cursor_dealloc(PLyCursorObject *self).
 */
303static void
305{
306#if PY_VERSION_HEX >= 0x03080000
 /* Fetch the type before self is freed; it is released at the end. */
307 PyTypeObject *tp = Py_TYPE(self);
308#endif
309 Portal portal;
310
311 if (!self->closed)
312 {
313 portal = GetPortalByName(self->portalname);
314
 /* Only touch the portal if the lookup found a valid one. */
315 if (PortalIsValid(portal))
316 {
317 UnpinPortal(portal);
318 SPI_cursor_close(portal);
319 }
320 self->closed = true;
321 }
322 if (self->mcxt)
323 {
325 self->mcxt = NULL;
326 }
327
328 PyObject_Free(self);
329#if PY_VERSION_HEX >= 0x03080000
330 /* This was not needed before Python 3.8 (Python issue 35810) */
331 Py_DECREF(tp);
332#endif
333}
334
/*
 * Iterator step for cursor objects (installed in the Py_tp_iternext slot).
 *
 * Sets ValueError and returns NULL if the cursor is closed or its portal
 * is no longer valid.  Otherwise fetches one row forward inside a
 * subtransaction: with no row, StopIteration is set and NULL is returned;
 * with a row, it is converted with PLy_input_from_tuple and returned.
 * The PG_CATCH branch aborts the subtransaction and returns NULL.
 *
 * NOTE(review): lines are missing from this listing (listing lines 338,
 * 340, 376, 383).  The declarations of cursor and exec_ctx, the start of
 * the call whose last argument is exec_ctx->curr_proc, and one statement
 * before the subtransaction commit are therefore not visible.
 */
335static PyObject *
336PLy_cursor_iternext(PyObject *self)
337{
339 PyObject *ret;
341 volatile MemoryContext oldcontext;
342 volatile ResourceOwner oldowner;
343 Portal portal;
344
345 cursor = (PLyCursorObject *) self;
346
347 if (cursor->closed)
348 {
349 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
350 return NULL;
351 }
352
 /* The cursor only stores the portal name, so look the portal up again. */
353 portal = GetPortalByName(cursor->portalname);
354 if (!PortalIsValid(portal))
355 {
356 PLy_exception_set(PyExc_ValueError,
357 "iterating a cursor in an aborted subtransaction");
358 return NULL;
359 }
360
361 oldcontext = CurrentMemoryContext;
362 oldowner = CurrentResourceOwner;
363
364 PLy_spi_subtransaction_begin(oldcontext, oldowner);
365
366 PG_TRY();
367 {
 /* Fetch a single row in the forward direction. */
368 SPI_cursor_fetch(portal, true, 1);
369 if (SPI_processed == 0)
370 {
 /* No row was fetched: signal the end of iteration to Python. */
371 PyErr_SetNone(PyExc_StopIteration);
372 ret = NULL;
373 }
374 else
375 {
377 exec_ctx->curr_proc);
378
379 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
380 SPI_tuptable->tupdesc, true);
381 }
382
384
385 PLy_spi_subtransaction_commit(oldcontext, oldowner);
386 }
387 PG_CATCH();
388 {
389 PLy_spi_subtransaction_abort(oldcontext, oldowner);
390 return NULL;
391 }
392 PG_END_TRY();
393
394 return ret;
395}
396
/*
 * Implementation of the cursor method fetch(count).
 *
 * args must hold a single integer.  Sets ValueError and returns NULL if
 * the cursor is closed or its portal is no longer valid.  Otherwise
 * fetches count rows forward inside a subtransaction and fills in a
 * result object: status is SPI_OK_FETCH, nrows is SPI_processed, and,
 * when at least one row was fetched, rows is a new list with one
 * converted entry per row.  A row count above PY_SSIZE_T_MAX is reported
 * with ERRCODE_PROGRAM_LIMIT_EXCEEDED.  The PG_CATCH branch aborts the
 * subtransaction and returns NULL.
 *
 * NOTE(review): lines are missing from this listing (listing lines 400,
 * 403, 427, 456, 469, 475-476, 484).  The declarations of cursor and
 * exec_ctx, the statement that assigns ret (presumably from
 * PLy_result_new, which appears in the cross-reference index), the call
 * opening the error report, the start of the call ending in
 * exec_ctx->curr_proc, the middle arguments of PLy_input_from_tuple and
 * one statement before the commit are therefore not visible.
 *
 * NOTE(review): the PG_CATCH branch does not release ret, and the return
 * value of PyList_SetItem is ignored -- confirm whether either matters.
 */
397static PyObject *
398PLy_cursor_fetch(PyObject *self, PyObject *args)
399{
401 int count;
402 PLyResultObject *ret;
404 volatile MemoryContext oldcontext;
405 volatile ResourceOwner oldowner;
406 Portal portal;
407
408 if (!PyArg_ParseTuple(args, "i:fetch", &count))
409 return NULL;
410
411 cursor = (PLyCursorObject *) self;
412
413 if (cursor->closed)
414 {
415 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
416 return NULL;
417 }
418
419 portal = GetPortalByName(cursor->portalname);
420 if (!PortalIsValid(portal))
421 {
 /* NOTE(review): the message says iterating although this is fetch -- confirm it is intended. */
422 PLy_exception_set(PyExc_ValueError,
423 "iterating a cursor in an aborted subtransaction");
424 return NULL;
425 }
426
428 if (ret == NULL)
429 return NULL;
430
431 oldcontext = CurrentMemoryContext;
432 oldowner = CurrentResourceOwner;
433
434 PLy_spi_subtransaction_begin(oldcontext, oldowner);
435
436 PG_TRY();
437 {
438 SPI_cursor_fetch(portal, true, count);
439
 /* Replace the previous status and nrows values of the result object. */
440 Py_DECREF(ret->status);
441 ret->status = PyLong_FromLong(SPI_OK_FETCH);
442
443 Py_DECREF(ret->nrows);
444 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
445
446 if (SPI_processed != 0)
447 {
448 uint64 i;
449
450 /*
451 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
452 * and list indices; so we cannot support a result larger than
453 * PY_SSIZE_T_MAX.
454 */
455 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
457 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
458 errmsg("query result has too many rows to fit in a Python list")));
459
460 Py_DECREF(ret->rows);
461 ret->rows = PyList_New(SPI_processed);
462 if (!ret->rows)
463 {
 /* List creation failed: drop the result so NULL is returned. */
464 Py_DECREF(ret);
465 ret = NULL;
466 }
467 else
468 {
470 exec_ctx->curr_proc);
471
 /* Convert each fetched tuple and store it at the same index in the list. */
472 for (i = 0; i < SPI_processed; i++)
473 {
474 PyObject *row = PLy_input_from_tuple(&cursor->result,
477 true);
478
479 PyList_SetItem(ret->rows, i, row);
480 }
481 }
482 }
483
485
486 PLy_spi_subtransaction_commit(oldcontext, oldowner);
487 }
488 PG_CATCH();
489 {
490 PLy_spi_subtransaction_abort(oldcontext, oldowner);
491 return NULL;
492 }
493 PG_END_TRY();
494
495 return (PyObject *) ret;
496}
497
/*
 * Implementation of the cursor method close().
 *
 * If the cursor is already closed this does nothing.  Otherwise the portal
 * is looked up by name; if it is no longer valid, ValueError is set and
 * NULL is returned.  If it is valid, the portal is unpinned and closed and
 * the cursor is marked closed.  Returns None on success.
 *
 * unused: ignored (the method is registered with METH_NOARGS).
 *
 * NOTE(review): listing line 501 is missing from this listing; it
 * presumably declares cursor and initializes it from self -- confirm.
 */
498static PyObject *
499PLy_cursor_close(PyObject *self, PyObject *unused)
500{
502
503 if (!cursor->closed)
504 {
505 Portal portal = GetPortalByName(cursor->portalname);
506
507 if (!PortalIsValid(portal))
508 {
509 PLy_exception_set(PyExc_ValueError,
510 "closing a cursor in an aborted subtransaction");
511 return NULL;
512 }
513
 /* Unpin before closing; the portal was pinned when the cursor was created. */
514 UnpinPortal(portal);
515 SPI_cursor_close(portal);
516 cursor->closed = true;
517 }
518
519 Py_RETURN_NONE;
520}
static Datum values[MAXATTR]
Definition: bootstrap.c:151
uint64_t uint64
Definition: c.h:503
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define PG_FINALLY(...)
Definition: elog.h:388
#define ereport(elevel,...)
Definition: elog.h:149
Assert(PointerIsAligned(start, uint64))
int j
Definition: isn.c:75
int i
Definition: isn.c:74
#define PLy_elog
bool pg_verifymbstr(const char *mbstr, int len, bool noError)
Definition: mbutils.c:1556
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1686
MemoryContext TopMemoryContext
Definition: mcxt.c:149
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CurTransactionContext
Definition: mcxt.c:155
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:170
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void * arg
#define plan(x)
Definition: pg_regress.c:161
static PyType_Spec PLyCursor_spec
PyObject * PLy_cursor(PyObject *self, PyObject *args)
PyObject * PLy_cursor_plan(PyObject *ob, PyObject *args)
static PyMethodDef PLy_cursor_methods[]
static PyObject * PLy_cursor_query(const char *query)
static PyObject * PLy_cursor_fetch(PyObject *self, PyObject *args)
static PyObject * PLy_cursor_iternext(PyObject *self)
static PyObject * PLy_cursor_close(PyObject *self, PyObject *unused)
static PyTypeObject * PLy_CursorType
static PyType_Slot PLyCursor_slots[]
static void PLy_cursor_dealloc(PLyCursorObject *self)
void PLy_cursor_init_type(void)
static const char PLy_cursor_doc[]
struct PLyCursorObject PLyCursorObject
PyObject * PLy_exc_error
Definition: plpy_elog.c:15
void PLy_exception_set(PyObject *exc, const char *fmt,...)
Definition: plpy_elog.c:472
void PLy_exception_set_plural(PyObject *exc, const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: plpy_elog.c:486
PLyExecutionContext * PLy_current_execution_context(void)
Definition: plpy_main.c:365
PyObject * PLy_result_new(void)
void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:573
void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:582
void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:565
void PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt, Oid typeOid, int32 typmod, PLyProcedure *proc)
Definition: plpy_typeio.c:418
PyObject * PLy_input_from_tuple(PLyDatumToOb *arg, HeapTuple tuple, TupleDesc desc, bool include_generated)
Definition: plpy_typeio.c:134
void PLy_input_setup_tuple(PLyDatumToOb *arg, TupleDesc desc, PLyProcedure *proc)
Definition: plpy_typeio.c:165
Datum PLy_output_convert(PLyObToDatum *arg, PyObject *val, bool *isnull)
Definition: plpy_typeio.c:120
char * PLyUnicode_AsString(PyObject *unicode)
Definition: plpy_util.c:82
#define PortalIsValid(p)
Definition: portal.h:212
void PinPortal(Portal portal)
Definition: portalmem.c:373
void UnpinPortal(Portal portal)
Definition: portalmem.c:382
Portal GetPortalByName(const char *name)
Definition: portalmem.c:130
uintptr_t Datum
Definition: postgres.h:69
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
uint64 SPI_processed
Definition: spi.c:44
int SPI_freeplan(SPIPlanPtr plan)
Definition: spi.c:1026
const char * SPI_result_code_string(int code)
Definition: spi.c:1974
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_result
Definition: spi.c:46
void SPI_cursor_fetch(Portal portal, bool forward, long count)
Definition: spi.c:1808
void SPI_freetuptable(SPITupleTable *tuptable)
Definition: spi.c:1387
Portal SPI_cursor_open(const char *name, SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only)
Definition: spi.c:1446
SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes)
Definition: spi.c:861
void SPI_cursor_close(Portal portal)
Definition: spi.c:1864
#define SPI_OK_FETCH
Definition: spi.h:84
MemoryContext mcxt
PyObject_HEAD char * portalname
PLyProcedure * curr_proc
Definition: plpy_main.h:20
PyObject_HEAD PyObject * nrows
const char * name
Definition: portal.h:118
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: type.h:138